engine: local, lang: core: local: Add a pool function

This adds a new local API for pool allocation, and with it a
corresponding function in the "local" import.
This commit is contained in:
James Shubin
2024-10-04 22:37:39 -04:00
parent 898b58e3e7
commit c937280664
3 changed files with 362 additions and 0 deletions

View File

@@ -38,6 +38,7 @@ import (
"fmt"
"os"
"path"
"strconv"
"strings"
"sync"
@@ -59,6 +60,10 @@ type API struct {
// VarDirImpl is the implementation for the VarDir API's. The API's are
// the collection of public methods that exist on this struct.
*VarDirImpl
// PoolImpl is the implementation for the Pool API's. The API's are the
// collection of public methods that exist on this struct.
*PoolImpl
}
// Init initializes the API before first use. It returns itself so it can be
@@ -78,6 +83,13 @@ func (obj *API) Init() *API {
Logf: obj.Logf,
})
obj.PoolImpl = &PoolImpl{}
obj.PoolImpl.Init(&PoolInit{
Prefix: obj.Prefix,
Debug: obj.Debug,
Logf: obj.Logf,
})
return obj
}
@@ -421,3 +433,162 @@ func (obj *VarDirImpl) getPrefix() (string, error) {
return obj.prefix, nil
}
// PoolInit are the init values that the Pool API needs to work correctly.
type PoolInit struct {
Prefix string
Debug bool
Logf func(format string, v ...interface{})
}
// PoolConfig configures how the Pool operates.
// XXX: These are not implemented yet.
type PoolConfig struct {
// Expiry specifies that we expire old values that have not been read
// for this many seconds. Zero disables this and they never expire.
Expiry int64 // TODO: or time.Time ?
// Random lets you allocate a random integer instead of sequential ones.
Random bool
// Max specifies the maximum integer to allocate.
Max int
}
// PoolImpl is the implementation for the Pool API's. The API's are the
// collection of public methods that exist on this struct.
type PoolImpl struct {
init *PoolInit
mutex *sync.Mutex
prefix string
prefixExists bool // is it okay to use the prefix?
}
// Init runs some initialization code for the Pool API.
func (obj *PoolImpl) Init(init *PoolInit) {
obj.init = init
obj.mutex = &sync.Mutex{}
obj.prefix = fmt.Sprintf("%s/", path.Join(obj.init.Prefix, "pool"))
}
// Pool returns a unique integer from a pool of numbers. Within a given
// namespace, it returns the same integer for a given name. It is a simple
// mechanism to allocate numbers to different inputs when we don't have a
// hashing alternative. It does not allocate zero.
func (obj *PoolImpl) Pool(ctx context.Context, namespace, uid string, config *PoolConfig) (int, error) {
if namespace == "" {
return 0, fmt.Errorf("namespace is empty")
}
if strings.Contains(namespace, "/") {
return 0, fmt.Errorf("namespace contains slash")
}
if uid == "" {
return 0, fmt.Errorf("uid is empty")
}
if strings.Contains(uid, "/") {
return 0, fmt.Errorf("uid contains slash")
}
prefix, err := obj.getPrefix()
if err != nil {
return 0, err
}
dir := fmt.Sprintf("%s/", path.Join(prefix, namespace))
file := fmt.Sprintf("%s.uid", path.Join(dir, uid)) // file
// TODO: Run clean up funcs here to get rid of any stale/expired values.
// TODO: This will happen based on the future config options we build...
obj.mutex.Lock()
defer obj.mutex.Unlock()
if err := os.MkdirAll(dir, 0755); err != nil {
return 0, err
}
fn := func(p string) (int, error) {
b, err := os.ReadFile(p)
if err != nil && !os.IsNotExist(err) {
return 0, err // real error
}
if err != nil {
return 0, nil // absent!
}
// File exists!
d, err := strconv.Atoi(strings.TrimSpace(string(b)))
if err != nil {
// Someone put corrupt data in a uid file.
return 0, err // real error
}
return d, nil // value already allocated!
}
d, err := fn(file)
if err != nil {
return 0, err // real error
}
if d != 0 {
return d, nil // Value already allocated! We're done early.
}
// Not found, so find the max. (0 without error means not found!)
files, err := os.ReadDir(dir) // ([]os.DirEntry, error)
if err != nil {
return 0, err // real error
}
m := 0
for _, f := range files {
if f.IsDir() {
continue // unexpected!
}
d, err := fn(path.Join(dir, f.Name()))
if err != nil {
return 0, err // real error
}
if d == 0 {
// Must be someone deleting files without our mutex!
return 0, fmt.Errorf("unexpected missing file")
}
m = max(m, d)
}
m++ // increment
data := []byte(fmt.Sprintf("%d\n", m)) // it's polite to end with \n
if err := os.WriteFile(file, data, 0600); err != nil {
return 0, err
}
return m, nil
}
// getPrefix gets the prefix dir to use, or errors if it can't make one. It
// makes it on first use, and returns quickly from any future calls to it.
func (obj *PoolImpl) getPrefix() (string, error) {
// NOTE: Moving this mutex to just below the first early return, would
// be a benign race, but as it turns out, it's possible that a compiler
// would see this behaviour as "undefined" and things might not work as
// intended. It could perhaps be replaced with a sync/atomic primitive
// if we wanted better performance here.
obj.mutex.Lock()
defer obj.mutex.Unlock()
if obj.prefixExists { // former race read
return obj.prefix, nil
}
// MkdirAll instead of Mkdir because we have no idea if the parent
// local/ directory was already made yet or not. (If at all.) If path is
// already a directory, MkdirAll does nothing and returns nil. (Good!)
// TODO: I hope MkdirAll is thread-safe on path creation in case another
// future local API tries to make the base (parent) directory too!
if err := os.MkdirAll(obj.prefix, 0755); err != nil {
return "", err
}
obj.prefixExists = true // former race write
return obj.prefix, nil
}