15 Commits
0.0.7 ... 0.0.8

Author SHA1 Message Date
James Shubin
19760be0bc golint: Fix some golint issues 2016-12-21 03:10:25 -05:00
James Shubin
b3ea33f88d test: Allow devel versions to run gofmt
Let tip builds pass in travis too!
2016-12-21 02:48:50 -05:00
James Shubin
5b3425a689 pgraph: Remember to unpause the vertices!
Forgot this part earlier, sorry! Should work correctly now :)
2016-12-21 02:39:54 -05:00
James Shubin
a3d157bde6 pgraph: Mutex must be a pointer
This should be done the same way as the WaitGroup so that we don't
panic!
2016-12-20 05:49:17 -05:00
James Shubin
2c8c9264a4 pgraph: Simplify graph exit waiting
I think the vertex resource exiting can be done in a single stage
instead of the previous two stage exit.
2016-12-20 05:49:17 -05:00
James Shubin
0009d9b20e pgraph, resources: Integrate properly with the startup logic
This signals which resources have to run their initial pokes, and
removes the racy retry timer. We actually get a proper signal when
things are running too!
2016-12-20 05:49:17 -05:00
James Shubin
dd8d17232f pgraph: Build the sync group into the graph structure
This hides the sync/wait logic inside the graph itself.
2016-12-20 05:49:17 -05:00
James Shubin
6312b9225f gapi: Rename SwitchStream to Next
This is more concise and I think more logical. Complains welcome!
2016-12-20 05:49:17 -05:00
James Shubin
68cc09fef2 resources: file: Fix small typo in the compare function 2016-12-20 05:49:16 -05:00
James Shubin
0651c9de65 docs: Add resource guide
Sorry I never published this earlier. Thanks to everyone who has managed
to write a native resource without this.
2016-12-20 05:49:16 -05:00
James Shubin
38261ec809 resources: msg: Remove legacy comment
This doesn't apply anymore. Remove it!
2016-12-20 05:47:40 -05:00
James Shubin
067932aebf resources: Remove SetWatching/IsWatching code from Watch
This removes some boilerplate from the Watch methods which can be baked
into the engine instead.

This code should be checked for races and locks to make sure we only
start resources when it makes sense to.
2016-12-20 05:47:40 -05:00
James Shubin
af47511d58 resources: Don't dirty resource when poked with activity
When we receive a poke with the activity flag set it probably means we
are receiving a refresh notification. This doesn't necessarily mean that
the resource state should be dirty as a result, in particular if the
resource doesn't support refreshing. As a result, don't automatically
mark it as dirty. (The engine knows not to skip the CheckApply when the
refresh flag is set!)
2016-12-20 05:47:40 -05:00
James Shubin
36b916f27f resources: Simplify resource Converger and Startup code
This takes the Converged initialization and Startup patterns that are
common in all resources, and bakes it into the core engine. This way
resource writing is much more concise and there is less boilerplate!
2016-12-20 05:47:40 -05:00
James Shubin
e519811893 docs: Create a dedicated documentation folder 2016-12-09 17:32:50 -05:00
34 changed files with 755 additions and 347 deletions

1
.gitignore vendored
View File

@@ -2,7 +2,6 @@
.omv/ .omv/
.ssh/ .ssh/
.vagrant/ .vagrant/
mgmt-documentation.pdf
old/ old/
tmp/ tmp/
*_stringer.go *_stringer.go

View File

@@ -138,8 +138,8 @@ format: gofmt yamlfmt
docs: $(PROGRAM)-documentation.pdf docs: $(PROGRAM)-documentation.pdf
$(PROGRAM)-documentation.pdf: DOCUMENTATION.md $(PROGRAM)-documentation.pdf: docs/documentation.md
pandoc DOCUMENTATION.md -o '$(PROGRAM)-documentation.pdf' pandoc docs/documentation.md -o docs/'$(PROGRAM)-documentation.pdf'
# #
# build aliases # build aliases

View File

