Somewhere after the engine re-write we seem to have regressed and converge early even if some resource is dirty. This adds an additional timer so that we don't start the individual resource converged countdown until our state is okay.
444 lines
12 KiB
Go
444 lines
12 KiB
Go
// Mgmt
// Copyright (C) 2013-2018+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package graph
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/purpleidea/mgmt/converger"
|
|
"github.com/purpleidea/mgmt/engine"
|
|
"github.com/purpleidea/mgmt/engine/event"
|
|
"github.com/purpleidea/mgmt/pgraph"
|
|
"github.com/purpleidea/mgmt/util"
|
|
|
|
errwrap "github.com/pkg/errors"
|
|
)
|
|
|
|
// State stores some state about the resource it is mapped to.
|
|
type State struct {
|
|
// Graph is a pointer to the graph that this vertex is part of.
|
|
//Graph pgraph.Graph
|
|
|
|
// Vertex is the pointer in the graph that this state corresponds to. It
|
|
// can be converted to a `Res` if necessary.
|
|
// TODO: should this be passed in on Init instead?
|
|
Vertex pgraph.Vertex
|
|
|
|
Program string
|
|
Hostname string
|
|
World engine.World
|
|
|
|
// Prefix is a unique directory prefix which can be used. It should be
|
|
// created if needed.
|
|
Prefix string
|
|
|
|
//Converger converger.Converger
|
|
|
|
// Debug turns on additional output and behaviours.
|
|
Debug bool
|
|
|
|
// Logf is the logging function that should be used to display messages.
|
|
Logf func(format string, v ...interface{})
|
|
|
|
timestamp int64 // last updated timestamp
|
|
isStateOK bool // is state OK or do we need to run CheckApply ?
|
|
|
|
// events is a channel of incoming events which is read by the Watch
|
|
// loop for that resource. It receives events like pause, start, and
|
|
// poke. The channel shuts down to signal for Watch to exit.
|
|
eventsChan chan event.Kind // incoming to resource
|
|
eventsLock *sync.Mutex // lock around sending and closing of events channel
|
|
eventsDone bool // is channel closed?
|
|
|
|
// outputChan is the channel that the engine listens on for events from
|
|
// the Watch loop for that resource. The event is nil normally, except
|
|
// when events are sent on this channel from the engine. This only
|
|
// happens as a signaling mechanism when Watch has shutdown and we want
|
|
// to notify the Process loop which reads from this.
|
|
outputChan chan error // outgoing from resource
|
|
|
|
wg *sync.WaitGroup
|
|
exit *util.EasyExit
|
|
|
|
started chan struct{} // closes when it's started
|
|
stopped chan struct{} // closes when it's stopped
|
|
|
|
starter bool // do we have an indegree of 0 ?
|
|
working bool // is the Main() loop running ?
|
|
|
|
cuid converger.UID // primary converger
|
|
tuid converger.UID // secondary converger
|
|
|
|
init *engine.Init // a copy of the init struct passed to res Init
|
|
}
|
|
|
|
// Init initializes structures like channels.
|
|
func (obj *State) Init() error {
|
|
obj.eventsChan = make(chan event.Kind)
|
|
obj.eventsLock = &sync.Mutex{}
|
|
|
|
obj.outputChan = make(chan error)
|
|
|
|
obj.wg = &sync.WaitGroup{}
|
|
obj.exit = util.NewEasyExit()
|
|
|
|
obj.started = make(chan struct{})
|
|
obj.stopped = make(chan struct{})
|
|
|
|
res, isRes := obj.Vertex.(engine.Res)
|
|
if !isRes {
|
|
return fmt.Errorf("vertex is not a Res")
|
|
}
|
|
if obj.Hostname == "" {
|
|
return fmt.Errorf("the Hostname is empty")
|
|
}
|
|
if obj.Prefix == "" {
|
|
return fmt.Errorf("the Prefix is empty")
|
|
}
|
|
if obj.Prefix == "/" {
|
|
return fmt.Errorf("the Prefix is root")
|
|
}
|
|
if obj.Logf == nil {
|
|
return fmt.Errorf("the Logf function is missing")
|
|
}
|
|
|
|
//obj.cuid = obj.Converger.Register() // gets registered in Worker()
|
|
//obj.tuid = obj.Converger.Register() // gets registered in Worker()
|
|
|
|
obj.init = &engine.Init{
|
|
Program: obj.Program,
|
|
Hostname: obj.Hostname,
|
|
|
|
// Watch:
|
|
Running: func() error {
|
|
obj.tuid.StopTimer()
|
|
close(obj.started) // this is reset in the reset func
|
|
obj.isStateOK = false // assume we're initially dirty
|
|
// optimization: skip the initial send if not a starter
|
|
// because we'll get poked from a starter soon anyways!
|
|
if !obj.starter {
|
|
return nil
|
|
}
|
|
return obj.event()
|
|
},
|
|
Event: obj.event,
|
|
Events: obj.eventsChan,
|
|
Read: obj.read,
|
|
Dirty: func() { // TODO: should we rename this SetDirty?
|
|
obj.tuid.StopTimer()
|
|
obj.isStateOK = false
|
|
},
|
|
|
|
// CheckApply:
|
|
Refresh: func() bool {
|
|
res, ok := obj.Vertex.(engine.RefreshableRes)
|
|
if !ok {
|
|
panic("res does not support the Refreshable trait")
|
|
}
|
|
return res.Refresh()
|
|
},
|
|
Send: func(st interface{}) error {
|
|
res, ok := obj.Vertex.(engine.SendableRes)
|
|
if !ok {
|
|
panic("res does not support the Sendable trait")
|
|
}
|
|
// XXX: type check this
|
|
//expected := res.Sends()
|
|
//if err := XXX_TYPE_CHECK(expected, st); err != nil {
|
|
// return err
|
|
//}
|
|
|
|
return res.Send(st) // send the struct
|
|
},
|
|
Recv: func() map[string]*engine.Send { // TODO: change this API?
|
|
res, ok := obj.Vertex.(engine.RecvableRes)
|
|
if !ok {
|
|
panic("res does not support the Recvable trait")
|
|
}
|
|
return res.Recv()
|
|
},
|
|
|
|
World: obj.World,
|
|
VarDir: obj.varDir,
|
|
|
|
Debug: obj.Debug,
|
|
Logf: func(format string, v ...interface{}) {
|
|
obj.Logf("resource: "+format, v...)
|
|
},
|
|
}
|
|
|
|
// run the init
|
|
if obj.Debug {
|
|
obj.Logf("Init(%s)", res)
|
|
}
|
|
err := res.Init(obj.init)
|
|
if obj.Debug {
|
|
obj.Logf("Init(%s): Return(%+v)", res, err)
|
|
}
|
|
if err != nil {
|
|
return errwrap.Wrapf(err, "could not Init() resource")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close shuts down and performs any cleanup. This is most akin to a "post" or
|
|
// cleanup command as the initiator for closing a vertex happens in graph sync.
|
|
func (obj *State) Close() error {
|
|
res, isRes := obj.Vertex.(engine.Res)
|
|
if !isRes {
|
|
return fmt.Errorf("vertex is not a Res")
|
|
}
|
|
|
|
//if obj.cuid != nil {
|
|
// obj.cuid.Unregister() // gets unregistered in Worker()
|
|
//}
|
|
//if obj.tuid != nil {
|
|
// obj.tuid.Unregister() // gets unregistered in Worker()
|
|
//}
|
|
|
|
// redundant safety
|
|
obj.wg.Wait() // wait until all poke's and events on me have exited
|
|
|
|
// run the close
|
|
if obj.Debug {
|
|
obj.Logf("Close(%s)", res)
|
|
}
|
|
err := res.Close()
|
|
if obj.Debug {
|
|
obj.Logf("Close(%s): Return(%+v)", res, err)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// reset is run to reset the state so that Watch can run a second time. Thus is
|
|
// needed for the Watch retry in particular.
|
|
func (obj *State) reset() {
|
|
obj.started = make(chan struct{})
|
|
obj.stopped = make(chan struct{})
|
|
}
|
|
|
|
// Poke sends a nil message on the outputChan. This channel is used by the
|
|
// resource to signal a possible change. This will cause the Process loop to
|
|
// run if it can.
|
|
func (obj *State) Poke() {
|
|
// add a wait group on the vertex we're poking!
|
|
obj.wg.Add(1)
|
|
defer obj.wg.Done()
|
|
|
|
select {
|
|
case obj.outputChan <- nil:
|
|
|
|
case <-obj.exit.Signal():
|
|
}
|
|
}
|
|
|
|
// Event sends a Pause or Start event to the resource. It can also be used to
|
|
// send Poke events, but it's much more efficient to send them directly instead
|
|
// of passing them through the resource.
|
|
func (obj *State) Event(kind event.Kind) {
|
|
// TODO: should these happen after the lock?
|
|
obj.wg.Add(1)
|
|
defer obj.wg.Done()
|
|
|
|
obj.eventsLock.Lock()
|
|
defer obj.eventsLock.Unlock()
|
|
|
|
if obj.eventsDone { // closing, skip events...
|
|
return
|
|
}
|
|
|
|
if kind == event.EventExit { // set this so future events don't deadlock
|
|
obj.Logf("exit event...")
|
|
obj.eventsDone = true
|
|
close(obj.eventsChan) // causes resource Watch loop to close
|
|
obj.exit.Done(nil) // trigger exit signal to unblock some cases
|
|
return
|
|
}
|
|
|
|
select {
|
|
case obj.eventsChan <- kind:
|
|
|
|
case <-obj.exit.Signal():
|
|
}
|
|
}
|
|
|
|
// read is a helper function used inside the main select statement of resources.
|
|
// If it returns an error, then this is a signal for the resource to exit.
|
|
func (obj *State) read(kind event.Kind) error {
|
|
switch kind {
|
|
case event.EventPoke:
|
|
return obj.event() // a poke needs to cause an event...
|
|
case event.EventStart:
|
|
return fmt.Errorf("unexpected start")
|
|
case event.EventPause:
|
|
// pass
|
|
case event.EventExit:
|
|
return engine.ErrSignalExit
|
|
|
|
default:
|
|
return fmt.Errorf("unhandled event: %+v", kind)
|
|
}
|
|
|
|
// we're paused now
|
|
select {
|
|
case kind, ok := <-obj.eventsChan:
|
|
if !ok {
|
|
return engine.ErrWatchExit
|
|
}
|
|
switch kind {
|
|
case event.EventPoke:
|
|
return fmt.Errorf("unexpected poke")
|
|
case event.EventPause:
|
|
return fmt.Errorf("unexpected pause")
|
|
case event.EventStart:
|
|
// resumed
|
|
return nil
|
|
case event.EventExit:
|
|
return engine.ErrSignalExit
|
|
|
|
default:
|
|
return fmt.Errorf("unhandled event: %+v", kind)
|
|
}
|
|
}
|
|
}
|
|
|
|
// event is a helper function to send an event from the resource Watch loop. It
|
|
// can be used for the initial `running` event, or any regular event. If it
|
|
// returns an error, then the Watch loop must return this error and shutdown.
|
|
func (obj *State) event() error {
|
|
// loop until we sent on obj.outputChan or exit with error
|
|
for {
|
|
select {
|
|
// send "activity" event
|
|
case obj.outputChan <- nil:
|
|
return nil // sent event!
|
|
|
|
// make sure to keep handling incoming
|
|
case kind, ok := <-obj.eventsChan:
|
|
if !ok {
|
|
return engine.ErrWatchExit
|
|
}
|
|
switch kind {
|
|
case event.EventPoke:
|
|
// we're trying to send an event, so swallow the
|
|
// poke: it's what we wanted to have happen here
|
|
continue
|
|
case event.EventStart:
|
|
return fmt.Errorf("unexpected start")
|
|
case event.EventPause:
|
|
// pass
|
|
case event.EventExit:
|
|
return engine.ErrSignalExit
|
|
|
|
default:
|
|
return fmt.Errorf("unhandled event: %+v", kind)
|
|
}
|
|
}
|
|
|
|
// we're paused now
|
|
select {
|
|
case kind, ok := <-obj.eventsChan:
|
|
if !ok {
|
|
return engine.ErrWatchExit
|
|
}
|
|
switch kind {
|
|
case event.EventPoke:
|
|
return fmt.Errorf("unexpected poke")
|
|
case event.EventPause:
|
|
return fmt.Errorf("unexpected pause")
|
|
case event.EventStart:
|
|
// resumed
|
|
case event.EventExit:
|
|
return engine.ErrSignalExit
|
|
|
|
default:
|
|
return fmt.Errorf("unhandled event: %+v", kind)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// varDir returns the path to a working directory for the resource. It will try
|
|
// and create the directory first, and return an error if this failed. The dir
|
|
// should be cleaned up by the resource on Close if it wishes to discard the
|
|
// contents. If it does not, then a future resource with the same kind and name
|
|
// may see those contents in that directory. The resource should clean up the
|
|
// contents before use if it is important that nothing exist. It is always
|
|
// possible that contents could remain after an abrupt crash, so do not store
|
|
// overly sensitive data unless you're aware of the risks.
|
|
func (obj *State) varDir(extra string) (string, error) {
|
|
// Using extra adds additional dirs onto our namespace. An empty extra
|
|
// adds no additional directories.
|
|
if obj.Prefix == "" { // safety
|
|
return "", fmt.Errorf("the VarDir prefix is empty")
|
|
}
|
|
|
|
// an empty string at the end has no effect
|
|
p := fmt.Sprintf("%s/", path.Join(obj.Prefix, extra))
|
|
if err := os.MkdirAll(p, 0770); err != nil {
|
|
return "", errwrap.Wrapf(err, "can't create prefix in: %s", p)
|
|
}
|
|
|
|
// returns with a trailing slash as per the mgmt file res convention
|
|
return p, nil
|
|
}
|
|
|
|
// poll is a replacement for Watch when the Poll metaparameter is used.
|
|
func (obj *State) poll(interval uint32) error {
|
|
// create a time.Ticker for the given interval
|
|
ticker := time.NewTicker(time.Duration(interval) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// notify engine that we're running
|
|
if err := obj.init.Running(); err != nil {
|
|
return err // exit if requested
|
|
}
|
|
|
|
var send = false // send event?
|
|
for {
|
|
select {
|
|
case <-ticker.C: // received the timer event
|
|
obj.init.Logf("polling...")
|
|
send = true
|
|
obj.init.Dirty() // dirty
|
|
|
|
case event, ok := <-obj.init.Events:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
if err := obj.init.Read(event); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// do all our event sending all together to avoid duplicate msgs
|
|
if send {
|
|
send = false
|
|
if err := obj.init.Event(); err != nil {
|
|
return err // exit if requested
|
|
}
|
|
}
|
|
}
|
|
}
|