437 lines
12 KiB
Go
437 lines
12 KiB
Go
// Mgmt
|
|
// Copyright (C) 2013-2018+ James Shubin and the project contributors
|
|
// Written by James Shubin <james@shubin.ca> and the project contributors
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package graph
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/purpleidea/mgmt/converger"
|
|
"github.com/purpleidea/mgmt/engine"
|
|
"github.com/purpleidea/mgmt/engine/event"
|
|
"github.com/purpleidea/mgmt/pgraph"
|
|
"github.com/purpleidea/mgmt/util"
|
|
|
|
errwrap "github.com/pkg/errors"
|
|
)
|
|
|
|
// State stores some state about the resource it is mapped to.
|
|
type State struct {
|
|
// Graph is a pointer to the graph that this vertex is part of.
|
|
//Graph pgraph.Graph
|
|
|
|
// Vertex is the pointer in the graph that this state corresponds to. It
|
|
// can be converted to a `Res` if necessary.
|
|
// TODO: should this be passed in on Init instead?
|
|
Vertex pgraph.Vertex
|
|
|
|
Program string
|
|
Hostname string
|
|
World engine.World
|
|
|
|
// Prefix is a unique directory prefix which can be used. It should be
|
|
// created if needed.
|
|
Prefix string
|
|
|
|
//Converger converger.Converger
|
|
|
|
// Debug turns on additional output and behaviours.
|
|
Debug bool
|
|
|
|
// Logf is the logging function that should be used to display messages.
|
|
Logf func(format string, v ...interface{})
|
|
|
|
timestamp int64 // last updated timestamp
|
|
isStateOK bool // is state OK or do we need to run CheckApply ?
|
|
|
|
// events is a channel of incoming events which is read by the Watch
|
|
// loop for that resource. It receives events like pause, start, and
|
|
// poke. The channel shuts down to signal for Watch to exit.
|
|
eventsChan chan event.Kind // incoming to resource
|
|
eventsLock *sync.Mutex // lock around sending and closing of events channel
|
|
eventsDone bool // is channel closed?
|
|
|
|
// outputChan is the channel that the engine listens on for events from
|
|
// the Watch loop for that resource. The event is nil normally, except
|
|
// when events are sent on this channel from the engine. This only
|
|
// happens as a signaling mechanism when Watch has shutdown and we want
|
|
// to notify the Process loop which reads from this.
|
|
outputChan chan error // outgoing from resource
|
|
|
|
wg *sync.WaitGroup
|
|
exit *util.EasyExit
|
|
|
|
started chan struct{} // closes when it's started
|
|
stopped chan struct{} // closes when it's stopped
|
|
|
|
starter bool // do we have an indegree of 0 ?
|
|
working bool // is the Main() loop running ?
|
|
|
|
cuid converger.UID // primary converger
|
|
|
|
init *engine.Init // a copy of the init struct passed to res Init
|
|
}
|
|
|
|
// Init initializes structures like channels.
|
|
func (obj *State) Init() error {
|
|
obj.eventsChan = make(chan event.Kind)
|
|
obj.eventsLock = &sync.Mutex{}
|
|
|
|
obj.outputChan = make(chan error)
|
|
|
|
obj.wg = &sync.WaitGroup{}
|
|
obj.exit = util.NewEasyExit()
|
|
|
|
obj.started = make(chan struct{})
|
|
obj.stopped = make(chan struct{})
|
|
|
|
res, isRes := obj.Vertex.(engine.Res)
|
|
if !isRes {
|
|
return fmt.Errorf("vertex is not a Res")
|
|
}
|
|
if obj.Hostname == "" {
|
|
return fmt.Errorf("the Hostname is empty")
|
|
}
|
|
if obj.Prefix == "" {
|
|
return fmt.Errorf("the Prefix is empty")
|
|
}
|
|
if obj.Prefix == "/" {
|
|
return fmt.Errorf("the Prefix is root")
|
|
}
|
|
if obj.Logf == nil {
|
|
return fmt.Errorf("the Logf function is missing")
|
|
}
|
|
|
|
//obj.cuid = obj.Converger.Register() // gets registered in Worker()
|
|
|
|
obj.init = &engine.Init{
|
|
Program: obj.Program,
|
|
Hostname: obj.Hostname,
|
|
|
|
// Watch:
|
|
Running: func() error {
|
|
close(obj.started) // this is reset in the reset func
|
|
obj.isStateOK = false // assume we're initially dirty
|
|
// optimization: skip the initial send if not a starter
|
|
// because we'll get poked from a starter soon anyways!
|
|
if !obj.starter {
|
|
return nil
|
|
}
|
|
return obj.event()
|
|
},
|
|
Event: obj.event,
|
|
Events: obj.eventsChan,
|
|
Read: obj.read,
|
|
Dirty: func() { // TODO: should we rename this SetDirty?
|
|
obj.isStateOK = false
|
|
},
|
|
|
|
// CheckApply:
|
|
Refresh: func() bool {
|
|
res, ok := obj.Vertex.(engine.RefreshableRes)
|
|
if !ok {
|
|
panic("res does not support the Refreshable trait")
|
|
}
|
|
return res.Refresh()
|
|
},
|
|
Send: func(st interface{}) error {
|
|
res, ok := obj.Vertex.(engine.SendableRes)
|
|
if !ok {
|
|
panic("res does not support the Sendable trait")
|
|
}
|
|
// XXX: type check this
|
|
//expected := res.Sends()
|
|
//if err := XXX_TYPE_CHECK(expected, st); err != nil {
|
|
// return err
|
|
//}
|
|
|
|
return res.Send(st) // send the struct
|
|
},
|
|
Recv: func() map[string]*engine.Send { // TODO: change this API?
|
|
res, ok := obj.Vertex.(engine.RecvableRes)
|
|
if !ok {
|
|
panic("res does not support the Recvable trait")
|
|
}
|
|
return res.Recv()
|
|
},
|
|
|
|
World: obj.World,
|
|
VarDir: obj.varDir,
|
|
|
|
Debug: obj.Debug,
|
|
Logf: func(format string, v ...interface{}) {
|
|
obj.Logf("resource: "+format, v...)
|
|
},
|
|
}
|
|
|
|
// run the init
|
|
if obj.Debug {
|
|
obj.Logf("Init(%s)", res)
|
|
}
|
|
err := res.Init(obj.init)
|
|
if obj.Debug {
|
|
obj.Logf("Init(%s): Return(%+v)", res, err)
|
|
}
|
|
if err != nil {
|
|
return errwrap.Wrapf(err, "could not Init() resource")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close shuts down and performs any cleanup. This is most akin to a "post" or
|
|
// cleanup command as the initiator for closing a vertex happens in graph sync.
|
|
func (obj *State) Close() error {
|
|
res, isRes := obj.Vertex.(engine.Res)
|
|
if !isRes {
|
|
return fmt.Errorf("vertex is not a Res")
|
|
}
|
|
|
|
//if obj.cuid != nil {
|
|
// obj.cuid.Unregister() // gets unregistered in Worker()
|
|
//}
|
|
|
|
// redundant safety
|
|
obj.wg.Wait() // wait until all poke's and events on me have exited
|
|
|
|
// run the close
|
|
if obj.Debug {
|
|
obj.Logf("Close(%s)", res)
|
|
}
|
|
err := res.Close()
|
|
if obj.Debug {
|
|
obj.Logf("Close(%s): Return(%+v)", res, err)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// reset is run to reset the state so that Watch can run a second time. Thus is
|
|
// needed for the Watch retry in particular.
|
|
func (obj *State) reset() {
|
|
obj.started = make(chan struct{})
|
|
obj.stopped = make(chan struct{})
|
|
}
|
|
|
|
// Poke sends a nil message on the outputChan. This channel is used by the
|
|
// resource to signal a possible change. This will cause the Process loop to
|
|
// run if it can.
|
|
func (obj *State) Poke() {
|
|
// add a wait group on the vertex we're poking!
|
|
obj.wg.Add(1)
|
|
defer obj.wg.Done()
|
|
|
|
select {
|
|
case obj.outputChan <- nil:
|
|
|
|
case <-obj.exit.Signal():
|
|
}
|
|
}
|
|
|
|
// Event sends a Pause or Start event to the resource. It can also be used to
|
|
// send Poke events, but it's much more efficient to send them directly instead
|
|
// of passing them through the resource.
|
|
func (obj *State) Event(kind event.Kind) {
|
|
// TODO: should these happen after the lock?
|
|
obj.wg.Add(1)
|
|
defer obj.wg.Done()
|
|
|
|
obj.eventsLock.Lock()
|
|
defer obj.eventsLock.Unlock()
|
|
|
|
if obj.eventsDone { // closing, skip events...
|
|
return
|
|
}
|
|
|
|
if kind == event.EventExit { // set this so future events don't deadlock
|
|
obj.Logf("exit event...")
|
|
obj.eventsDone = true
|
|
close(obj.eventsChan) // causes resource Watch loop to close
|
|
obj.exit.Done(nil) // trigger exit signal to unblock some cases
|
|
return
|
|
}
|
|
|
|
select {
|
|
case obj.eventsChan <- kind:
|
|
|
|
case <-obj.exit.Signal():
|
|
}
|
|
}
|
|
|
|
// read is a helper function used inside the main select statement of resources.
|
|
// If it returns an error, then this is a signal for the resource to exit.
|
|
func (obj *State) read(kind event.Kind) error {
|
|
switch kind {
|
|
case event.EventPoke:
|
|
return obj.event() // a poke needs to cause an event...
|
|
case event.EventStart:
|
|
return fmt.Errorf("unexpected start")
|
|
case event.EventPause:
|
|
// pass
|
|
case event.EventExit:
|
|
return engine.ErrSignalExit
|
|
|
|
default:
|
|
return fmt.Errorf("unhandled event: %+v", kind)
|
|
}
|
|
|
|
// we're paused now
|
|
select {
|
|
case kind, ok := <-obj.eventsChan:
|
|
if !ok {
|
|
return engine.ErrWatchExit
|
|
}
|
|
switch kind {
|
|
case event.EventPoke:
|
|
return fmt.Errorf("unexpected poke")
|
|
case event.EventPause:
|
|
return fmt.Errorf("unexpected pause")
|
|
case event.EventStart:
|
|
// resumed
|
|
return nil
|
|
case event.EventExit:
|
|
return engine.ErrSignalExit
|
|
|
|
default:
|
|
return fmt.Errorf("unhandled event: %+v", kind)
|
|
}
|
|
}
|
|
}
|
|
|
|
// event is a helper function to send an event from the resource Watch loop. It
|
|
// can be used for the initial `running` event, or any regular event. If it
|
|
// returns an error, then the Watch loop must return this error and shutdown.
|
|
func (obj *State) event() error {
|
|
// loop until we sent on obj.outputChan or exit with error
|
|
for {
|
|
select {
|
|
// send "activity" event
|
|
case obj.outputChan <- nil:
|
|
return nil // sent event!
|
|
|
|
// make sure to keep handling incoming
|
|
case kind, ok := <-obj.eventsChan:
|
|
if !ok {
|
|
return engine.ErrWatchExit
|
|
}
|
|
switch kind {
|
|
case event.EventPoke:
|
|
// we're trying to send an event, so swallow the
|
|
// poke: it's what we wanted to have happen here
|
|
continue
|
|
case event.EventStart:
|
|
return fmt.Errorf("unexpected start")
|
|
case event.EventPause:
|
|
// pass
|
|
case event.EventExit:
|
|
return engine.ErrSignalExit
|
|
|
|
default:
|
|
return fmt.Errorf("unhandled event: %+v", kind)
|
|
}
|
|
}
|
|
|
|
// we're paused now
|
|
select {
|
|
case kind, ok := <-obj.eventsChan:
|
|
if !ok {
|
|
return engine.ErrWatchExit
|
|
}
|
|
switch kind {
|
|
case event.EventPoke:
|
|
return fmt.Errorf("unexpected poke")
|
|
case event.EventPause:
|
|
return fmt.Errorf("unexpected pause")
|
|
case event.EventStart:
|
|
// resumed
|
|
case event.EventExit:
|
|
return engine.ErrSignalExit
|
|
|
|
default:
|
|
return fmt.Errorf("unhandled event: %+v", kind)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// varDir returns the path to a working directory for the resource. It will try
|
|
// and create the directory first, and return an error if this failed. The dir
|
|
// should be cleaned up by the resource on Close if it wishes to discard the
|
|
// contents. If it does not, then a future resource with the same kind and name
|
|
// may see those contents in that directory. The resource should clean up the
|
|
// contents before use if it is important that nothing exist. It is always
|
|
// possible that contents could remain after an abrupt crash, so do not store
|
|
// overly sensitive data unless you're aware of the risks.
|
|
func (obj *State) varDir(extra string) (string, error) {
|
|
// Using extra adds additional dirs onto our namespace. An empty extra
|
|
// adds no additional directories.
|
|
if obj.Prefix == "" { // safety
|
|
return "", fmt.Errorf("the VarDir prefix is empty")
|
|
}
|
|
|
|
// an empty string at the end has no effect
|
|
p := fmt.Sprintf("%s/", path.Join(obj.Prefix, extra))
|
|
if err := os.MkdirAll(p, 0770); err != nil {
|
|
return "", errwrap.Wrapf(err, "can't create prefix in: %s", p)
|
|
}
|
|
|
|
// returns with a trailing slash as per the mgmt file res convention
|
|
return p, nil
|
|
}
|
|
|
|
// poll is a replacement for Watch when the Poll metaparameter is used.
|
|
func (obj *State) poll(interval uint32) error {
|
|
// create a time.Ticker for the given interval
|
|
ticker := time.NewTicker(time.Duration(interval) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// notify engine that we're running
|
|
if err := obj.init.Running(); err != nil {
|
|
return err // exit if requested
|
|
}
|
|
|
|
var send = false // send event?
|
|
for {
|
|
select {
|
|
case <-ticker.C: // received the timer event
|
|
obj.init.Logf("polling...")
|
|
send = true
|
|
obj.init.Dirty() // dirty
|
|
|
|
case event, ok := <-obj.init.Events:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
if err := obj.init.Read(event); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// do all our event sending all together to avoid duplicate msgs
|
|
if send {
|
|
send = false
|
|
if err := obj.init.Event(); err != nil {
|
|
return err // exit if requested
|
|
}
|
|
}
|
|
}
|
|
}
|