@@ -4,7 +4,7 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/purpleidea/mgmt)](https://goreportcard.com/report/github.com/purpleidea/mgmt) [![Go Report Card](https://goreportcard.com/badge/github.com/purpleidea/mgmt)](https://goreportcard.com/report/github.com/purpleidea/mgmt)
[![Build Status](https://secure.travis-ci.org/purpleidea/mgmt.png?branch=master)](http://travis-ci.org/purpleidea/mgmt) [![Build Status](https://secure.travis-ci.org/purpleidea/mgmt.png?branch=master)](http://travis-ci.org/purpleidea/mgmt)
[![Documentation](https://img.shields.io/docs/markdown.png)](DOCUMENTATION.md) [![Documentation](https://img.shields.io/docs/markdown.png)](docs/documentation.md)
[![GoDoc](https://godoc.org/github.com/purpleidea/mgmt?status.svg)](https://godoc.org/github.com/purpleidea/mgmt) [![GoDoc](https://godoc.org/github.com/purpleidea/mgmt?status.svg)](https://godoc.org/github.com/purpleidea/mgmt)
[![IRC](https://img.shields.io/irc/%23mgmtconfig.png)](https://webchat.freenode.net/?channels=#mgmtconfig) [![IRC](https://img.shields.io/irc/%23mgmtconfig.png)](https://webchat.freenode.net/?channels=#mgmtconfig)
[![Jenkins](https://img.shields.io/jenkins/status.png)](https://ci.centos.org/job/purpleidea-mgmt/) [![Jenkins](https://img.shields.io/jenkins/status.png)](https://ci.centos.org/job/purpleidea-mgmt/)
@@ -23,7 +23,7 @@ With your help you'll be able to influence our design and get us there sooner!
## Questions: ## Questions:
Please join the [#mgmtconfig](https://webchat.freenode.net/?channels=#mgmtconfig) IRC community! Please join the [#mgmtconfig](https://webchat.freenode.net/?channels=#mgmtconfig) IRC community!
If you have a well phrased question that might benefit others, consider asking it by sending a patch to the documentation [FAQ](https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#usage-and-frequently-asked-questions) section. I'll merge your question, and a patch with the answer! If you have a well phrased question that might benefit others, consider asking it by sending a patch to the documentation [FAQ](https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md#usage-and-frequently-asked-questions) section. I'll merge your question, and a patch with the answer!
## Quick start: ## Quick start:
* Make sure you have golang version 1.6 or greater installed. * Make sure you have golang version 1.6 or greater installed.
@@ -49,7 +49,7 @@ cd $GOPATH/src/github.com/purpleidea/mgmt
Please look in the [examples/](examples/) folder for more examples! Please look in the [examples/](examples/) folder for more examples!
## Documentation: ## Documentation:
Please see: the manually created [DOCUMENTATION.md](DOCUMENTATION.md) (also available as [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md)) and the automatically generated [GoDoc documentation](https://godoc.org/github.com/purpleidea/mgmt). Please see: the manually created [documentation.md](docs/documentation.md) (also available as [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md)) and the automatically generated [GoDoc documentation](https://godoc.org/github.com/purpleidea/mgmt).
## Roadmap: ## Roadmap:
Please see: [TODO.md](TODO.md) for a list of upcoming work and TODO items. Please see: [TODO.md](TODO.md) for a list of upcoming work and TODO items.

1
docs/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
mgmt-documentation.pdf

View File

@@ -23,7 +23,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
####Available from: ####Available from:
####[https://github.com/purpleidea/mgmt/](https://github.com/purpleidea/mgmt/) ####[https://github.com/purpleidea/mgmt/](https://github.com/purpleidea/mgmt/)
####This documentation is available in: [Markdown](https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md) or [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md) format. ####This documentation is available in: [Markdown](https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md) or [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md) format.
####Table of Contents ####Table of Contents

View File

@@ -14,7 +14,7 @@ This document goes into detail on how this works, and lists
some pitfalls and limitations. some pitfalls and limitations.
For basic instructions on how to use the Puppet support, see For basic instructions on how to use the Puppet support, see
the [main documentation](DOCUMENTATION.md#puppet-support). the [main documentation](documentation.md#puppet-support).
##Prerequisites ##Prerequisites

534
docs/resource-guide.md Normal file
View File

@@ -0,0 +1,534 @@
#mgmt
<!--
Mgmt
Copyright (C) 2013-2016+ James Shubin and the project contributors
Written by James Shubin <james@shubin.ca> and the project contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
-->
##mgmt resource guide by [James](https://ttboj.wordpress.com/)
####Available from:
####[https://github.com/purpleidea/mgmt/](https://github.com/purpleidea/mgmt/)
####This documentation is available in: [Markdown](https://github.com/purpleidea/mgmt/blob/master/docs/resource-guide.md) or [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/docs/resource-guide.md) format.
####Table of Contents
1. [Overview](#overview)
2. [Theory - Resource theory in mgmt](#theory)
3. [Resource API - Getting started with mgmt](#resource-api)
* [Init - Initialize the resource](#init)
* [CheckApply - Check and apply resource state](#checkapply)
* [Watch - Detect resource changes](#watch)
* [Compare - Compare resource with another](#compare)
4. [Further considerations - More information about resource writing](#further-considerations)
5. [Automatic edges - Adding automatic resources dependencies](#automatic-edges)
6. [Automatic grouping - Grouping multiple resources into one](#automatic-grouping)
7. [Send/Recv - Communication between resources](#send-recv)
8. [Composite resources - Importing code from one resource into another](#composite-resources)
9. [FAQ - Frequently asked questions](#frequently-asked-questions)
10. [Suggestions - API change suggestions](#suggestions)
11. [Authors - Authors and contact information](#authors)
##Overview
The `mgmt` tool has built-in resource primitives which make up the building
blocks of any configuration. Each instance of a resource is mapped to a single
vertex in the resource [graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph).
This guide is meant to instruct developers on how to write a brand new resource.
Since `mgmt` and the core resources are written in golang, some prior golang
knowledge is assumed.
##Theory
Resources in `mgmt` are similar to resources in other systems in that they are
[idempotent](https://en.wikipedia.org/wiki/Idempotence). Our resources are
uniquely different in that they can detect when their state has changed, and as
a result can run to revert or repair this change instantly. For some background
on this design, please read the
[original article](https://ttboj.wordpress.com/2016/01/18/next-generation-configuration-mgmt/)
on the subject.
##Resource API
To implement a resource in `mgmt` it must satisfy the
[`Res`](https://github.com/purpleidea/mgmt/blob/master/resources/resources.go)
interface. What follows are each of the method signatures and a description of
each.
###Init
```golang
Init() error
```
This is called to initialize the resource. If something goes wrong, it should
return an error. It should set the resource `kind`, do any resource specific
work, and finish by calling the `Init` method of the base resource.
####Example
```golang
// Init initializes the Foo resource.
func (obj *FooRes) Init() error {
obj.BaseRes.kind = "Foo" // must set capitalized resource kind
// run the resource specific initialization, and error if anything fails
if some_error {
return err // something went wrong!
}
return obj.BaseRes.Init() // call the base resource init
}
```
###CheckApply
```golang
CheckApply(apply bool) (checkOK bool, err error)
```
`CheckApply` is where the real _work_ is done. Under normal circumstances, this
function should check if the state of this resource is correct, and if so, it
should return: `(true, nil)`. If the `apply` variable is set to `true`, then
this means that we should then proceed to run the changes required to bring the
resource into the correct state. If the `apply` variable is set to `false`, then
the resource is operating in _noop_ mode and _no operations_ should be executed!
After having executed the necessary operations to bring the resource back into
the desired state, or after having detected that the state was incorrect, but
that changes can't be made because `apply` is `false`, you should then return
`(false, nil)`.
You must cause the resource to converge during a single execution of this
function. If you cannot, then you must return an error! The exception to this
rule is that if an external force changes the state of the resource while it is
being remedied, it is possible to return from this function even though the
resource isn't now converged. This is not a bug, as the resources `Watch`
facility will detect the change, ultimately resulting in a subsequent call to
`CheckApply`.
####Example
```golang
// CheckApply does the idempotent work of checking and applying resource state.
func (obj *FooRes) CheckApply(apply bool) (bool, error) {
// check the state
if state_is_okay { return true, nil } // done early! :)
// state was bad
if !apply { return false, nil } // don't apply; !stateok, nil
// do the apply!
return false, nil // after success applying
if any_error { return false, err } // anytime there's an err!
}
```
The `CheckApply` function is called by the `mgmt` engine when it believes a call
is necessary. Under certain conditions when a `Watch` call does not invalidate
the state of the resource, and no refresh call was sent, its execution might be
skipped. This is an engine optimization, and not a bug. It is mentioned here in
the documentation in case you are confused as to why a debug message you've
added to the code isn't always printed.
####Refresh notifications
Some resources may choose to support receiving refresh notifications. In general
these should be avoided if possible, but nevertheless, they do make sense in
certain situations. Resources that support these need to verify if one was sent
during the CheckApply phase of execution. This is accomplished by calling the
`Refresh() bool` method of the resource, and inspecting the return value. This
is only necessary if you plan to perform a refresh action. Refresh actions
should still respect the `apply` variable, and no system changes should be made
if it is `false`. Refresh notifications are generated by any resource when an
action is applied by that resource and are transmitted through graph edges which
have enabled their propagation. Resources that currently perform some refresh
action include `svc`, `timer`, and `password`.
####Paired execution
For many resources it is not uncommon to see `CheckApply` run twice in rapid
succession. This is usually not a pathological occurrence, but rather a healthy
pattern which is a consequence of the event system. When the state of the
resource is incorrect, `CheckApply` will run to remedy the state. In response to
having just changed the state, it is usually the case that this repair will
trigger the `Watch` code! In response, a second `CheckApply` is triggered, which
will likely find the state to now be correct.
####Summary
* Anytime an error occurs during `CheckApply`, you should return `(false, err)`.
* If the state is correct and no changes are needed, return `(true, nil)`.
* You should only make changes to the system if `apply` is set to `true`.
* After checking the state and possibly applying the fix, return `(false, nil)`.
* Returning `(true, err)` is a programming error and will cause a `Fatal`.
###Watch
```golang
Watch(chan Event) error
```
`Watch` is a main loop that runs and sends messages when it detects that the
state of the resource might have changed. To send a message you should write to
the input `Event` channel using the `DoSend` helper method. The Watch function
should run continuously until a shutdown message is received. If at any time
something goes wrong, you should return an error, and the `mgmt` engine will
handle possibly restarting the main loop based on the `retry` meta parameters.
It is better to send an event notification which turns out to be spurious, than
to miss a possible event. Resources which can miss events are incorrect and need
to be re-engineered so that this isn't the case. If you have an idea for a
resource which would fit this criteria, but you can't find a solution, please
contact the `mgmt` maintainers so that this problem can be investigated and a
possible system level engineering fix can be found.
You may have trouble deciding how much resource state checking should happen in
the `Watch` loop versus deferring it all to the `CheckApply` method. You may
want to put some simple fast path checking in `Watch` to avoid generating
obviously spurious events, but in general it's best to keep the `Watch` method
as simple as possible. Contact the `mgmt` maintainers if you're not sure.
If the resource is activated in `polling` mode, the `Watch` method will not get
executed. As a result, the resource must still work even if the main loop is not
running.
####Select
The lifetime of most resources `Watch` method should be spent in an infinite
loop that is bounded by a `select` call. The `select` call is the point where
our method hands back control to the engine (and the kernel) so that we can
sleep until something of interest wakes us up. In this loop we must process
events from the engine via the `<-obj.Events()` call, wait for the converged
timeout with `<-cuid.ConvergedTimer()`, and receive events for our resource
itself!
####Events
If we receive an internal event from the `<-obj.Events()` method, we can read it
with the ReadEvent helper function. This function tells us if we should shutdown
our resource, and if we should generate an event. When we want to send an event,
we use the `DoSend` helper function. It is also important to mark the resource
state as `dirty` if we believe it might have changed. We do this with the
`StateOK(false)` function.
####Startup
Once the `Watch` function has finished starting up successfully, it is important
to generate one event to notify the `mgmt` engine that we're now listening
successfully, so that it can run an initial `CheckApply` to ensure we're safely
tracking a healthy state and that we didn't miss anything when `Watch` was down
or from before `mgmt` was running. It does this by calling the `Running` method.
####Converged
The engine might be asked to shutdown when the entire state of the system has
not seen any changes for some duration of time. In order for the engine to be
able to make this determination, each resource must report its converged state.
To do this, the `Watch` method should get the `ConvergedUID` handle that has
been prepared for it by the engine. This is done by calling the `Converger`
method on the resource object. The result can be used to set the converged
status with `SetConverged`, and to notify when the particular timeout has been
reached by waiting on `ConvergedTimer`.
Instead of interacting with the `ConvergedUID` with these two methods, we can
instead use the `StartTimer` and `ResetTimer` methods which accomplish the same
thing, but provide a `select`-free interface for different coding situations.
####Example
```golang
// Watch is the listener and main loop for this resource.
func (obj *FooRes) Watch(processChan chan event.Event) error {
cuid := obj.Converger() // get the converger uid used to report status
// setup the Foo resource
var err error
if err, obj.foo = OpenFoo(); err != nil {
return err // we couldn't startup
}
defer obj.whatever.CloseFoo() // shutdown our
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event?
var exit = false
for {
obj.SetState(ResStateWatching) // reset
select {
case event := <-obj.Events():
cuid.SetConverged(false)
// we avoid sending events on unpause
if exit, send = obj.ReadEvent(&event); exit {
return nil // exit
}
// the actual events!
case event := <-obj.foo.Events:
if is_an_event {
send = true // used below
cuid.SetConverged(false)
obj.StateOK(false) // dirty
}
// event errors
case err := <-obj.foo.Errors:
cuuid.SetConverged(false)
return err // will cause a retry or permanent failure
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
}
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
}
```
####Summary
* Remember to call the appropriate `converger` methods throughout the resource.
* Remember to call `Startup` when the `Watch` is running successfully.
* Remember to process internal events and shutdown promptly if asked to.
* Ensure the design of your resource is well thought out.
* Have a look at the existing resources for a rough idea of how this all works.
###Compare
```golang
Compare(Res) bool
```
Each resource must have a `Compare` method. This takes as input another resource
and must return whether they are identical or not. This is used for identifying
if an existing resource can be used in place of a new one with a similar set of
parameters. In particular, when switching from one graph to a new (possibly
identical) graph, this avoids recomputing the state for resources which don't
change or that are sufficiently similar that they don't need to be swapped out.
In general if all the resource properties are identical, then they usually don't
need to be changed. On occasion, not all of them need to be compared, in
particular if they store some generated state, or if they aren't significant in
some way.
####Example
```golang
// Compare two resources and return if they are equivalent.
func (obj *FooRes) Compare(res Res) bool {
switch res.(type) {
case *FooRes: // only compare to other resources of the Foo kind!
res := res.(*FileRes)
if !obj.BaseRes.Compare(res) { // call base Compare
return false
}
if obj.Name != res.Name {
return false
}
if obj.whatever != res.whatever {
return false
}
if obj.Flag != res.Flag {
return false
}
default:
return false // different kind of resource
}
return true // they must match!
}
```
###Validate
```golang
Validate() error
```
This method is used to validate if the populated resource struct is a valid
representation of the resource kind. If it does not conform to the resource
specifications, it should generate an error. If you notice that this method is
quite large, it might be an indication that you might want to reconsider the
parameter list and interface to this resource.
###GetUIDs
```golang
GetUIDs() []ResUID
```
The `GetUIDs` method returns a list of `ResUID` interfaces that represent the
particular resource uniquely. This is used with the AutoEdges API to determine
if another resource can match a dependency to this one.
###AutoEdges
```golang
AutoEdges() AutoEdge
```
This returns a struct that implements the `AutoEdge` interface. This struct
is used to match other resources that might be relevant dependencies for this
resource.
###CollectPattern
```golang
CollectPattern() string
```
This is currently a stub and will be updated once the DSL is further along.
##Further considerations
There is some additional information that any resource writer will need to know.
Each issue is listed separately below!
###Resource struct
Each resource will implement methods as pointer receivers on a resource struct.
The resource struct must include an anonymous reference to the `BaseRes` struct.
The naming convention for resources is that they end with a `Res` suffix. If
you'd like your resource to be accessible by the `YAML` graph API (GAPI), then
you'll need to include the appropriate YAML fields as shown below.
####Example
```golang
type FooRes struct {
BaseRes `yaml:",inline"` // base properties
Whatever string `yaml:"whatever"` // you pick!
Bar int // no yaml, used as public output value for send/recv
Baz bool `yaml:"baz"` // something else
something string // some private field
}
```
###YAML
In addition to labelling your resource struct with YAML fields, you must also
add an entry to the internal `GraphConfig` struct. It is a fairly straight
forward one line patch.
```golang
type GraphConfig struct {
// [snip...]
Resources struct {
Noop []*resources.NoopRes `yaml:"noop"`
File []*resources.FileRes `yaml:"file"`
// [snip...]
Foo []*resources.FooRes `yaml:"foo"` // tada :)
}
}
```
###Gob registration
All resources must be registered with the `golang` _gob_ module so that they can
be encoded and decoded. Make sure to include the following code snippet for this
to work.
```golang
import "encoding/gob"
func init() { // special golang method that runs once
gob.Register(&FooRes{}) // substitude your resource here
}
```
##Automatic edges
Automatic edges in `mgmt` are well described in [this article](https://ttboj.wordpress.com/2016/03/14/automatic-edges-in-mgmt/).
The best example of this technique can be seen in the `svc` resource.
Unfortunately no further documentation about this subject has been written. To
expand this section, please send a patch! Please contact us if you'd like to
work on a resource that uses this feature, or to add it to an existing one!
##Automatic grouping
Automatic grouping in `mgmt` is well described in [this article](https://ttboj.wordpress.com/2016/03/30/automatic-grouping-in-mgmt/).
The best example of this technique can be seen in the `pkg` resource.
Unfortunately no further documentation about this subject has been written. To
expand this section, please send a patch! Please contact us if you'd like to
work on a resource that uses this feature, or to add it to an existing one!
##Send/Recv
In `mgmt` there is a novel concept called _Send/Recv_. For some background,
please [read the introductory article](https://ttboj.wordpress.com/2016/12/07/sendrecv-in-mgmt/).
When using this feature, the engine will automatically send the user specified
value to the intended destination without requiring any resource specific code.
Any time that one of the destination values is changed, the engine automatically
marks the resource state as `dirty`. To detect if a particular value was
received, and if it changed (during this invocation of CheckApply) from the
previous value, you can query the Recv parameter. It will contain a `map` of all
the keys which can be received on, and the value has a `Changed` property which
will indicate whether the value was updated on this particular `CheckApply`
invocation. The type of the sending key must match that of the receiving one.
This can _only_ be done inside of the `CheckApply` function!
```golang
// inside CheckApply, probably near the top
if val, exists := obj.Recv["SomeKey"]; exists {
log.Printf("SomeKey was sent to us from: %s[%s].%s", val.Res.Kind(), val.Res.GetName(), val.Key)
if val.Changed {
log.Printf("SomeKey was just updated!")
// you may want to invalidate some local cache
}
}
```
Astute readers will note that there isn't anything that prevents a user from
sending an identically typed value to some arbitrary (public) key that the
resource author hadn't considered! While this is true, resources should probably
work within this problem space anyways. The rule of thumb is that any public
parameter which is normally used in a resource can be used safely.
One subtle scenario is that if a resource creates a local cache or stores a
computation that depends on the value of a public parameter and will require
invalidation should that public parameter change, then you must detect that
scenario and invalidate the cache when it occurs. This *must* be processed
before there is a possibility of failure in CheckApply, because if we fail (and
possibly run again) the subsequent send->recv transfer might not have a new
value to copy, and therefore we won't see this notification of change.
Therefore, it is important to process these promptly, if they must not be lost,
such as for cache invalidation.
Remember, `Send/Recv` only changes your resource code if you cache state.
##Composite resources
Composite resources are resources which embed one or more existing resources.
This is useful to prevent code duplication in higher level resource scenarios.
The best example of this technique can be seen in the `nspawn` resource which
can be seen to partially embed a `svc` resource, but without its `Watch`.
Unfortunately no further documentation about this subject has been written. To
expand this section, please send a patch! Please contact us if you'd like to
work on a resource that uses this feature, or to add it to an existing one!
##Frequently asked questions
(Send your questions as a patch to this FAQ! I'll review it, merge it, and
respond by commit with the answer.)
###Can I write resources in a different language?
Currently `golang` is the only supported language for built-in resources. We
might consider allowing external resources to be imported in the future. This
will likely require a language that can expose a C-like API, such as `python` or
`ruby`. Custom `golang` resources are already possible when using mgmt as a lib.
Higher level resource collections will be possible once the `mgmt` DSL is ready.
###What new resource primitives need writing?
There are still many ideas for new resources that haven't been written yet. If
you'd like to contribute one, please contact us and tell us about your idea!
###Where can I find more information about mgmt?
Additional blog posts, videos and other material [is available!](https://github.com/purpleidea/mgmt/#on-the-web).
##Suggestions
If you have any ideas for API changes or other improvements to resource writing,
please let us know! We're still pre 1.0 and pre 0.1 and happy to break API in
order to get it right!
##Authors
Copyright (C) 2013-2016+ James Shubin and the project contributors
Please see the
[AUTHORS](https://github.com/purpleidea/mgmt/tree/master/AUTHORS) file
for more information.
* [github](https://github.com/purpleidea/)
* [&#64;purpleidea](https://twitter.com/#!/purpleidea)
* [https://ttboj.wordpress.com/](https://ttboj.wordpress.com/)

View File

@@ -316,7 +316,7 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
if emax > maxClientConnectRetries { if emax > maxClientConnectRetries {
log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir) log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir)
log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean") log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean")
obj.cError = fmt.Errorf("Can't find an available endpoint.") obj.cError = fmt.Errorf("can't find an available endpoint")
return obj.cError return obj.cError
} }
err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff... err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff...

View File

@@ -85,8 +85,8 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
return g, err return g, err
} }
// SwitchStream returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *MyGAPI) SwitchStream() chan error { func (obj *MyGAPI) Next() chan error {
if obj.data.NoWatch || obj.Interval <= 0 { if obj.data.NoWatch || obj.Interval <= 0 {
return nil return nil
} }

View File

@@ -78,8 +78,8 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
return g, nil return g, nil
} }
// SwitchStream returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *MyGAPI) SwitchStream() chan error { func (obj *MyGAPI) Next() chan error {
if obj.data.NoWatch || obj.Interval <= 0 { if obj.data.NoWatch || obj.Interval <= 0 {
return nil return nil
} }

View File

@@ -86,7 +86,7 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
Name: "file1", Name: "file1",
// send->recv! // send->recv!
Recv: map[string]*resources.Send{ Recv: map[string]*resources.Send{
"Content": &resources.Send{Res: p1, Key: "Password"}, "Content": {Res: p1, Key: "Password"},
}, },
}, },
Path: "/tmp/mgmt/secret", Path: "/tmp/mgmt/secret",
@@ -120,8 +120,8 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
return g, nil return g, nil
} }
// SwitchStream returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *MyGAPI) SwitchStream() chan error { func (obj *MyGAPI) Next() chan error {
if obj.data.NoWatch || obj.Interval <= 0 { if obj.data.NoWatch || obj.Interval <= 0 {
return nil return nil
} }

View File

@@ -45,6 +45,6 @@ type Data struct {
type GAPI interface { type GAPI interface {
Init(Data) error // initializes the GAPI and passes in useful data Init(Data) error // initializes the GAPI and passes in useful data
Graph() (*pgraph.Graph, error) // returns the most recent pgraph Graph() (*pgraph.Graph, error) // returns the most recent pgraph
SwitchStream() chan error // returns a stream of switch events Next() chan error // returns a stream of switch events
Close() error // shutdown the GAPI Close() error // shutdown the GAPI
} }

View File

@@ -23,7 +23,6 @@ import (
"log" "log"
"os" "os"
"path" "path"
"sync"
"time" "time"
"github.com/purpleidea/mgmt/converger" "github.com/purpleidea/mgmt/converger"
@@ -267,7 +266,6 @@ func (obj *Main) Run() error {
// TODO: Import admin key // TODO: Import admin key
} }
var wg sync.WaitGroup
var G, oldGraph *pgraph.Graph var G, oldGraph *pgraph.Graph
// exit after `max-runtime` seconds for no reason at all... // exit after `max-runtime` seconds for no reason at all...
@@ -346,7 +344,7 @@ func (obj *Main) Run() error {
if err := obj.GAPI.Init(data); err != nil { if err := obj.GAPI.Init(data); err != nil {
obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err)) obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err))
} else if !obj.NoWatch { } else if !obj.NoWatch {
gapiChan = obj.GAPI.SwitchStream() // stream of graph switch events! gapiChan = obj.GAPI.Next() // stream of graph switch events!
} }
} }
@@ -412,8 +410,8 @@ func (obj *Main) Run() error {
log.Printf("Config: Error creating new graph: %v", err) log.Printf("Config: Error creating new graph: %v", err)
// unpause! // unpause!
if !first { if !first {
G.Start(&wg, first) // sync G.Start(first) // sync
converger.Start() // after G.Start() converger.Start() // after G.Start()
} }
continue continue
} }
@@ -440,8 +438,8 @@ func (obj *Main) Run() error {
log.Printf("Config: Error running graph sync: %v", err) log.Printf("Config: Error running graph sync: %v", err)
// unpause! // unpause!
if !first { if !first {
G.Start(&wg, first) // sync G.Start(first) // sync
converger.Start() // after G.Start() converger.Start() // after G.Start()
} }
continue continue
} }
@@ -466,8 +464,8 @@ func (obj *Main) Run() error {
// some are not ready yet and the EtcdWatch // some are not ready yet and the EtcdWatch
// loops, we'll cause G.Pause(...) before we // loops, we'll cause G.Pause(...) before we
// even got going, thus causing nil pointer errors // even got going, thus causing nil pointer errors
G.Start(&wg, first) // sync G.Start(first) // sync
converger.Start() // after G.Start() converger.Start() // after G.Start()
first = false first = false
} }
}() }()
@@ -542,11 +540,11 @@ func (obj *Main) Run() error {
reterr = multierr.Append(reterr, err) // list of errors reterr = multierr.Append(reterr, err) // list of errors
} }
G.Exit() // tell all the children to exit
// tell inner main loop to exit // tell inner main loop to exit
close(exitchan) close(exitchan)
G.Exit() // tell all the children to exit, and waits for them to do so
// cleanup etcd main loop last so it can process everything first // cleanup etcd main loop last so it can process everything first
if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd
err = errwrap.Wrapf(err, "Etcd exited poorly!") err = errwrap.Wrapf(err, "Etcd exited poorly!")
@@ -557,8 +555,6 @@ func (obj *Main) Run() error {
log.Printf("Main: Graph: %v", G) log.Printf("Main: Graph: %v", G)
} }
wg.Wait() // wait for primary go routines to exit
// TODO: wait for each vertex to exit... // TODO: wait for each vertex to exit...
log.Println("Goodbye!") log.Println("Goodbye!")
return reterr return reterr

View File

@@ -289,6 +289,9 @@ func (g *Graph) Worker(v *Vertex) error {
// the Watch() function about which graph it is // the Watch() function about which graph it is
// running on, which isolates things nicely... // running on, which isolates things nicely...
obj := v.Res obj := v.Res
// TODO: is there a better system for the `Watching` flag?
obj.SetWatching(true)
defer obj.SetWatching(false)
processChan := make(chan event.Event) processChan := make(chan event.Event)
go func() { go func() {
running := false running := false
@@ -415,7 +418,9 @@ func (g *Graph) Worker(v *Vertex) error {
} }
// TODO: reset the watch retry count after some amount of success // TODO: reset the watch retry count after some amount of success
v.Res.RegisterConverger()
e := v.Res.Watch(processChan) e := v.Res.Watch(processChan)
v.Res.UnregisterConverger()
if e == nil { // exit signal if e == nil { // exit signal
err = nil // clean exit err = nil // clean exit
break break
@@ -445,31 +450,14 @@ func (g *Graph) Worker(v *Vertex) error {
// Start is a main kick to start the graph. It goes through in reverse topological // Start is a main kick to start the graph. It goes through in reverse topological
// sort order so that events can't hit un-started vertices. // sort order so that events can't hit un-started vertices.
func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue func (g *Graph) Start(first bool) { // start or continue
log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState()) log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState()) defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
var wg sync.WaitGroup
t, _ := g.TopologicalSort() t, _ := g.TopologicalSort()
// TODO: only calculate indegree if `first` is true to save resources // TODO: only calculate indegree if `first` is true to save resources
indegree := g.InDegree() // compute all of the indegree's indegree := g.InDegree() // compute all of the indegree's
for _, v := range Reverse(t) { for _, v := range Reverse(t) {
if !v.Res.IsWatching() { // if Watch() is not running...
wg.Add(1)
// must pass in value to avoid races...
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
go func(vv *Vertex) {
defer wg.Done()
// TODO: if a sufficient number of workers error,
// should something be done? Will these restart
// after perma-failure if we have a graph change?
if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)
return
}
log.Printf("%s[%s]: Exited", vv.Kind(), vv.GetName())
}(v)
}
// selective poke: here we reduce the number of initial pokes // selective poke: here we reduce the number of initial pokes
// to the minimum required to activate every vertex in the // to the minimum required to activate every vertex in the
// graph, either by direct action, or by getting poked by a // graph, either by direct action, or by getting poked by a
@@ -483,18 +471,39 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
// function) we need to poke to *unpause* every graph vertex, // function) we need to poke to *unpause* every graph vertex,
// and not just selectively the subset with no indegree. // and not just selectively the subset with no indegree.
if (!first) || indegree[v] == 0 { if (!first) || indegree[v] == 0 {
// ensure state is started before continuing on to next vertex v.Res.Starter(true) // let the startup code know to poke
for !v.SendEvent(event.EventStart, true, false) { }
if g.Flags.Debug {
// if SendEvent fails, we aren't up yet if !v.Res.IsWatching() { // if Watch() is not running...
log.Printf("%s[%s]: Retrying SendEvent(Start)", v.Kind(), v.GetName()) g.wg.Add(1)
// sleep here briefly or otherwise cause // must pass in value to avoid races...
// a different goroutine to be scheduled // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
time.Sleep(1 * time.Millisecond) go func(vv *Vertex) {
defer g.wg.Done()
// TODO: if a sufficient number of workers error,
// should something be done? Will these restart
// after perma-failure if we have a graph change?
if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)
return
} }
} log.Printf("%s[%s]: Exited", vv.Kind(), vv.GetName())
}(v)
}
// let the vertices run their startup code in parallel
wg.Add(1)
go func(vv *Vertex) {
defer wg.Done()
vv.Res.Started() // block until started
}(v)
if !first { // unpause!
v.Res.SendEvent(event.EventStart, true, false) // sync!
} }
} }
wg.Wait() // wait for everyone
} }
// Pause sends pause events to the graph in a topological sort order. // Pause sends pause events to the graph in a topological sort order.
@@ -522,4 +531,5 @@ func (g *Graph) Exit() {
v.SendEvent(event.EventExit, true, false) v.SendEvent(event.EventExit, true, false)
} }
g.wg.Wait() // for now, this doesn't need to be a separate Wait() method
} }

View File

@@ -40,6 +40,7 @@ const (
graphStatePaused graphStatePaused
) )
// Flags contains specific constants used by the graph.
type Flags struct { type Flags struct {
Debug bool Debug bool
} }
@@ -55,7 +56,8 @@ type Graph struct {
Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge) Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge)
Flags Flags Flags Flags
state graphState state graphState
mutex sync.Mutex // used when modifying graph State variable mutex *sync.Mutex // used when modifying graph State variable
wg *sync.WaitGroup
} }
// Vertex is the primary vertex struct in this library. // Vertex is the primary vertex struct in this library.
@@ -78,6 +80,9 @@ func NewGraph(name string) *Graph {
Name: name, Name: name,
Adjacency: make(map[*Vertex]map[*Vertex]*Edge), Adjacency: make(map[*Vertex]map[*Vertex]*Edge),
state: graphStateNil, state: graphStateNil,
// ptr b/c: Mutex/WaitGroup must not be copied after first use
mutex: &sync.Mutex{},
wg: &sync.WaitGroup{},
} }
} }
@@ -112,6 +117,8 @@ func (g *Graph) Copy() *Graph {
Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)), Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)),
Flags: g.Flags, Flags: g.Flags,
state: g.state, state: g.state,
mutex: g.mutex,
wg: g.wg,
} }
for k, v := range g.Adjacency { for k, v := range g.Adjacency {
newGraph.Adjacency[k] = v // copy newGraph.Adjacency[k] = v // copy

View File

@@ -74,8 +74,8 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) {
return g, err return g, err
} }
// SwitchStream returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *GAPI) SwitchStream() chan error { func (obj *GAPI) Next() chan error {
if obj.data.NoWatch { if obj.data.NoWatch {
return nil return nil
} }

