diff --git a/engine/local/local.go b/engine/local/local.go index 1581d5d0..45ed1f55 100644 --- a/engine/local/local.go +++ b/engine/local/local.go @@ -38,6 +38,7 @@ import ( "fmt" "os" "path" + "strconv" "strings" "sync" @@ -59,6 +60,10 @@ type API struct { // VarDirImpl is the implementation for the VarDir API's. The API's are // the collection of public methods that exist on this struct. *VarDirImpl + + // PoolImpl is the implementation for the Pool API's. The API's are the + // collection of public methods that exist on this struct. + *PoolImpl } // Init initializes the API before first use. It returns itself so it can be @@ -78,6 +83,13 @@ func (obj *API) Init() *API { Logf: obj.Logf, }) + obj.PoolImpl = &PoolImpl{} + obj.PoolImpl.Init(&PoolInit{ + Prefix: obj.Prefix, + Debug: obj.Debug, + Logf: obj.Logf, + }) + return obj } @@ -421,3 +433,162 @@ func (obj *VarDirImpl) getPrefix() (string, error) { return obj.prefix, nil } + +// PoolInit are the init values that the Pool API needs to work correctly. +type PoolInit struct { + Prefix string + Debug bool + Logf func(format string, v ...interface{}) +} + +// PoolConfig configures how the Pool operates. +// XXX: These are not implemented yet. +type PoolConfig struct { + // Expiry specifies that we expire old values that have not been read + // for this many seconds. Zero disables this and they never expire. + Expiry int64 // TODO: or time.Time ? + + // Random lets you allocate a random integer instead of sequential ones. + Random bool + + // Max specifies the maximum integer to allocate. + Max int +} + +// PoolImpl is the implementation for the Pool API's. The API's are the +// collection of public methods that exist on this struct. +type PoolImpl struct { + init *PoolInit + mutex *sync.Mutex + prefix string + prefixExists bool // is it okay to use the prefix? +} + +// Init runs some initialization code for the Pool API. +func (obj *PoolImpl) Init(init *PoolInit) { + obj.init = init + obj.mutex = &sync.Mutex{} + obj.prefix = fmt.Sprintf("%s/", path.Join(obj.init.Prefix, "pool")) +} + +// Pool returns a unique integer from a pool of numbers. Within a given +// namespace, it returns the same integer for a given name. It is a simple +// mechanism to allocate numbers to different inputs when we don't have a +// hashing alternative. It does not allocate zero. +func (obj *PoolImpl) Pool(ctx context.Context, namespace, uid string, config *PoolConfig) (int, error) { + if namespace == "" { + return 0, fmt.Errorf("namespace is empty") + } + if strings.Contains(namespace, "/") { + return 0, fmt.Errorf("namespace contains slash") + } + if uid == "" { + return 0, fmt.Errorf("uid is empty") + } + if strings.Contains(uid, "/") { + return 0, fmt.Errorf("uid contains slash") + } + + prefix, err := obj.getPrefix() + if err != nil { + return 0, err + } + + dir := fmt.Sprintf("%s/", path.Join(prefix, namespace)) + file := fmt.Sprintf("%s.uid", path.Join(dir, uid)) // file + + // TODO: Run clean up funcs here to get rid of any stale/expired values. + // TODO: This will happen based on the future config options we build... + + obj.mutex.Lock() + defer obj.mutex.Unlock() + if err := os.MkdirAll(dir, 0755); err != nil { + return 0, err + } + + fn := func(p string) (int, error) { + b, err := os.ReadFile(p) + if err != nil && !os.IsNotExist(err) { + return 0, err // real error + } + if err != nil { + return 0, nil // absent! + } + + // File exists! + d, err := strconv.Atoi(strings.TrimSpace(string(b))) + if err != nil { + // Someone put corrupt data in a uid file. + return 0, err // real error + } + return d, nil // value already allocated! + } + + d, err := fn(file) + if err != nil { + return 0, err // real error + } + if d != 0 { + return d, nil // Value already allocated! We're done early. + } + + // Not found, so find the max. (0 without error means not found!) + + files, err := os.ReadDir(dir) // ([]os.DirEntry, error) + if err != nil { + return 0, err // real error + } + + m := 0 + for _, f := range files { + if f.IsDir() { + continue // unexpected! + } + d, err := fn(path.Join(dir, f.Name())) + if err != nil { + return 0, err // real error + } + if d == 0 { + // Must be someone deleting files without our mutex! + return 0, fmt.Errorf("unexpected missing file") + } + + m = max(m, d) + } + + m++ // increment + data := []byte(fmt.Sprintf("%d\n", m)) // it's polite to end with \n + if err := os.WriteFile(file, data, 0600); err != nil { + return 0, err + } + + return m, nil +} + +// getPrefix gets the prefix dir to use, or errors if it can't make one. It +// makes it on first use, and returns quickly from any future calls to it. +func (obj *PoolImpl) getPrefix() (string, error) { + // NOTE: Moving this mutex to just below the first early return, would + // be a benign race, but as it turns out, it's possible that a compiler + // would see this behaviour as "undefined" and things might not work as + // intended. It could perhaps be replaced with a sync/atomic primitive + // if we wanted better performance here. + obj.mutex.Lock() + defer obj.mutex.Unlock() + + if obj.prefixExists { // former race read + return obj.prefix, nil + } + + // MkdirAll instead of Mkdir because we have no idea if the parent + // local/ directory was already made yet or not. (If at all.) If path is + // already a directory, MkdirAll does nothing and returns nil. (Good!) + // TODO: I hope MkdirAll is thread-safe on path creation in case another + // future local API tries to make the base (parent) directory too! + if err := os.MkdirAll(obj.prefix, 0755); err != nil { + return "", err + } + obj.prefixExists = true // former race write + + return obj.prefix, nil +} diff --git a/examples/lang/pool.mcl b/examples/lang/pool.mcl new file mode 100644 index 00000000..6648889f --- /dev/null +++ b/examples/lang/pool.mcl @@ -0,0 +1,19 @@ +# Remember that across runs if you use --tmp-prefix, this will give you new +# values each time! The local prefix is where the pool of values is taken from! +import "fmt" +import "local" + +$ns = "my_namespace" +$i = local.pool($ns, "james") # the uid "james" will always return the same int +$j = local.pool($ns, "purple") # this is like a pool based allocator + +print "value:i" { + msg => fmt.printf("i: %d", $i), + + Meta:autogroup => false, +} +print "value:j" { + msg => fmt.printf("j: %d", $j), + + Meta:autogroup => false, +} diff --git a/lang/core/local/pool_func.go b/lang/core/local/pool_func.go new file mode 100644 index 00000000..dc2f3f4d --- /dev/null +++ b/lang/core/local/pool_func.go @@ -0,0 +1,172 @@ +// Mgmt +// Copyright (C) 2013-2024+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// +// Additional permission under GNU GPL version 3 section 7 +// +// If you modify this program, or any covered work, by linking or combining it +// with embedded mcl code and modules (and that the embedded mcl code and +// modules which link with this program, contain a copy of their source code in +// the authoritative form) containing parts covered by the terms of any other +// license, the licensors of this program grant you additional permission to +// convey the resulting work. Furthermore, the licensors of this program grant +// the original author, James Shubin, additional permission to update this +// additional permission if he deems it necessary to achieve the goals of this +// additional permission. + +package corelocal + +import ( + "context" + "fmt" + + "github.com/purpleidea/mgmt/lang/funcs" + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/lang/types" +) + +const ( + // PoolFuncName is the name this function is registered as. + PoolFuncName = "pool" + + // arg names... + absPathArgNameNamespace = "namespace" + absPathArgNameUID = "uid" + absPathArgNameConfig = "config" +) + +func init() { + // TODO: Add a "world" version of this function that picks a value from + // the global "world" pool that all nodes share. + funcs.ModuleRegister(ModuleName, PoolFuncName, func() interfaces.Func { return &PoolFunc{} }) // must register the func and name +} + +// PoolFunc is a function that returns a unique integer from a pool of numbers. +// Within a given namespace, it returns the same integer for a given name. It is +// a simple mechanism to allocate numbers to different inputs when we don't have +// a hashing alternative. It does not allocate zero. +type PoolFunc struct { + init *interfaces.Init + data *interfaces.FuncData + last types.Value // last value received to use for diff +} + +// String returns a simple name for this function. This is needed so this struct +// can satisfy the pgraph.Vertex interface. +func (obj *PoolFunc) String() string { + return PoolFuncName +} + +// SetData is used by the language to pass our function some code-level context. +func (obj *PoolFunc) SetData(data *interfaces.FuncData) { + obj.data = data +} + +// ArgGen returns the Nth arg name for this function. +func (obj *PoolFunc) ArgGen(index int) (string, error) { + seq := []string{absPathArgNameNamespace, absPathArgNameUID, absPathArgNameConfig} + if l := len(seq); index >= l { + return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) + } + return seq[index], nil +} + +// Validate makes sure we've built our struct properly. It is usually unused for +// normal functions that users can use directly. +func (obj *PoolFunc) Validate() error { + return nil +} + +// Info returns some static info about itself. +func (obj *PoolFunc) Info() *interfaces.Info { + return &interfaces.Info{ + Pure: true, + Memo: false, + Sig: types.NewType(fmt.Sprintf("func(%s str, %s str) int", absPathArgNameNamespace, absPathArgNameUID)), + // TODO: add an optional config arg + //Sig: types.NewType(fmt.Sprintf("func(%s str, %s str, %s struct{}) int", absPathArgNameNamespace, absPathArgNameUID, absPathArgNameConfig)), + } +} + +// Init runs some startup code for this function. +func (obj *PoolFunc) Init(init *interfaces.Init) error { + obj.init = init + if obj.data == nil { + // programming error + return fmt.Errorf("missing function data") + } + return nil +} + +// Stream returns the changing values that this func has over time. +func (obj *PoolFunc) Stream(ctx context.Context) error { + defer close(obj.init.Output) // the sender closes + var value types.Value + for { + select { + case input, ok := <-obj.init.Input: + if !ok { + obj.init.Input = nil // don't infinite loop back + continue // no more inputs, but don't return! + } + //if err := input.Type().Cmp(obj.Info().Sig.Input); err != nil { + // return errwrap.Wrapf(err, "wrong function input") + //} + + if obj.last != nil && input.Cmp(obj.last) == nil { + continue // value didn't change, skip it + } + obj.last = input // store for next + + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + result, err := obj.Call(ctx, args) + if err != nil { + return err + } + value = result + + case <-ctx.Done(): + return nil + } + + select { + case obj.init.Output <- value: + case <-ctx.Done(): + return nil + } + } +} + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *PoolFunc) Call(ctx context.Context, input []types.Value) (types.Value, error) { + // Validation of these inputs happens in the Local API which does it. + namespace := input[0].Str() + uid := input[1].Str() + // TODO: pass in config + //config := input[2].???() + + result, err := obj.init.Local.Pool(ctx, namespace, uid, nil) + if err != nil { + return nil, err + } + return &types.IntValue{ + V: int64(result), + }, nil +}