Files
mgmt/lang/funcs/engine.go
James Shubin b19583e7d3 lang: Initial implementation of the mgmt language
This is an initial implementation of the mgmt language. It is a
declarative (immutable) functional, reactive, domain specific
programming language. It is intended to be a language that is:

* safe
* powerful
* easy to reason about

With these properties, we hope this language, and the mgmt engine will
allow you to model the real-time systems that you'd like to automate.

This also includes a number of other associated changes. Sorry for the
large size of this patch.
2018-01-20 08:09:29 -05:00

609 lines
18 KiB
Go

// Mgmt
// Copyright (C) 2013-2018+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package funcs
import (
"fmt"
"strings"
"sync"
"github.com/purpleidea/mgmt/lang/interfaces"
"github.com/purpleidea/mgmt/lang/types"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/resources"
multierr "github.com/hashicorp/go-multierror"
errwrap "github.com/pkg/errors"
)
// State represents the state of a function vertex. This corresponds to an AST
// expr, which is the memory address (pointer) in the graph.
type State struct {
Expr interfaces.Expr // pointer to the expr vertex
handle interfaces.Func // the function (if not nil, we've found it on init)
init bool // have we run Init on our func?
ready bool // has it received all the args it needs at least once?
loaded bool // has the func run at least once ?
closed bool // did we close ourself down?
notify chan struct{} // ping here when new input values exist
input chan types.Value // the top level type must be a struct
output chan types.Value
}
// Init creates the function state if it can be found in the registered list.
func (obj *State) Init() error {
handle, err := obj.Expr.Func() // build one and store it, don't re-gen
if err != nil {
return err
}
if err := handle.Validate(); err != nil {
return errwrap.Wrapf(err, "could not validate func")
}
obj.handle = handle
sig := obj.handle.Info().Sig
if sig.Kind != types.KindFunc {
return fmt.Errorf("must be kind func")
}
if len(sig.Ord) > 0 {
// since we accept input, better get our notification chan built
obj.notify = make(chan struct{})
}
obj.input = make(chan types.Value) // we close this when we're done
obj.output = make(chan types.Value) // we create it, func closes it
return nil
}
// String satisfies fmt.Stringer so that these print nicely.
func (obj *State) String() string {
return obj.Expr.String()
}
// Edge links an output vertex (value) to an input vertex with a named argument.
type Edge struct {
Args []string // list of named args that this edge sends to
}
// String displays the list of arguments this edge satisfies. It is a required
// property to be a valid pgraph.Edge.
func (obj *Edge) String() string {
return strings.Join(obj.Args, ", ")
}
// Engine represents the running time varying directed acyclic function graph.
type Engine struct {
Graph *pgraph.Graph
Hostname string
World resources.World
Debug bool
Logf func(format string, v ...interface{})
// Glitch: https://en.wikipedia.org/wiki/Reactive_programming#Glitches
Glitch bool // allow glitching? (more responsive, but less accurate)
ag chan error // used to aggregate fact events without reflect
agLock *sync.Mutex
agCount int // last one turns out the light (closes the ag channel)
topologicalSort []pgraph.Vertex // cached sorting of the graph for perf
state map[pgraph.Vertex]*State // state associated with the vertex
mutex *sync.RWMutex // concurrency guard for the table map
table map[pgraph.Vertex]types.Value // live table of output values
loaded bool // are all of the funcs loaded?
loadedChan chan struct{} // funcs loaded signal
streamChan chan error // signals a new graph can be created or problem
closeChan chan struct{} // close signal
wg *sync.WaitGroup
}
// Init initializes the struct. This is the first call you must make. Do not
// proceed with calls to other methods unless this succeeds first. This also
// loads all the functions by calling Init on each one in the graph.
// TODO: should Init take the graph as an input arg to keep it as a private field?
func (obj *Engine) Init() error {
obj.ag = make(chan error)
obj.agLock = &sync.Mutex{}
obj.state = make(map[pgraph.Vertex]*State)
obj.mutex = &sync.RWMutex{}
obj.table = make(map[pgraph.Vertex]types.Value)
obj.loadedChan = make(chan struct{})
obj.streamChan = make(chan error)
obj.closeChan = make(chan struct{})
obj.wg = &sync.WaitGroup{}
topologicalSort, err := obj.Graph.TopologicalSort()
if err != nil {
return errwrap.Wrapf(err, "topo sort failed")
}
obj.topologicalSort = topologicalSort // cache the result
for _, vertex := range obj.Graph.Vertices() {
// is this an interface we can use?
if _, exists := obj.state[vertex]; exists {
return fmt.Errorf("vertex (%+v) is not unique in the graph", vertex)
}
expr, ok := vertex.(interfaces.Expr)
if !ok {
return fmt.Errorf("vertex (%+v) was not an expr", vertex)
}
obj.state[vertex] = &State{Expr: expr} // store some state!
if e := obj.state[vertex].Init(); e != nil {
err = multierr.Append(err, e) // list of errors
}
if obj.Debug {
obj.Logf("Loading func `%s`", vertex)
}
}
if err != nil { // usually due to `not found` errors
return errwrap.Wrapf(err, "could not load requested funcs")
}
return nil
}
// Validate the graph type checks properly and other tests. Must run Init first.
// This should check that: (1) all vertices have the correct number of inputs,
// (2) that the *Info signatures all match correctly, (3) that the argument
// names match correctly, and that the whole graph is statically correct.
func (obj *Engine) Validate() error {
inList := func(needle interfaces.Func, haystack []interfaces.Func) bool {
if needle == nil {
panic("nil value passed to inList") // catch bugs!
}
for _, x := range haystack {
if needle == x {
return true
}
}
return false
}
var err error
ptrs := []interfaces.Func{} // Func is a ptr
for _, vertex := range obj.Graph.Vertices() {
node := obj.state[vertex]
// TODO: this doesn't work for facts because they're in the Func
// duplicate pointers would get closed twice, causing a panic...
if inList(node.handle, ptrs) { // check for duplicate ptrs!
e := fmt.Errorf("vertex `%s` has duplicate ptr", vertex)
err = multierr.Append(err, e)
}
ptrs = append(ptrs, node.handle)
}
for _, edge := range obj.Graph.Edges() {
if _, ok := edge.(*Edge); !ok {
e := fmt.Errorf("edge `%s` was not the correct type", edge)
err = multierr.Append(err, e)
}
}
if err != nil {
return err // stage the errors so the user can fix many at once!
}
// check if vertices expecting inputs have them
for vertex, count := range obj.Graph.InDegree() {
node := obj.state[vertex]
if exp := len(node.handle.Info().Sig.Ord); exp != count {
e := fmt.Errorf("expected %d inputs to `%s`, got %d", exp, node, count)
err = multierr.Append(err, e)
}
}
// expected vertex -> argName
expected := make(map[*State]map[string]int) // expected input fields
for vertex1 := range obj.Graph.Adjacency() {
// check for outputs that don't go anywhere?
//node1 := obj.state[vertex1]
//if len(obj.Graph.Adjacency()[vertex1]) == 0 { // no vertex1 -> vertex2
// if node1.handle.Info().Sig.Output != nil {
// // an output value goes nowhere...
// }
//}
for vertex2 := range obj.Graph.Adjacency()[vertex1] { // populate
node2 := obj.state[vertex2]
expected[node2] = make(map[string]int)
for _, key := range node2.handle.Info().Sig.Ord {
expected[node2][key] = 1
}
}
}
for vertex1 := range obj.Graph.Adjacency() {
node1 := obj.state[vertex1]
for vertex2, edge := range obj.Graph.Adjacency()[vertex1] {
node2 := obj.state[vertex2]
edge := edge.(*Edge)
// check vertex1 -> vertex2 (with e) is valid
for _, arg := range edge.Args { // loop over each arg
sig := node2.handle.Info().Sig
if len(sig.Ord) == 0 {
e := fmt.Errorf("no input expected from `%s` to `%s` with arg `%s`", node1, node2, arg)
err = multierr.Append(err, e)
continue
}
if count, exists := expected[node2][arg]; !exists {
e := fmt.Errorf("wrong input name from `%s` to `%s` with arg `%s`", node1, node2, arg)
err = multierr.Append(err, e)
} else if count == 0 {
e := fmt.Errorf("duplicate input from `%s` to `%s` with arg `%s`", node1, node2, arg)
err = multierr.Append(err, e)
}
expected[node2][arg]-- // subtract one use
out := node1.handle.Info().Sig.Out
if out == nil {
e := fmt.Errorf("no output possible from `%s` to `%s` with arg `%s`", node1, node2, arg)
err = multierr.Append(err, e)
continue
}
typ, exists := sig.Map[arg] // key in struct
if !exists {
// second check of this!
e := fmt.Errorf("wrong input name from `%s` to `%s` with arg `%s`", node1, node2, arg)
err = multierr.Append(err, errwrap.Wrapf(e, "programming error"))
continue
}
if typ.Kind == types.KindVariant { // FIXME: hack for now
// pass (input arg variants)
} else if out.Kind == types.KindVariant { // FIXME: hack for now
// pass (output arg variants)
} else if typ.Cmp(out) != nil {
e := fmt.Errorf("type mismatch from `%s` (%s) to `%s` (%s) with arg `%s`", node1, out, node2, typ, arg)
err = multierr.Append(err, e)
}
}
}
}
// check for leftover function inputs which weren't filled up by outputs
// (we're trying to call a function with fewer input args than required)
for node, m := range expected { // map[*State]map[string]int
for arg, count := range m {
if count != 0 { // count should be zero if all were used
e := fmt.Errorf("missing input to `%s` on arg `%s`", node, arg)
err = multierr.Append(err, e)
}
}
}
if err != nil {
return err // stage the errors so the user can fix many at once!
}
return nil
}
// Run starts up this function engine and gets it all running. It errors if the
// startup failed for some reason. On success, use the Stream and Table methods
// for future interaction with the engine, and the Close method to shut it off.
func (obj *Engine) Run() error {
if len(obj.topologicalSort) == 0 { // no funcs to load!
close(obj.loadedChan)
close(obj.streamChan)
return nil
}
// TODO: build a timer that runs while we wait for all funcs to startup.
// after some delay print a message to tell us which funcs we're waiting
// for to startup and that they are slow and blocking everyone, and then
// fail permanently after the timeout so that bad code can't block this!
// loop through all funcs that we might need
obj.agAdd(len(obj.topologicalSort))
for _, vertex := range obj.topologicalSort {
node := obj.state[vertex]
if obj.Debug {
obj.Logf("Startup func `%s`", node)
}
incoming := obj.Graph.IncomingGraphVertices(vertex) // []Vertex
init := &interfaces.Init{
Hostname: obj.Hostname,
Input: node.input,
Output: node.output,
World: obj.World,
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
obj.Logf("func: "+format, v...)
},
}
if err := node.handle.Init(init); err != nil {
return errwrap.Wrapf(err, "could not init func `%s`", node)
}
node.init = true // we've successfully initialized
// no incoming edges, so no incoming data
if len(incoming) == 0 { // TODO: do this here or earlier?
close(node.input)
} else {
// process function input data
obj.wg.Add(1)
go func(vertex pgraph.Vertex) {
node := obj.state[vertex]
defer obj.wg.Done()
defer close(node.input)
var ready bool
// the final closing output to this, closes this
for range node.notify { // new input values
// now build the struct if we can...
ready = true // assume for now...
si := &types.Type{
// input to functions are structs
Kind: types.KindStruct,
Map: node.handle.Info().Sig.Map,
Ord: node.handle.Info().Sig.Ord,
}
st := types.NewStruct(si)
for _, v := range incoming {
args := obj.Graph.Adjacency()[v][vertex].(*Edge).Args
from := obj.state[v]
obj.mutex.RLock()
value, exists := obj.table[v]
obj.mutex.RUnlock()
if !exists {
ready = false // nope!
break
}
// set each arg, since one value
// could get used for multiple
// function inputs (shared edge)
for _, arg := range args {
err := st.Set(arg, value) // populate struct
if err != nil {
panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v", node, from, err))
}
}
}
if !ready {
continue
}
select {
case node.input <- st: // send to function
case <-obj.closeChan:
return
}
}
}(vertex)
}
obj.wg.Add(1)
go func(vertex pgraph.Vertex) { // run function
node := obj.state[vertex]
defer obj.wg.Done()
if obj.Debug {
obj.Logf("Running func `%s`", node)
}
err := node.handle.Stream()
if obj.Debug {
obj.Logf("Exiting func `%s`", node)
}
if err != nil {
// we closed with an error...
err := errwrap.Wrapf(err, "problem streaming func `%s`", node)
select {
case obj.ag <- err: // send to aggregate channel
case <-obj.closeChan:
return
}
}
}(vertex)
obj.wg.Add(1)
go func(vertex pgraph.Vertex) { // process function output data
node := obj.state[vertex]
defer obj.wg.Done()
defer obj.agDone(vertex)
outgoing := obj.Graph.OutgoingGraphVertices(vertex) // []Vertex
for value := range node.output { // read from channel
obj.mutex.RLock()
cached, exists := obj.table[vertex]
obj.mutex.RUnlock()
if !exists { // first value received
node.loaded = true
obj.Logf("func `%s` started", node)
} else if value.Cmp(cached) == nil {
// skip if new value is same as previous
// if this happens often, it *might* be
// a bug in the function implementation
// FIXME: do we need to disable engine
// caching when using hysteresis?
obj.Logf("func `%s` skipped", node)
continue
}
obj.mutex.Lock()
// XXX: maybe we can get rid of the table...
obj.table[vertex] = value // save the latest
if err := node.Expr.SetValue(value); err != nil {
panic(fmt.Sprintf("could not set value for `%s`: %+v", node, err))
}
obj.mutex.Unlock()
obj.Logf("func `%s` changed", node)
// FIXME: will this actually prevent glitching?
// if we only notify the aggregate channel when
// we're at the bottom of the topo sort (eg: no
// outgoing vertices to notify) then we'll have
// a glitch free subtree in the programs ast...
if obj.Glitch || len(outgoing) == 0 {
select {
case obj.ag <- nil: // send to aggregate channel
case <-obj.closeChan:
return
}
}
// notify the receiving vertices
for _, v := range outgoing {
node := obj.state[v]
select {
case node.notify <- struct{}{}:
case <-obj.closeChan:
return
}
}
}
// no more output values are coming...
obj.Logf("func `%s` stopped", node)
}(vertex)
}
// send event on streamChan when any of the (aggregated) facts change
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(obj.streamChan)
Loop:
for {
var err error
var ok bool
select {
case err, ok = <-obj.ag: // aggregated channel
if !ok {
break Loop // channel shutdown
}
if !obj.loaded {
// now check if we're ready
var loaded = true // initially assume true
for _, vertex := range obj.topologicalSort {
node := obj.state[vertex]
if !node.loaded {
loaded = false // we were wrong
break
}
}
obj.loaded = loaded
if obj.loaded {
// this causes an initial start
// signal to be sent out below,
// since the stream sender runs
if obj.Debug {
obj.Logf("loaded")
}
close(obj.loadedChan) // signal
} else {
if err == nil {
continue // not ready to send signal
} // pass errors through...
}
}
case <-obj.closeChan:
return
}
// send stream signal
select {
// send events or errors on streamChan, eg: func failure
case obj.streamChan <- err: // send
if err != nil {
return
}
case <-obj.closeChan:
return
}
}
}()
return nil
}
// agAdd registers a user on the ag channel.
func (obj *Engine) agAdd(i int) {
defer obj.agLock.Unlock()
obj.agLock.Lock()
obj.agCount += i
}
// agDone closes the channel if we're the last one using it.
func (obj *Engine) agDone(vertex pgraph.Vertex) {
defer obj.agLock.Unlock()
obj.agLock.Lock()
node := obj.state[vertex]
node.closed = true
// FIXME: (perf) cache this into a table which we narrow down with each
// successive call. look at the outgoing vertices that I would affect...
for _, v := range obj.Graph.OutgoingGraphVertices(vertex) { // close for each one
// now determine who provides inputs to that vertex...
var closed = true
for _, vv := range obj.Graph.IncomingGraphVertices(v) {
// are they all closed?
if !obj.state[vv].closed {
closed = false
break
}
}
if closed { // if they're all closed, we can close the input
close(obj.state[v].notify)
}
}
if obj.agCount == 0 {
close(obj.ag)
}
}
// Stream returns a channel of engine events. Wait for nil events to know when
// the Table map has changed. An error event means this will shutdown shortly.
// Do not run the Table function before we've received one non-error event.
func (obj *Engine) Stream() chan error {
return obj.streamChan
}
// Close shuts down the function engine. It waits till everything has finished.
func (obj *Engine) Close() error {
var err error
for _, vertex := range obj.topologicalSort { // FIXME: should we do this in reverse?
node := obj.state[vertex]
if node.init { // did we Init this func?
if e := node.handle.Close(); e != nil {
e := errwrap.Wrapf(e, "problem closing func `%s`", node)
err = multierr.Append(err, e) // list of errors
}
}
}
close(obj.closeChan)
obj.wg.Wait() // wait so that each func doesn't need to do this in close
return err
}