View File

@@ -32,7 +32,8 @@ import (
const ( const (
// PuppetYAMLBufferSize is the maximum buffer size for the yaml input data // PuppetYAMLBufferSize is the maximum buffer size for the yaml input data
PuppetYAMLBufferSize = 65535 PuppetYAMLBufferSize = 65535
Debug = false // FIXME: integrate with global debug flag // Debug is a local debug constant used in this module
Debug = false // FIXME: integrate with global debug flag
) )
func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) { func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {

View File

@@ -156,11 +156,11 @@ func (obj *SSH) Sftp() error {
var err error var err error
if obj.client == nil { if obj.client == nil {
return fmt.Errorf("Not dialed!") return fmt.Errorf("not dialed")
} }
// this check is needed because the golang path.Base function is weird! // this check is needed because the golang path.Base function is weird!
if strings.HasSuffix(obj.file, "/") { if strings.HasSuffix(obj.file, "/") {
return fmt.Errorf("File must not be a directory.") return fmt.Errorf("file must not be a directory")
} }
// we run local operations first so that remote clean up is easier... // we run local operations first so that remote clean up is easier...
@@ -254,7 +254,7 @@ func (obj *SSH) Sftp() error {
// make file executable; don't cache this in case it didn't ever happen // make file executable; don't cache this in case it didn't ever happen
// TODO: do we want the group or other bits set? // TODO: do we want the group or other bits set?
if err := obj.sftp.Chmod(obj.execpath, 0770); err != nil { if err := obj.sftp.Chmod(obj.execpath, 0770); err != nil {
return fmt.Errorf("Can't set file mode bits!") return fmt.Errorf("can't set file mode bits")
} }
// copy graph file // copy graph file
@@ -273,7 +273,7 @@ func (obj *SSH) Sftp() error {
// SftpGraphCopy is a helper function used for re-copying the graph definition. // SftpGraphCopy is a helper function used for re-copying the graph definition.
func (obj *SSH) SftpGraphCopy() (int64, error) { func (obj *SSH) SftpGraphCopy() (int64, error) {
if obj.filepath == "" { if obj.filepath == "" {
return -1, fmt.Errorf("Sftp session isn't ready yet!") return -1, fmt.Errorf("sftp session isn't ready yet")
} }
return obj.SftpCopy(obj.file, obj.filepath) return obj.SftpCopy(obj.file, obj.filepath)
} }
@@ -281,7 +281,7 @@ func (obj *SSH) SftpGraphCopy() (int64, error) {
// SftpCopy is a simple helper function that runs a local -> remote sftp copy. // SftpCopy is a simple helper function that runs a local -> remote sftp copy.
func (obj *SSH) SftpCopy(src, dst string) (int64, error) { func (obj *SSH) SftpCopy(src, dst string) (int64, error) {
if obj.sftp == nil { if obj.sftp == nil {
return -1, fmt.Errorf("Sftp session is not active!") return -1, fmt.Errorf("sftp session is not active")
} }
var err error var err error
// TODO: add a check to make sure we don't run two copies of this // TODO: add a check to make sure we don't run two copies of this
@@ -313,7 +313,7 @@ func (obj *SSH) SftpCopy(src, dst string) (int64, error) {
return n, fmt.Errorf("Can't copy to remote path: %v", err) return n, fmt.Errorf("Can't copy to remote path: %v", err)
} }
if n <= 0 { if n <= 0 {
return n, fmt.Errorf("Zero bytes copied!") return n, fmt.Errorf("zero bytes copied")
} }
return n, nil return n, nil
} }
@@ -391,10 +391,10 @@ func (obj *SSH) Tunnel() error {
var err error var err error
if len(obj.clientURLs) < 1 { if len(obj.clientURLs) < 1 {
return fmt.Errorf("Need at least one client URL to tunnel!") return fmt.Errorf("need at least one client URL to tunnel")
} }
if len(obj.remoteURLs) < 1 { if len(obj.remoteURLs) < 1 {
return fmt.Errorf("Need at least one remote URL to tunnel!") return fmt.Errorf("need at least one remote URL to tunnel")
} }
// TODO: do something less arbitrary about which one we pick? // TODO: do something less arbitrary about which one we pick?
@@ -477,10 +477,10 @@ func (obj *SSH) TunnelClose() error {
// Exec runs the binary on the remote server. // Exec runs the binary on the remote server.
func (obj *SSH) Exec() error { func (obj *SSH) Exec() error {
if obj.execpath == "" { if obj.execpath == "" {
return fmt.Errorf("Must have a binary path to execute!") return fmt.Errorf("must have a binary path to execute")
} }
if obj.filepath == "" { if obj.filepath == "" {
return fmt.Errorf("Must have a graph definition to run!") return fmt.Errorf("must have a graph definition to run")
} }
var err error var err error
@@ -772,7 +772,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
} }
host = x[0] host = x[0]
if host == "" { if host == "" {
return nil, fmt.Errorf("Empty hostname!") return nil, fmt.Errorf("empty hostname")
} }
user := defaultUser // default user := defaultUser // default
@@ -795,7 +795,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
} }
if len(auth) == 0 { if len(auth) == 0 {
return nil, fmt.Errorf("No authentication methods available!") return nil, fmt.Errorf("no authentication methods available")
} }
//hostname := config.Hostname // TODO: optionally specify local hostname somehow //hostname := config.Hostname // TODO: optionally specify local hostname somehow
@@ -804,7 +804,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
hostname = host // default to above hostname = host // default to above
} }
if util.StrInList(hostname, obj.hostnames) { if util.StrInList(hostname, obj.hostnames) {
return nil, fmt.Errorf("Remote: Hostname `%s` already exists!", hostname) return nil, fmt.Errorf("Remote: Hostname `%s` already exists", hostname)
} }
obj.hostnames = append(obj.hostnames, hostname) obj.hostnames = append(obj.hostnames, hostname)
@@ -830,7 +830,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
// sshKeyAuth is a helper function to get the ssh key auth struct needed // sshKeyAuth is a helper function to get the ssh key auth struct needed
func (obj *Remotes) sshKeyAuth() (ssh.AuthMethod, error) { func (obj *Remotes) sshKeyAuth() (ssh.AuthMethod, error) {
if obj.sshPrivIdRsa == "" { if obj.sshPrivIdRsa == "" {
return nil, fmt.Errorf("Empty path specified!") return nil, fmt.Errorf("empty path specified")
} }
p := "" p := ""
// TODO: this doesn't match strings of the form: ~james/.ssh/id_rsa // TODO: this doesn't match strings of the form: ~james/.ssh/id_rsa
@@ -843,7 +843,7 @@ func (obj *Remotes) sshKeyAuth() (ssh.AuthMethod, error) {
p = path.Join(usr.HomeDir, obj.sshPrivIdRsa[len("~/"):]) p = path.Join(usr.HomeDir, obj.sshPrivIdRsa[len("~/"):])
} }
if p == "" { if p == "" {
return nil, fmt.Errorf("Empty path specified!") return nil, fmt.Errorf("empty path specified")
} }
// A public key may be used to authenticate against the server by using // A public key may be used to authenticate against the server by using
// an unencrypted PEM-encoded private key file. If you have an encrypted // an unencrypted PEM-encoded private key file. If you have an encrypted
@@ -892,7 +892,7 @@ func (obj *Remotes) passwordCallback(user, host string) func() (string, error) {
case e := <-failchan: case e := <-failchan:
return "", e return "", e
case <-util.TimeAfterOrBlock(timeout): case <-util.TimeAfterOrBlock(timeout):
return "", fmt.Errorf("Interactive timeout reached!") return "", fmt.Errorf("interactive timeout reached")
} }
} }
return cb return cb

View File

@@ -25,7 +25,6 @@ import (
"log" "log"
"os/exec" "os/exec"
"strings" "strings"
"time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
@@ -111,22 +110,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *ExecRes) Watch(processChan chan event.Event) error { func (obj *ExecRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
var send = false // send event? var send = false // send event?
var exit = false var exit = false
@@ -169,6 +153,11 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
bufioch, errch = obj.BufioChanScanner(scanner) bufioch, errch = obj.BufioChanScanner(scanner)
} }
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
for { for {
obj.SetState(ResStateWatching) // reset obj.SetState(ResStateWatching) // reset
select { select {
@@ -199,15 +188,10 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
startup = true // startup finished
send = false send = false
// it is okay to invalidate the clean state on poke too // it is okay to invalidate the clean state on poke too
obj.StateOK(false) // something made state dirty obj.StateOK(false) // something made state dirty

View File

@@ -30,7 +30,6 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/recwatch"
@@ -142,22 +141,7 @@ func (obj *FileRes) Validate() error {
// must be restarted. On a clean exit it returns nil. // must be restarted. On a clean exit it returns nil.
// FIXME: Also watch the source directory when using obj.Source !!! // FIXME: Also watch the source directory when using obj.Source !!!
func (obj *FileRes) Watch(processChan chan event.Event) error { func (obj *FileRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil // TODO: should this be an error?
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
var err error var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse) obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse)
@@ -166,6 +150,11 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
} }
defer obj.recWatcher.Close() defer obj.recWatcher.Close()
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event? var send = false // send event?
var exit = false var exit = false
@@ -200,16 +189,10 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
startup = true // startup finished
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK... return err // we exit or bubble up a NACK...
@@ -787,7 +770,7 @@ func (obj *FileRes) Compare(res Res) bool {
if obj.Name != res.Name { if obj.Name != res.Name {
return false return false
} }
if obj.path != res.Path { if obj.path != res.path {
return false return false
} }
if (obj.Content == nil) != (res.Content == nil) { // xor if (obj.Content == nil) != (res.Content == nil) { // xor

View File

@@ -22,7 +22,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
@@ -109,22 +108,7 @@ func (obj *HostnameRes) Validate() error {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *HostnameRes) Watch(processChan chan event.Event) error { func (obj *HostnameRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil // TODO: should this be an error?
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
// if we share the bus with others, we will get each others messages!! // if we share the bus with others, we will get each others messages!!
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
@@ -142,6 +126,11 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
signals := make(chan *dbus.Signal, 10) // closed by dbus package signals := make(chan *dbus.Signal, 10) // closed by dbus package
bus.Signal(signals) bus.Signal(signals)
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event? var send = false // send event?
for { for {
@@ -164,15 +153,10 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
startup = true // startup finished
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { if exit, err := obj.DoSend(processChan, ""); exit || err != nil {

View File

@@ -23,7 +23,6 @@ import (
"log" "log"
"regexp" "regexp"
"strings" "strings"
"time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
@@ -136,21 +135,11 @@ func (obj *MsgRes) journalPriority() journal.Priority {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *MsgRes) Watch(processChan chan event.Event) error { func (obj *MsgRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool // notify engine that we're running
Startup := func(block bool) <-chan time.Time { if err := obj.Running(processChan); err != nil {
if block { return err // bubble up a NACK...
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
} }
var send = false // send event? var send = false // send event?
@@ -168,18 +157,11 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
startup = true // startup finished
send = false send = false
// only do this on certain types of events
//obj.isStateOK = false // something made state dirty
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK... return err // we exit or bubble up a NACK...
} }

View File

@@ -20,7 +20,6 @@ package resources
import ( import (
"encoding/gob" "encoding/gob"
"log" "log"
"time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
) )
@@ -60,21 +59,11 @@ func (obj *NoopRes) Validate() error {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *NoopRes) Watch(processChan chan event.Event) error { func (obj *NoopRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil // TODO: should this be an error?
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool // notify engine that we're running
Startup := func(block bool) <-chan time.Time { if err := obj.Running(processChan); err != nil {
if block { return err // bubble up a NACK...
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
} }
var send = false // send event? var send = false // send event?
@@ -92,15 +81,10 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
startup = true // startup finished
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK... return err // we exit or bubble up a NACK...

View File

@@ -22,7 +22,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
@@ -96,22 +95,7 @@ func (obj *NspawnRes) Validate() error {
// Watch for state changes and sends a message to the bus if there is a change // Watch for state changes and sends a message to the bus if there is a change
func (obj *NspawnRes) Watch(processChan chan event.Event) error { func (obj *NspawnRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
}
// 1/2 the resolution of converged timeout
return time.After(time.Duration(500) * time.Millisecond)
}
// this resource depends on systemd ensure that it's running // this resource depends on systemd ensure that it's running
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
@@ -135,6 +119,11 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
buschan := make(chan *dbus.Signal, 10) buschan := make(chan *dbus.Signal, 10)
bus.Signal(buschan) bus.Signal(buschan)
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false var send = false
var exit = false var exit = false
@@ -165,16 +154,10 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
startup = true // startup finished
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK... return err // we exit or bubble up a NACK...

View File

@@ -27,7 +27,6 @@ import (
"os" "os"
"path" "path"
"strings" "strings"
"time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/recwatch"
@@ -169,22 +168,7 @@ Loop:
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *PasswordRes) Watch(processChan chan event.Event) error { func (obj *PasswordRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil // TODO: should this be an error?
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
var err error var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false) obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false)
@@ -193,6 +177,11 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
} }
defer obj.recWatcher.Close() defer obj.recWatcher.Close()
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event? var send = false // send event?
var exit = false var exit = false
for { for {
@@ -220,15 +209,10 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
startup = true // startup finished
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK... return err // we exit or bubble up a NACK...

View File

@@ -23,7 +23,6 @@ import (
"log" "log"
"path" "path"
"strings" "strings"
"time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/resources/packagekit" "github.com/purpleidea/mgmt/resources/packagekit"
@@ -110,22 +109,7 @@ func (obj *PkgRes) Validate() error {
// TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/109
// TODO: https://github.com/hughsie/PackageKit/issues/110 // TODO: https://github.com/hughsie/PackageKit/issues/110
func (obj *PkgRes) Watch(processChan chan event.Event) error { func (obj *PkgRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
bus := packagekit.NewBus() bus := packagekit.NewBus()
if bus == nil { if bus == nil {
@@ -138,6 +122,11 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
return errwrap.Wrapf(err, "Error adding signal match") return errwrap.Wrapf(err, "Error adding signal match")
} }
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event? var send = false // send event?
var exit = false var exit = false
@@ -175,16 +164,10 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
startup = true // startup finished
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK... return err // we exit or bubble up a NACK...

View File

@@ -130,6 +130,9 @@ type Base interface {
AssociateData(*Data) AssociateData(*Data)
IsWatching() bool IsWatching() bool
SetWatching(bool) SetWatching(bool)
RegisterConverger()
UnregisterConverger()
Converger() converger.ConvergerUID
GetState() ResState GetState() ResState
SetState(ResState) SetState(ResState)
DoSend(chan event.Event, string) (bool, error) DoSend(chan event.Event, string) (bool, error)
@@ -147,6 +150,9 @@ type Base interface {
GetGroup() []Res // return everyone grouped inside me GetGroup() []Res // return everyone grouped inside me
SetGroup([]Res) SetGroup([]Res)
VarDir(string) (string, error) VarDir(string) (string, error)
Running(chan event.Event) error // notify the engine that Watch started
Started() <-chan struct{} // returns when the resource has started
Starter(bool)
} }
// Res is the minimum interface you need to implement to define a new resource. // Res is the minimum interface you need to implement to define a new resource.
@@ -171,14 +177,17 @@ type BaseRes struct {
kind string kind string
events chan event.Event events chan event.Event
converger converger.Converger // converged tracking converger converger.Converger // converged tracking
prefix string // base prefix for this resource cuid converger.ConvergerUID
prefix string // base prefix for this resource
debug bool debug bool
state ResState state ResState
watching bool // is Watch() loop running ? watching bool // is Watch() loop running ?
isStateOK bool // whether the state is okay based on events or not started chan struct{} // closed when worker is started/running
isGrouped bool // am i contained within a group? starter bool // does this have indegree == 0 ? XXX: usually?
grouped []Res // list of any grouped resources isStateOK bool // whether the state is okay based on events or not
refresh bool // does this resource have a refresh to run? isGrouped bool // am i contained within a group?
grouped []Res // list of any grouped resources
refresh bool // does this resource have a refresh to run?
//refreshState StatefulBool // TODO: future stateful bool //refreshState StatefulBool // TODO: future stateful bool
} }
@@ -229,6 +238,7 @@ func (obj *BaseRes) Init() error {
return fmt.Errorf("Resource did not set kind!") return fmt.Errorf("Resource did not set kind!")
} }
obj.events = make(chan event.Event) // unbuffered chan to avoid stale events obj.events = make(chan event.Event) // unbuffered chan to avoid stale events
obj.started = make(chan struct{}) // closes when started
//dir, err := obj.VarDir("") //dir, err := obj.VarDir("")
//if err != nil { //if err != nil {
// return errwrap.Wrapf(err, "VarDir failed in Init()") // return errwrap.Wrapf(err, "VarDir failed in Init()")
@@ -275,16 +285,34 @@ func (obj *BaseRes) AssociateData(data *Data) {
obj.debug = data.Debug obj.debug = data.Debug
} }
// IsWatching tells us if the Watch() function is running. // IsWatching tells us if the Worker() function is running.
func (obj *BaseRes) IsWatching() bool { func (obj *BaseRes) IsWatching() bool {
return obj.watching return obj.watching
} }
// SetWatching stores the status of if the Watch() function is running. // SetWatching stores the status of if the Worker() function is running.
func (obj *BaseRes) SetWatching(b bool) { func (obj *BaseRes) SetWatching(b bool) {
obj.watching = b obj.watching = b
} }
// RegisterConverger sets up the cuid for the resource. This is a helper
// function for the engine, and shouldn't be called by the resources directly.
func (obj *BaseRes) RegisterConverger() {
obj.cuid = obj.converger.Register()
}
// UnregisterConverger tears down the cuid for the resource. This is a helper
// function for the engine, and shouldn't be called by the resources directly.
func (obj *BaseRes) UnregisterConverger() {
obj.cuid.Unregister()
}
// Converger returns the ConvergerUID for the resource. This should be called
// by the Watch method of the resource to set the converged state.
func (obj *BaseRes) Converger() converger.ConvergerUID {
return obj.cuid
}
// GetState returns the state of the resource. // GetState returns the state of the resource.
func (obj *BaseRes) GetState() ResState { func (obj *BaseRes) GetState() ResState {
return obj.state return obj.state
@@ -403,6 +431,13 @@ func (obj *BaseRes) VarDir(extra string) (string, error) {
return p, nil return p, nil
} }
// Started returns a channel that closes when the resource has started up.
func (obj *BaseRes) Started() <-chan struct{} { return obj.started }
// Starter sets the starter bool. This defines if a vertex has an indegree of 0.
// If we have an indegree of 0, we'll need to be a poke initiator in the graph.
func (obj *BaseRes) Starter(b bool) { obj.starter = b }
// ResToB64 encodes a resource to a base64 encoded string (after serialization) // ResToB64 encodes a resource to a base64 encoded string (after serialization)
func ResToB64(res Res) (string, error) { func ResToB64(res Res) (string, error) {
b := bytes.Buffer{} b := bytes.Buffer{}

View File

@@ -60,8 +60,8 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
var poke bool var poke bool
// ensure that a CheckApply runs by sending with a dirty state... // ensure that a CheckApply runs by sending with a dirty state...
if ev.GetActivity() { // if previous node did work, and we were notified... if ev.GetActivity() { // if previous node did work, and we were notified...
obj.StateOK(false) // dirty //obj.StateOK(false) // not necessarily
poke = true // poke! poke = true // poke!
// XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...) // XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...)
// XXX: unless this is used in our "fallback" polling implementation??? // XXX: unless this is used in our "fallback" polling implementation???
//obj.SetRefresh(true) // TODO: is this redundant? //obj.SetRefresh(true) // TODO: is this redundant?
@@ -108,6 +108,23 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
return true, false // required to keep the stupid go compiler happy return true, false // required to keep the stupid go compiler happy
} }
// Running is called by the Watch method of the resource once it has started up.
// This signals to the engine to kick off the initial CheckApply resource check.
func (obj *BaseRes) Running(processChan chan event.Event) error {
obj.StateOK(false) // assume we're initially dirty
cuid := obj.Converger() // get the converger uid used to report status
cuid.SetConverged(false) // a reasonable initial assumption
close(obj.started) // send started signal
// FIXME: exit return value is unused atm, so ignore it for now...
//if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
var err error
if obj.starter { // vertices of indegree == 0 should send initial pokes
_, err = obj.DoSend(processChan, "") // trigger a CheckApply
}
return err // bubble up any possible error (or nil)
}
// Send points to a value that a resource will send. // Send points to a value that a resource will send.
type Send struct { type Send struct {
Res Res // a handle to the resource which is sending a value Res Res // a handle to the resource which is sending a value

View File

@@ -23,7 +23,6 @@ import (
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"log" "log"
"time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
@@ -76,22 +75,7 @@ func (obj *SvcRes) Validate() error {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *SvcRes) Watch(processChan chan event.Event) error { func (obj *SvcRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
// obj.Name: svc name // obj.Name: svc name
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
@@ -116,6 +100,11 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
buschan := make(chan *dbus.Signal, 10) buschan := make(chan *dbus.Signal, 10)
bus.Signal(buschan) bus.Signal(buschan)
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name
var send = false // send event? var send = false // send event?
var exit = false var exit = false
@@ -175,11 +164,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
} }
} else { } else {
if !activeSet { if !activeSet {
@@ -227,16 +211,10 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
} }
} }
if send { if send {
startup = true // startup finished
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK... return err // we exit or bubble up a NACK...

View File

@@ -74,27 +74,17 @@ func (obj *TimerRes) newTicker() *time.Ticker {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *TimerRes) Watch(processChan chan event.Event) error { func (obj *TimerRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
// create a time.Ticker for the given interval // create a time.Ticker for the given interval
obj.ticker = obj.newTicker() obj.ticker = obj.newTicker()
defer obj.ticker.Stop() defer obj.ticker.Stop()
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false var send = false
for { for {
@@ -113,13 +103,9 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) cuid.SetConverged(true)
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
} }
if send { if send {
startup = true // startup finished
send = false send = false
if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil { if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil {
return err // we exit or bubble up a NACK... return err // we exit or bubble up a NACK...

View File

@@ -132,22 +132,7 @@ func (obj *VirtRes) connect() (conn libvirt.VirConnection, err error) {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *VirtRes) Watch(processChan chan event.Event) error { func (obj *VirtRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { cuid := obj.Converger() // get the converger uid used to report status
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
conn, err := obj.connect() conn, err := obj.connect()
if err != nil { if err != nil {
@@ -203,6 +188,11 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
) )
defer conn.DomainEventDeregister(callbackID) defer conn.DomainEventDeregister(callbackID)
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false var send = false
var exit = false var exit = false
@@ -260,15 +250,9 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
} }
if send { if send {
startup = true // startup finished
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK... return err // we exit or bubble up a NACK...

View File

@@ -9,7 +9,7 @@ ROOT=$(dirname "${BASH_SOURCE}")/..
GO_VERSION=($(go version)) GO_VERSION=($(go version))
if [[ -z $(echo "${GO_VERSION[2]}" | grep -E 'go1.2|go1.3|go1.4|go1.5|go1.6|go1.7|go1.8') ]]; then if [[ -z $(echo "${GO_VERSION[2]}" | grep -E 'go1.2|go1.3|go1.4|go1.5|go1.6|go1.7|go1.8|devel') ]]; then
echo "Unknown go version '${GO_VERSION[2]}', failing gofmt." echo "Unknown go version '${GO_VERSION[2]}', failing gofmt."
exit 1 exit 1
fi fi

View File

@@ -74,8 +74,8 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) {
return g, err return g, err
} }
// SwitchStream returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *GAPI) SwitchStream() chan error { func (obj *GAPI) Next() chan error {
if obj.data.NoWatch { if obj.data.NoWatch {
return nil return nil
} }

View File

@@ -34,10 +34,6 @@ import (
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
const (
Debug = false // FIXME: integrate with global debug flag
)
type collectorResConfig struct { type collectorResConfig struct {
Kind string `yaml:"kind"` Kind string `yaml:"kind"`
Pattern string `yaml:"pattern"` // XXX: Not Implemented Pattern string `yaml:"pattern"` // XXX: Not Implemented
@@ -120,9 +116,6 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world gapi.World, noop
slice := reflect.ValueOf(iface) slice := reflect.ValueOf(iface)
// XXX: should we just drop these everywhere and have the kind strings be all lowercase? // XXX: should we just drop these everywhere and have the kind strings be all lowercase?
kind := util.FirstToUpper(name) kind := util.FirstToUpper(name)
if Debug {
log.Printf("Config: Processing: %v...", kind)
}
for j := 0; j < slice.Len(); j++ { // loop through resources of same kind for j := 0; j < slice.Len(); j++ { // loop through resources of same kind
x := slice.Index(j).Interface() x := slice.Index(j).Interface()
res, ok := x.(resources.Res) // convert to Res type res, ok := x.(resources.Res) // convert to Res type