Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
19760be0bc | ||
|
|
b3ea33f88d | ||
|
|
5b3425a689 | ||
|
|
a3d157bde6 | ||
|
|
2c8c9264a4 | ||
|
|
0009d9b20e | ||
|
|
dd8d17232f | ||
|
|
6312b9225f | ||
|
|
68cc09fef2 | ||
|
|
0651c9de65 | ||
|
|
38261ec809 | ||
|
|
067932aebf | ||
|
|
af47511d58 | ||
|
|
36b916f27f | ||
|
|
e519811893 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,7 +2,6 @@
|
||||
.omv/
|
||||
.ssh/
|
||||
.vagrant/
|
||||
mgmt-documentation.pdf
|
||||
old/
|
||||
tmp/
|
||||
*_stringer.go
|
||||
|
||||
4
Makefile
4
Makefile
@@ -138,8 +138,8 @@ format: gofmt yamlfmt
|
||||
|
||||
docs: $(PROGRAM)-documentation.pdf
|
||||
|
||||
$(PROGRAM)-documentation.pdf: DOCUMENTATION.md
|
||||
pandoc DOCUMENTATION.md -o '$(PROGRAM)-documentation.pdf'
|
||||
$(PROGRAM)-documentation.pdf: docs/documentation.md
|
||||
pandoc docs/documentation.md -o docs/'$(PROGRAM)-documentation.pdf'
|
||||
|
||||
#
|
||||
# build aliases
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
[](https://goreportcard.com/report/github.com/purpleidea/mgmt)
|
||||
[](http://travis-ci.org/purpleidea/mgmt)
|
||||
[](DOCUMENTATION.md)
|
||||
[](docs/documentation.md)
|
||||
[](https://godoc.org/github.com/purpleidea/mgmt)
|
||||
[](https://webchat.freenode.net/?channels=#mgmtconfig)
|
||||
[](https://ci.centos.org/job/purpleidea-mgmt/)
|
||||
@@ -23,7 +23,7 @@ With your help you'll be able to influence our design and get us there sooner!
|
||||
|
||||
## Questions:
|
||||
Please join the [#mgmtconfig](https://webchat.freenode.net/?channels=#mgmtconfig) IRC community!
|
||||
If you have a well phrased question that might benefit others, consider asking it by sending a patch to the documentation [FAQ](https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#usage-and-frequently-asked-questions) section. I'll merge your question, and a patch with the answer!
|
||||
If you have a well phrased question that might benefit others, consider asking it by sending a patch to the documentation [FAQ](https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md#usage-and-frequently-asked-questions) section. I'll merge your question, and a patch with the answer!
|
||||
|
||||
## Quick start:
|
||||
* Make sure you have golang version 1.6 or greater installed.
|
||||
@@ -49,7 +49,7 @@ cd $GOPATH/src/github.com/purpleidea/mgmt
|
||||
Please look in the [examples/](examples/) folder for more examples!
|
||||
|
||||
## Documentation:
|
||||
Please see: the manually created [DOCUMENTATION.md](DOCUMENTATION.md) (also available as [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md)) and the automatically generated [GoDoc documentation](https://godoc.org/github.com/purpleidea/mgmt).
|
||||
Please see: the manually created [documentation.md](docs/documentation.md) (also available as [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md)) and the automatically generated [GoDoc documentation](https://godoc.org/github.com/purpleidea/mgmt).
|
||||
|
||||
## Roadmap:
|
||||
Please see: [TODO.md](TODO.md) for a list of upcoming work and TODO items.
|
||||
|
||||
1
docs/.gitignore
vendored
Normal file
1
docs/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
mgmt-documentation.pdf
|
||||
@@ -23,7 +23,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
####Available from:
|
||||
####[https://github.com/purpleidea/mgmt/](https://github.com/purpleidea/mgmt/)
|
||||
|
||||
####This documentation is available in: [Markdown](https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md) or [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md) format.
|
||||
####This documentation is available in: [Markdown](https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md) or [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md) format.
|
||||
|
||||
####Table of Contents
|
||||
|
||||
@@ -14,7 +14,7 @@ This document goes into detail on how this works, and lists
|
||||
some pitfalls and limitations.
|
||||
|
||||
For basic instructions on how to use the Puppet support, see
|
||||
the [main documentation](DOCUMENTATION.md#puppet-support).
|
||||
the [main documentation](documentation.md#puppet-support).
|
||||
|
||||
##Prerequisites
|
||||
|
||||
534
docs/resource-guide.md
Normal file
534
docs/resource-guide.md
Normal file
@@ -0,0 +1,534 @@
|
||||
#mgmt
|
||||
|
||||
<!--
|
||||
Mgmt
|
||||
Copyright (C) 2013-2016+ James Shubin and the project contributors
|
||||
Written by James Shubin <james@shubin.ca> and the project contributors
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
-->
|
||||
|
||||
##mgmt resource guide by [James](https://ttboj.wordpress.com/)
|
||||
####Available from:
|
||||
####[https://github.com/purpleidea/mgmt/](https://github.com/purpleidea/mgmt/)
|
||||
|
||||
####This documentation is available in: [Markdown](https://github.com/purpleidea/mgmt/blob/master/docs/resource-guide.md) or [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/docs/resource-guide.md) format.
|
||||
|
||||
####Table of Contents
|
||||
|
||||
1. [Overview](#overview)
|
||||
2. [Theory - Resource theory in mgmt](#theory)
|
||||
3. [Resource API - Getting started with mgmt](#resource-api)
|
||||
* [Init - Initialize the resource](#init)
|
||||
* [CheckApply - Check and apply resource state](#checkapply)
|
||||
* [Watch - Detect resource changes](#watch)
|
||||
* [Compare - Compare resource with another](#compare)
|
||||
4. [Further considerations - More information about resource writing](#further-considerations)
|
||||
5. [Automatic edges - Adding automatic resources dependencies](#automatic-edges)
|
||||
6. [Automatic grouping - Grouping multiple resources into one](#automatic-grouping)
|
||||
7. [Send/Recv - Communication between resources](#send-recv)
|
||||
8. [Composite resources - Importing code from one resource into another](#composite-resources)
|
||||
9. [FAQ - Frequently asked questions](#frequently-asked-questions)
|
||||
10. [Suggestions - API change suggestions](#suggestions)
|
||||
11. [Authors - Authors and contact information](#authors)
|
||||
|
||||
##Overview
|
||||
|
||||
The `mgmt` tool has built-in resource primitives which make up the building
|
||||
blocks of any configuration. Each instance of a resource is mapped to a single
|
||||
vertex in the resource [graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph).
|
||||
This guide is meant to instruct developers on how to write a brand new resource.
|
||||
Since `mgmt` and the core resources are written in golang, some prior golang
|
||||
knowledge is assumed.
|
||||
|
||||
##Theory
|
||||
|
||||
Resources in `mgmt` are similar to resources in other systems in that they are
|
||||
[idempotent](https://en.wikipedia.org/wiki/Idempotence). Our resources are
|
||||
uniquely different in that they can detect when their state has changed, and as
|
||||
a result can run to revert or repair this change instantly. For some background
|
||||
on this design, please read the
|
||||
[original article](https://ttboj.wordpress.com/2016/01/18/next-generation-configuration-mgmt/)
|
||||
on the subject.
|
||||
|
||||
##Resource API
|
||||
|
||||
To implement a resource in `mgmt` it must satisfy the
|
||||
[`Res`](https://github.com/purpleidea/mgmt/blob/master/resources/resources.go)
|
||||
interface. What follows are each of the method signatures and a description of
|
||||
each.
|
||||
|
||||
###Init
|
||||
```golang
|
||||
Init() error
|
||||
```
|
||||
|
||||
This is called to initialize the resource. If something goes wrong, it should
|
||||
return an error. It should set the resource `kind`, do any resource specific
|
||||
work, and finish by calling the `Init` method of the base resource.
|
||||
|
||||
####Example
|
||||
```golang
|
||||
// Init initializes the Foo resource.
|
||||
func (obj *FooRes) Init() error {
|
||||
obj.BaseRes.kind = "Foo" // must set capitalized resource kind
|
||||
// run the resource specific initialization, and error if anything fails
|
||||
if some_error {
|
||||
return err // something went wrong!
|
||||
}
|
||||
return obj.BaseRes.Init() // call the base resource init
|
||||
}
|
||||
```
|
||||
|
||||
###CheckApply
|
||||
```golang
|
||||
CheckApply(apply bool) (checkOK bool, err error)
|
||||
```
|
||||
|
||||
`CheckApply` is where the real _work_ is done. Under normal circumstances, this
|
||||
function should check if the state of this resource is correct, and if so, it
|
||||
should return: `(true, nil)`. If the `apply` variable is set to `true`, then
|
||||
this means that we should then proceed to run the changes required to bring the
|
||||
resource into the correct state. If the `apply` variable is set to `false`, then
|
||||
the resource is operating in _noop_ mode and _no operations_ should be executed!
|
||||
|
||||
After having executed the necessary operations to bring the resource back into
|
||||
the desired state, or after having detected that the state was incorrect, but
|
||||
that changes can't be made because `apply` is `false`, you should then return
|
||||
`(false, nil)`.
|
||||
|
||||
You must cause the resource to converge during a single execution of this
|
||||
function. If you cannot, then you must return an error! The exception to this
|
||||
rule is that if an external force changes the state of the resource while it is
|
||||
being remedied, it is possible to return from this function even though the
|
||||
resource isn't now converged. This is not a bug, as the resources `Watch`
|
||||
facility will detect the change, ultimately resulting in a subsequent call to
|
||||
`CheckApply`.
|
||||
|
||||
####Example
|
||||
```golang
|
||||
// CheckApply does the idempotent work of checking and applying resource state.
|
||||
func (obj *FooRes) CheckApply(apply bool) (bool, error) {
|
||||
// check the state
|
||||
if state_is_okay { return true, nil } // done early! :)
|
||||
// state was bad
|
||||
if !apply { return false, nil } // don't apply; !stateok, nil
|
||||
// do the apply!
|
||||
return false, nil // after success applying
|
||||
if any_error { return false, err } // anytime there's an err!
|
||||
}
|
||||
```
|
||||
|
||||
The `CheckApply` function is called by the `mgmt` engine when it believes a call
|
||||
is necessary. Under certain conditions when a `Watch` call does not invalidate
|
||||
the state of the resource, and no refresh call was sent, its execution might be
|
||||
skipped. This is an engine optimization, and not a bug. It is mentioned here in
|
||||
the documentation in case you are confused as to why a debug message you've
|
||||
added to the code isn't always printed.
|
||||
|
||||
####Refresh notifications
|
||||
Some resources may choose to support receiving refresh notifications. In general
|
||||
these should be avoided if possible, but nevertheless, they do make sense in
|
||||
certain situations. Resources that support these need to verify if one was sent
|
||||
during the CheckApply phase of execution. This is accomplished by calling the
|
||||
`Refresh() bool` method of the resource, and inspecting the return value. This
|
||||
is only necessary if you plan to perform a refresh action. Refresh actions
|
||||
should still respect the `apply` variable, and no system changes should be made
|
||||
if it is `false`. Refresh notifications are generated by any resource when an
|
||||
action is applied by that resource and are transmitted through graph edges which
|
||||
have enabled their propagation. Resources that currently perform some refresh
|
||||
action include `svc`, `timer`, and `password`.
|
||||
|
||||
####Paired execution
|
||||
For many resources it is not uncommon to see `CheckApply` run twice in rapid
|
||||
succession. This is usually not a pathological occurrence, but rather a healthy
|
||||
pattern which is a consequence of the event system. When the state of the
|
||||
resource is incorrect, `CheckApply` will run to remedy the state. In response to
|
||||
having just changed the state, it is usually the case that this repair will
|
||||
trigger the `Watch` code! In response, a second `CheckApply` is triggered, which
|
||||
will likely find the state to now be correct.
|
||||
|
||||
####Summary
|
||||
* Anytime an error occurs during `CheckApply`, you should return `(false, err)`.
|
||||
* If the state is correct and no changes are needed, return `(true, nil)`.
|
||||
* You should only make changes to the system if `apply` is set to `true`.
|
||||
* After checking the state and possibly applying the fix, return `(false, nil)`.
|
||||
* Returning `(true, err)` is a programming error and will cause a `Fatal`.
|
||||
|
||||
###Watch
|
||||
```golang
|
||||
Watch(chan Event) error
|
||||
```
|
||||
|
||||
`Watch` is a main loop that runs and sends messages when it detects that the
|
||||
state of the resource might have changed. To send a message you should write to
|
||||
the input `Event` channel using the `DoSend` helper method. The Watch function
|
||||
should run continuously until a shutdown message is received. If at any time
|
||||
something goes wrong, you should return an error, and the `mgmt` engine will
|
||||
handle possibly restarting the main loop based on the `retry` meta parameters.
|
||||
|
||||
It is better to send an event notification which turns out to be spurious, than
|
||||
to miss a possible event. Resources which can miss events are incorrect and need
|
||||
to be re-engineered so that this isn't the case. If you have an idea for a
|
||||
resource which would fit this criteria, but you can't find a solution, please
|
||||
contact the `mgmt` maintainers so that this problem can be investigated and a
|
||||
possible system level engineering fix can be found.
|
||||
|
||||
You may have trouble deciding how much resource state checking should happen in
|
||||
the `Watch` loop versus deferring it all to the `CheckApply` method. You may
|
||||
want to put some simple fast path checking in `Watch` to avoid generating
|
||||
obviously spurious events, but in general it's best to keep the `Watch` method
|
||||
as simple as possible. Contact the `mgmt` maintainers if you're not sure.
|
||||
|
||||
If the resource is activated in `polling` mode, the `Watch` method will not get
|
||||
executed. As a result, the resource must still work even if the main loop is not
|
||||
running.
|
||||
|
||||
####Select
|
||||
The lifetime of most resources `Watch` method should be spent in an infinite
|
||||
loop that is bounded by a `select` call. The `select` call is the point where
|
||||
our method hands back control to the engine (and the kernel) so that we can
|
||||
sleep until something of interest wakes us up. In this loop we must process
|
||||
events from the engine via the `<-obj.Events()` call, wait for the converged
|
||||
timeout with `<-cuid.ConvergedTimer()`, and receive events for our resource
|
||||
itself!
|
||||
|
||||
####Events
|
||||
If we receive an internal event from the `<-obj.Events()` method, we can read it
|
||||
with the ReadEvent helper function. This function tells us if we should shutdown
|
||||
our resource, and if we should generate an event. When we want to send an event,
|
||||
we use the `DoSend` helper function. It is also important to mark the resource
|
||||
state as `dirty` if we believe it might have changed. We do this with the
|
||||
`StateOK(false)` function.
|
||||
|
||||
####Startup
|
||||
Once the `Watch` function has finished starting up successfully, it is important
|
||||
to generate one event to notify the `mgmt` engine that we're now listening
|
||||
successfully, so that it can run an initial `CheckApply` to ensure we're safely
|
||||
tracking a healthy state and that we didn't miss anything when `Watch` was down
|
||||
or from before `mgmt` was running. It does this by calling the `Running` method.
|
||||
|
||||
####Converged
|
||||
The engine might be asked to shutdown when the entire state of the system has
|
||||
not seen any changes for some duration of time. In order for the engine to be
|
||||
able to make this determination, each resource must report its converged state.
|
||||
To do this, the `Watch` method should get the `ConvergedUID` handle that has
|
||||
been prepared for it by the engine. This is done by calling the `Converger`
|
||||
method on the resource object. The result can be used to set the converged
|
||||
status with `SetConverged`, and to notify when the particular timeout has been
|
||||
reached by waiting on `ConvergedTimer`.
|
||||
|
||||
Instead of interacting with the `ConvergedUID` with these two methods, we can
|
||||
instead use the `StartTimer` and `ResetTimer` methods which accomplish the same
|
||||
thing, but provide a `select`-free interface for different coding situations.
|
||||
|
||||
####Example
|
||||
```golang
|
||||
// Watch is the listener and main loop for this resource.
|
||||
func (obj *FooRes) Watch(processChan chan event.Event) error {
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
// setup the Foo resource
|
||||
var err error
|
||||
if err, obj.foo = OpenFoo(); err != nil {
|
||||
return err // we couldn't startup
|
||||
}
|
||||
defer obj.whatever.CloseFoo() // shutdown our
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false // send event?
|
||||
var exit = false
|
||||
for {
|
||||
obj.SetState(ResStateWatching) // reset
|
||||
select {
|
||||
case event := <-obj.Events():
|
||||
cuid.SetConverged(false)
|
||||
// we avoid sending events on unpause
|
||||
if exit, send = obj.ReadEvent(&event); exit {
|
||||
return nil // exit
|
||||
}
|
||||
|
||||
// the actual events!
|
||||
case event := <-obj.foo.Events:
|
||||
if is_an_event {
|
||||
send = true // used below
|
||||
cuid.SetConverged(false)
|
||||
obj.StateOK(false) // dirty
|
||||
}
|
||||
|
||||
// event errors
|
||||
case err := <-obj.foo.Errors:
|
||||
cuuid.SetConverged(false)
|
||||
return err // will cause a retry or permanent failure
|
||||
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
send = false
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
####Summary
|
||||
* Remember to call the appropriate `converger` methods throughout the resource.
|
||||
* Remember to call `Startup` when the `Watch` is running successfully.
|
||||
* Remember to process internal events and shutdown promptly if asked to.
|
||||
* Ensure the design of your resource is well thought out.
|
||||
* Have a look at the existing resources for a rough idea of how this all works.
|
||||
|
||||
###Compare
|
||||
```golang
|
||||
Compare(Res) bool
|
||||
```
|
||||
|
||||
Each resource must have a `Compare` method. This takes as input another resource
|
||||
and must return whether they are identical or not. This is used for identifying
|
||||
if an existing resource can be used in place of a new one with a similar set of
|
||||
parameters. In particular, when switching from one graph to a new (possibly
|
||||
identical) graph, this avoids recomputing the state for resources which don't
|
||||
change or that are sufficiently similar that they don't need to be swapped out.
|
||||
|
||||
In general if all the resource properties are identical, then they usually don't
|
||||
need to be changed. On occasion, not all of them need to be compared, in
|
||||
particular if they store some generated state, or if they aren't significant in
|
||||
some way.
|
||||
|
||||
####Example
|
||||
```golang
|
||||
// Compare two resources and return if they are equivalent.
|
||||
func (obj *FooRes) Compare(res Res) bool {
|
||||
switch res.(type) {
|
||||
case *FooRes: // only compare to other resources of the Foo kind!
|
||||
res := res.(*FileRes)
|
||||
if !obj.BaseRes.Compare(res) { // call base Compare
|
||||
return false
|
||||
}
|
||||
if obj.Name != res.Name {
|
||||
return false
|
||||
}
|
||||
if obj.whatever != res.whatever {
|
||||
return false
|
||||
}
|
||||
if obj.Flag != res.Flag {
|
||||
return false
|
||||
}
|
||||
default:
|
||||
return false // different kind of resource
|
||||
}
|
||||
return true // they must match!
|
||||
}
|
||||
```
|
||||
|
||||
###Validate
|
||||
```golang
|
||||
Validate() error
|
||||
```
|
||||
|
||||
This method is used to validate if the populated resource struct is a valid
|
||||
representation of the resource kind. If it does not conform to the resource
|
||||
specifications, it should generate an error. If you notice that this method is
|
||||
quite large, it might be an indication that you might want to reconsider the
|
||||
parameter list and interface to this resource.
|
||||
|
||||
###GetUIDs
|
||||
```golang
|
||||
GetUIDs() []ResUID
|
||||
```
|
||||
|
||||
The `GetUIDs` method returns a list of `ResUID` interfaces that represent the
|
||||
particular resource uniquely. This is used with the AutoEdges API to determine
|
||||
if another resource can match a dependency to this one.
|
||||
|
||||
###AutoEdges
|
||||
```golang
|
||||
AutoEdges() AutoEdge
|
||||
```
|
||||
|
||||
This returns a struct that implements the `AutoEdge` interface. This struct
|
||||
is used to match other resources that might be relevant dependencies for this
|
||||
resource.
|
||||
|
||||
###CollectPattern
|
||||
```golang
|
||||
CollectPattern() string
|
||||
```
|
||||
|
||||
This is currently a stub and will be updated once the DSL is further along.
|
||||
|
||||
##Further considerations
|
||||
There is some additional information that any resource writer will need to know.
|
||||
Each issue is listed separately below!
|
||||
|
||||
###Resource struct
|
||||
Each resource will implement methods as pointer receivers on a resource struct.
|
||||
The resource struct must include an anonymous reference to the `BaseRes` struct.
|
||||
The naming convention for resources is that they end with a `Res` suffix. If
|
||||
you'd like your resource to be accessible by the `YAML` graph API (GAPI), then
|
||||
you'll need to include the appropriate YAML fields as shown below.
|
||||
|
||||
####Example
|
||||
```golang
|
||||
type FooRes struct {
|
||||
BaseRes `yaml:",inline"` // base properties
|
||||
|
||||
Whatever string `yaml:"whatever"` // you pick!
|
||||
Bar int // no yaml, used as public output value for send/recv
|
||||
Baz bool `yaml:"baz"` // something else
|
||||
|
||||
something string // some private field
|
||||
}
|
||||
```
|
||||
|
||||
###YAML
|
||||
In addition to labelling your resource struct with YAML fields, you must also
|
||||
add an entry to the internal `GraphConfig` struct. It is a fairly straight
|
||||
forward one line patch.
|
||||
|
||||
```golang
|
||||
type GraphConfig struct {
|
||||
// [snip...]
|
||||
Resources struct {
|
||||
Noop []*resources.NoopRes `yaml:"noop"`
|
||||
File []*resources.FileRes `yaml:"file"`
|
||||
// [snip...]
|
||||
Foo []*resources.FooRes `yaml:"foo"` // tada :)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
###Gob registration
|
||||
All resources must be registered with the `golang` _gob_ module so that they can
|
||||
be encoded and decoded. Make sure to include the following code snippet for this
|
||||
to work.
|
||||
|
||||
```golang
|
||||
import "encoding/gob"
|
||||
func init() { // special golang method that runs once
|
||||
gob.Register(&FooRes{}) // substitude your resource here
|
||||
}
|
||||
```
|
||||
|
||||
##Automatic edges
|
||||
Automatic edges in `mgmt` are well described in [this article](https://ttboj.wordpress.com/2016/03/14/automatic-edges-in-mgmt/).
|
||||
The best example of this technique can be seen in the `svc` resource.
|
||||
Unfortunately no further documentation about this subject has been written. To
|
||||
expand this section, please send a patch! Please contact us if you'd like to
|
||||
work on a resource that uses this feature, or to add it to an existing one!
|
||||
|
||||
##Automatic grouping
|
||||
Automatic grouping in `mgmt` is well described in [this article](https://ttboj.wordpress.com/2016/03/30/automatic-grouping-in-mgmt/).
|
||||
The best example of this technique can be seen in the `pkg` resource.
|
||||
Unfortunately no further documentation about this subject has been written. To
|
||||
expand this section, please send a patch! Please contact us if you'd like to
|
||||
work on a resource that uses this feature, or to add it to an existing one!
|
||||
|
||||
|
||||
##Send/Recv
|
||||
In `mgmt` there is a novel concept called _Send/Recv_. For some background,
|
||||
please [read the introductory article](https://ttboj.wordpress.com/2016/12/07/sendrecv-in-mgmt/).
|
||||
When using this feature, the engine will automatically send the user specified
|
||||
value to the intended destination without requiring any resource specific code.
|
||||
Any time that one of the destination values is changed, the engine automatically
|
||||
marks the resource state as `dirty`. To detect if a particular value was
|
||||
received, and if it changed (during this invocation of CheckApply) from the
|
||||
previous value, you can query the Recv parameter. It will contain a `map` of all
|
||||
the keys which can be received on, and the value has a `Changed` property which
|
||||
will indicate whether the value was updated on this particular `CheckApply`
|
||||
invocation. The type of the sending key must match that of the receiving one.
|
||||
This can _only_ be done inside of the `CheckApply` function!
|
||||
|
||||
```golang
|
||||
// inside CheckApply, probably near the top
|
||||
if val, exists := obj.Recv["SomeKey"]; exists {
|
||||
log.Printf("SomeKey was sent to us from: %s[%s].%s", val.Res.Kind(), val.Res.GetName(), val.Key)
|
||||
if val.Changed {
|
||||
log.Printf("SomeKey was just updated!")
|
||||
// you may want to invalidate some local cache
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Astute readers will note that there isn't anything that prevents a user from
|
||||
sending an identically typed value to some arbitrary (public) key that the
|
||||
resource author hadn't considered! While this is true, resources should probably
|
||||
work within this problem space anyways. The rule of thumb is that any public
|
||||
parameter which is normally used in a resource can be used safely.
|
||||
|
||||
One subtle scenario is that if a resource creates a local cache or stores a
|
||||
computation that depends on the value of a public parameter and will require
|
||||
invalidation should that public parameter change, then you must detect that
|
||||
scenario and invalidate the cache when it occurs. This *must* be processed
|
||||
before there is a possibility of failure in CheckApply, because if we fail (and
|
||||
possibly run again) the subsequent send->recv transfer might not have a new
|
||||
value to copy, and therefore we won't see this notification of change.
|
||||
Therefore, it is important to process these promptly, if they must not be lost,
|
||||
such as for cache invalidation.
|
||||
|
||||
Remember, `Send/Recv` only changes your resource code if you cache state.
|
||||
|
||||
##Composite resources
|
||||
Composite resources are resources which embed one or more existing resources.
|
||||
This is useful to prevent code duplication in higher level resource scenarios.
|
||||
The best example of this technique can be seen in the `nspawn` resource which
|
||||
can be seen to partially embed a `svc` resource, but without its `Watch`.
|
||||
Unfortunately no further documentation about this subject has been written. To
|
||||
expand this section, please send a patch! Please contact us if you'd like to
|
||||
work on a resource that uses this feature, or to add it to an existing one!
|
||||
|
||||
##Frequently asked questions
|
||||
(Send your questions as a patch to this FAQ! I'll review it, merge it, and
|
||||
respond by commit with the answer.)
|
||||
|
||||
###Can I write resources in a different language?
|
||||
Currently `golang` is the only supported language for built-in resources. We
|
||||
might consider allowing external resources to be imported in the future. This
|
||||
will likely require a language that can expose a C-like API, such as `python` or
|
||||
`ruby`. Custom `golang` resources are already possible when using mgmt as a lib.
|
||||
Higher level resource collections will be possible once the `mgmt` DSL is ready.
|
||||
|
||||
###What new resource primitives need writing?
|
||||
There are still many ideas for new resources that haven't been written yet. If
|
||||
you'd like to contribute one, please contact us and tell us about your idea!
|
||||
|
||||
###Where can I find more information about mgmt?
|
||||
Additional blog posts, videos and other material [is available!](https://github.com/purpleidea/mgmt/#on-the-web).
|
||||
|
||||
##Suggestions
|
||||
If you have any ideas for API changes or other improvements to resource writing,
|
||||
please let us know! We're still pre 1.0 and pre 0.1 and happy to break API in
|
||||
order to get it right!
|
||||
|
||||
##Authors
|
||||
|
||||
Copyright (C) 2013-2016+ James Shubin and the project contributors
|
||||
|
||||
Please see the
|
||||
[AUTHORS](https://github.com/purpleidea/mgmt/tree/master/AUTHORS) file
|
||||
for more information.
|
||||
|
||||
* [github](https://github.com/purpleidea/)
|
||||
* [@purpleidea](https://twitter.com/#!/purpleidea)
|
||||
* [https://ttboj.wordpress.com/](https://ttboj.wordpress.com/)
|
||||
@@ -316,7 +316,7 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
|
||||
if emax > maxClientConnectRetries {
|
||||
log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir)
|
||||
log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean")
|
||||
obj.cError = fmt.Errorf("Can't find an available endpoint.")
|
||||
obj.cError = fmt.Errorf("can't find an available endpoint")
|
||||
return obj.cError
|
||||
}
|
||||
err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff...
|
||||
|
||||
@@ -85,8 +85,8 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
||||
return g, err
|
||||
}
|
||||
|
||||
// SwitchStream returns nil errors every time there could be a new graph.
|
||||
func (obj *MyGAPI) SwitchStream() chan error {
|
||||
// Next returns nil errors every time there could be a new graph.
|
||||
func (obj *MyGAPI) Next() chan error {
|
||||
if obj.data.NoWatch || obj.Interval <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -78,8 +78,8 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
||||
return g, nil
|
||||
}
|
||||
|
||||
// SwitchStream returns nil errors every time there could be a new graph.
|
||||
func (obj *MyGAPI) SwitchStream() chan error {
|
||||
// Next returns nil errors every time there could be a new graph.
|
||||
func (obj *MyGAPI) Next() chan error {
|
||||
if obj.data.NoWatch || obj.Interval <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
||||
Name: "file1",
|
||||
// send->recv!
|
||||
Recv: map[string]*resources.Send{
|
||||
"Content": &resources.Send{Res: p1, Key: "Password"},
|
||||
"Content": {Res: p1, Key: "Password"},
|
||||
},
|
||||
},
|
||||
Path: "/tmp/mgmt/secret",
|
||||
@@ -120,8 +120,8 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
||||
return g, nil
|
||||
}
|
||||
|
||||
// SwitchStream returns nil errors every time there could be a new graph.
|
||||
func (obj *MyGAPI) SwitchStream() chan error {
|
||||
// Next returns nil errors every time there could be a new graph.
|
||||
func (obj *MyGAPI) Next() chan error {
|
||||
if obj.data.NoWatch || obj.Interval <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -45,6 +45,6 @@ type Data struct {
|
||||
type GAPI interface {
|
||||
Init(Data) error // initializes the GAPI and passes in useful data
|
||||
Graph() (*pgraph.Graph, error) // returns the most recent pgraph
|
||||
SwitchStream() chan error // returns a stream of switch events
|
||||
Next() chan error // returns a stream of switch events
|
||||
Close() error // shutdown the GAPI
|
||||
}
|
||||
|
||||
22
lib/main.go
22
lib/main.go
@@ -23,7 +23,6 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/converger"
|
||||
@@ -267,7 +266,6 @@ func (obj *Main) Run() error {
|
||||
// TODO: Import admin key
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var G, oldGraph *pgraph.Graph
|
||||
|
||||
// exit after `max-runtime` seconds for no reason at all...
|
||||
@@ -346,7 +344,7 @@ func (obj *Main) Run() error {
|
||||
if err := obj.GAPI.Init(data); err != nil {
|
||||
obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err))
|
||||
} else if !obj.NoWatch {
|
||||
gapiChan = obj.GAPI.SwitchStream() // stream of graph switch events!
|
||||
gapiChan = obj.GAPI.Next() // stream of graph switch events!
|
||||
}
|
||||
}
|
||||
|
||||
@@ -412,8 +410,8 @@ func (obj *Main) Run() error {
|
||||
log.Printf("Config: Error creating new graph: %v", err)
|
||||
// unpause!
|
||||
if !first {
|
||||
G.Start(&wg, first) // sync
|
||||
converger.Start() // after G.Start()
|
||||
G.Start(first) // sync
|
||||
converger.Start() // after G.Start()
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -440,8 +438,8 @@ func (obj *Main) Run() error {
|
||||
log.Printf("Config: Error running graph sync: %v", err)
|
||||
// unpause!
|
||||
if !first {
|
||||
G.Start(&wg, first) // sync
|
||||
converger.Start() // after G.Start()
|
||||
G.Start(first) // sync
|
||||
converger.Start() // after G.Start()
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -466,8 +464,8 @@ func (obj *Main) Run() error {
|
||||
// some are not ready yet and the EtcdWatch
|
||||
// loops, we'll cause G.Pause(...) before we
|
||||
// even got going, thus causing nil pointer errors
|
||||
G.Start(&wg, first) // sync
|
||||
converger.Start() // after G.Start()
|
||||
G.Start(first) // sync
|
||||
converger.Start() // after G.Start()
|
||||
first = false
|
||||
}
|
||||
}()
|
||||
@@ -542,11 +540,11 @@ func (obj *Main) Run() error {
|
||||
reterr = multierr.Append(reterr, err) // list of errors
|
||||
}
|
||||
|
||||
G.Exit() // tell all the children to exit
|
||||
|
||||
// tell inner main loop to exit
|
||||
close(exitchan)
|
||||
|
||||
G.Exit() // tell all the children to exit, and waits for them to do so
|
||||
|
||||
// cleanup etcd main loop last so it can process everything first
|
||||
if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd
|
||||
err = errwrap.Wrapf(err, "Etcd exited poorly!")
|
||||
@@ -557,8 +555,6 @@ func (obj *Main) Run() error {
|
||||
log.Printf("Main: Graph: %v", G)
|
||||
}
|
||||
|
||||
wg.Wait() // wait for primary go routines to exit
|
||||
|
||||
// TODO: wait for each vertex to exit...
|
||||
log.Println("Goodbye!")
|
||||
return reterr
|
||||
|
||||
@@ -289,6 +289,9 @@ func (g *Graph) Worker(v *Vertex) error {
|
||||
// the Watch() function about which graph it is
|
||||
// running on, which isolates things nicely...
|
||||
obj := v.Res
|
||||
// TODO: is there a better system for the `Watching` flag?
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
processChan := make(chan event.Event)
|
||||
go func() {
|
||||
running := false
|
||||
@@ -415,7 +418,9 @@ func (g *Graph) Worker(v *Vertex) error {
|
||||
}
|
||||
|
||||
// TODO: reset the watch retry count after some amount of success
|
||||
v.Res.RegisterConverger()
|
||||
e := v.Res.Watch(processChan)
|
||||
v.Res.UnregisterConverger()
|
||||
if e == nil { // exit signal
|
||||
err = nil // clean exit
|
||||
break
|
||||
@@ -445,31 +450,14 @@ func (g *Graph) Worker(v *Vertex) error {
|
||||
|
||||
// Start is a main kick to start the graph. It goes through in reverse topological
|
||||
// sort order so that events can't hit un-started vertices.
|
||||
func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
|
||||
func (g *Graph) Start(first bool) { // start or continue
|
||||
log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
|
||||
defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
|
||||
var wg sync.WaitGroup
|
||||
t, _ := g.TopologicalSort()
|
||||
// TODO: only calculate indegree if `first` is true to save resources
|
||||
indegree := g.InDegree() // compute all of the indegree's
|
||||
for _, v := range Reverse(t) {
|
||||
|
||||
if !v.Res.IsWatching() { // if Watch() is not running...
|
||||
wg.Add(1)
|
||||
// must pass in value to avoid races...
|
||||
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
|
||||
go func(vv *Vertex) {
|
||||
defer wg.Done()
|
||||
// TODO: if a sufficient number of workers error,
|
||||
// should something be done? Will these restart
|
||||
// after perma-failure if we have a graph change?
|
||||
if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
|
||||
log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)
|
||||
return
|
||||
}
|
||||
log.Printf("%s[%s]: Exited", vv.Kind(), vv.GetName())
|
||||
}(v)
|
||||
}
|
||||
|
||||
// selective poke: here we reduce the number of initial pokes
|
||||
// to the minimum required to activate every vertex in the
|
||||
// graph, either by direct action, or by getting poked by a
|
||||
@@ -483,18 +471,39 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
|
||||
// function) we need to poke to *unpause* every graph vertex,
|
||||
// and not just selectively the subset with no indegree.
|
||||
if (!first) || indegree[v] == 0 {
|
||||
// ensure state is started before continuing on to next vertex
|
||||
for !v.SendEvent(event.EventStart, true, false) {
|
||||
if g.Flags.Debug {
|
||||
// if SendEvent fails, we aren't up yet
|
||||
log.Printf("%s[%s]: Retrying SendEvent(Start)", v.Kind(), v.GetName())
|
||||
// sleep here briefly or otherwise cause
|
||||
// a different goroutine to be scheduled
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
v.Res.Starter(true) // let the startup code know to poke
|
||||
}
|
||||
|
||||
if !v.Res.IsWatching() { // if Watch() is not running...
|
||||
g.wg.Add(1)
|
||||
// must pass in value to avoid races...
|
||||
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
|
||||
go func(vv *Vertex) {
|
||||
defer g.wg.Done()
|
||||
// TODO: if a sufficient number of workers error,
|
||||
// should something be done? Will these restart
|
||||
// after perma-failure if we have a graph change?
|
||||
if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
|
||||
log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)
|
||||
return
|
||||
}
|
||||
}
|
||||
log.Printf("%s[%s]: Exited", vv.Kind(), vv.GetName())
|
||||
}(v)
|
||||
}
|
||||
|
||||
// let the vertices run their startup code in parallel
|
||||
wg.Add(1)
|
||||
go func(vv *Vertex) {
|
||||
defer wg.Done()
|
||||
vv.Res.Started() // block until started
|
||||
}(v)
|
||||
|
||||
if !first { // unpause!
|
||||
v.Res.SendEvent(event.EventStart, true, false) // sync!
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait() // wait for everyone
|
||||
}
|
||||
|
||||
// Pause sends pause events to the graph in a topological sort order.
|
||||
@@ -522,4 +531,5 @@ func (g *Graph) Exit() {
|
||||
|
||||
v.SendEvent(event.EventExit, true, false)
|
||||
}
|
||||
g.wg.Wait() // for now, this doesn't need to be a separate Wait() method
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ const (
|
||||
graphStatePaused
|
||||
)
|
||||
|
||||
// Flags contains specific constants used by the graph.
|
||||
type Flags struct {
|
||||
Debug bool
|
||||
}
|
||||
@@ -55,7 +56,8 @@ type Graph struct {
|
||||
Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge)
|
||||
Flags Flags
|
||||
state graphState
|
||||
mutex sync.Mutex // used when modifying graph State variable
|
||||
mutex *sync.Mutex // used when modifying graph State variable
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
// Vertex is the primary vertex struct in this library.
|
||||
@@ -78,6 +80,9 @@ func NewGraph(name string) *Graph {
|
||||
Name: name,
|
||||
Adjacency: make(map[*Vertex]map[*Vertex]*Edge),
|
||||
state: graphStateNil,
|
||||
// ptr b/c: Mutex/WaitGroup must not be copied after first use
|
||||
mutex: &sync.Mutex{},
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,6 +117,8 @@ func (g *Graph) Copy() *Graph {
|
||||
Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)),
|
||||
Flags: g.Flags,
|
||||
state: g.state,
|
||||
mutex: g.mutex,
|
||||
wg: g.wg,
|
||||
}
|
||||
for k, v := range g.Adjacency {
|
||||
newGraph.Adjacency[k] = v // copy
|
||||
|
||||
@@ -74,8 +74,8 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) {
|
||||
return g, err
|
||||
}
|
||||
|
||||
// SwitchStream returns nil errors every time there could be a new graph.
|
||||
func (obj *GAPI) SwitchStream() chan error {
|
||||
// Next returns nil errors every time there could be a new graph.
|
||||
func (obj *GAPI) Next() chan error {
|
||||
if obj.data.NoWatch {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -32,7 +32,8 @@ import (
|
||||
const (
|
||||
// PuppetYAMLBufferSize is the maximum buffer size for the yaml input data
|
||||
PuppetYAMLBufferSize = 65535
|
||||
Debug = false // FIXME: integrate with global debug flag
|
||||
// Debug is a local debug constant used in this module
|
||||
Debug = false // FIXME: integrate with global debug flag
|
||||
)
|
||||
|
||||
func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {
|
||||
|
||||
@@ -156,11 +156,11 @@ func (obj *SSH) Sftp() error {
|
||||
var err error
|
||||
|
||||
if obj.client == nil {
|
||||
return fmt.Errorf("Not dialed!")
|
||||
return fmt.Errorf("not dialed")
|
||||
}
|
||||
// this check is needed because the golang path.Base function is weird!
|
||||
if strings.HasSuffix(obj.file, "/") {
|
||||
return fmt.Errorf("File must not be a directory.")
|
||||
return fmt.Errorf("file must not be a directory")
|
||||
}
|
||||
|
||||
// we run local operations first so that remote clean up is easier...
|
||||
@@ -254,7 +254,7 @@ func (obj *SSH) Sftp() error {
|
||||
// make file executable; don't cache this in case it didn't ever happen
|
||||
// TODO: do we want the group or other bits set?
|
||||
if err := obj.sftp.Chmod(obj.execpath, 0770); err != nil {
|
||||
return fmt.Errorf("Can't set file mode bits!")
|
||||
return fmt.Errorf("can't set file mode bits")
|
||||
}
|
||||
|
||||
// copy graph file
|
||||
@@ -273,7 +273,7 @@ func (obj *SSH) Sftp() error {
|
||||
// SftpGraphCopy is a helper function used for re-copying the graph definition.
|
||||
func (obj *SSH) SftpGraphCopy() (int64, error) {
|
||||
if obj.filepath == "" {
|
||||
return -1, fmt.Errorf("Sftp session isn't ready yet!")
|
||||
return -1, fmt.Errorf("sftp session isn't ready yet")
|
||||
}
|
||||
return obj.SftpCopy(obj.file, obj.filepath)
|
||||
}
|
||||
@@ -281,7 +281,7 @@ func (obj *SSH) SftpGraphCopy() (int64, error) {
|
||||
// SftpCopy is a simple helper function that runs a local -> remote sftp copy.
|
||||
func (obj *SSH) SftpCopy(src, dst string) (int64, error) {
|
||||
if obj.sftp == nil {
|
||||
return -1, fmt.Errorf("Sftp session is not active!")
|
||||
return -1, fmt.Errorf("sftp session is not active")
|
||||
}
|
||||
var err error
|
||||
// TODO: add a check to make sure we don't run two copies of this
|
||||
@@ -313,7 +313,7 @@ func (obj *SSH) SftpCopy(src, dst string) (int64, error) {
|
||||
return n, fmt.Errorf("Can't copy to remote path: %v", err)
|
||||
}
|
||||
if n <= 0 {
|
||||
return n, fmt.Errorf("Zero bytes copied!")
|
||||
return n, fmt.Errorf("zero bytes copied")
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
@@ -391,10 +391,10 @@ func (obj *SSH) Tunnel() error {
|
||||
var err error
|
||||
|
||||
if len(obj.clientURLs) < 1 {
|
||||
return fmt.Errorf("Need at least one client URL to tunnel!")
|
||||
return fmt.Errorf("need at least one client URL to tunnel")
|
||||
}
|
||||
if len(obj.remoteURLs) < 1 {
|
||||
return fmt.Errorf("Need at least one remote URL to tunnel!")
|
||||
return fmt.Errorf("need at least one remote URL to tunnel")
|
||||
}
|
||||
|
||||
// TODO: do something less arbitrary about which one we pick?
|
||||
@@ -477,10 +477,10 @@ func (obj *SSH) TunnelClose() error {
|
||||
// Exec runs the binary on the remote server.
|
||||
func (obj *SSH) Exec() error {
|
||||
if obj.execpath == "" {
|
||||
return fmt.Errorf("Must have a binary path to execute!")
|
||||
return fmt.Errorf("must have a binary path to execute")
|
||||
}
|
||||
if obj.filepath == "" {
|
||||
return fmt.Errorf("Must have a graph definition to run!")
|
||||
return fmt.Errorf("must have a graph definition to run")
|
||||
}
|
||||
|
||||
var err error
|
||||
@@ -772,7 +772,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
|
||||
}
|
||||
host = x[0]
|
||||
if host == "" {
|
||||
return nil, fmt.Errorf("Empty hostname!")
|
||||
return nil, fmt.Errorf("empty hostname")
|
||||
}
|
||||
|
||||
user := defaultUser // default
|
||||
@@ -795,7 +795,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
|
||||
}
|
||||
|
||||
if len(auth) == 0 {
|
||||
return nil, fmt.Errorf("No authentication methods available!")
|
||||
return nil, fmt.Errorf("no authentication methods available")
|
||||
}
|
||||
|
||||
//hostname := config.Hostname // TODO: optionally specify local hostname somehow
|
||||
@@ -804,7 +804,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
|
||||
hostname = host // default to above
|
||||
}
|
||||
if util.StrInList(hostname, obj.hostnames) {
|
||||
return nil, fmt.Errorf("Remote: Hostname `%s` already exists!", hostname)
|
||||
return nil, fmt.Errorf("Remote: Hostname `%s` already exists", hostname)
|
||||
}
|
||||
obj.hostnames = append(obj.hostnames, hostname)
|
||||
|
||||
@@ -830,7 +830,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
|
||||
// sshKeyAuth is a helper function to get the ssh key auth struct needed
|
||||
func (obj *Remotes) sshKeyAuth() (ssh.AuthMethod, error) {
|
||||
if obj.sshPrivIdRsa == "" {
|
||||
return nil, fmt.Errorf("Empty path specified!")
|
||||
return nil, fmt.Errorf("empty path specified")
|
||||
}
|
||||
p := ""
|
||||
// TODO: this doesn't match strings of the form: ~james/.ssh/id_rsa
|
||||
@@ -843,7 +843,7 @@ func (obj *Remotes) sshKeyAuth() (ssh.AuthMethod, error) {
|
||||
p = path.Join(usr.HomeDir, obj.sshPrivIdRsa[len("~/"):])
|
||||
}
|
||||
if p == "" {
|
||||
return nil, fmt.Errorf("Empty path specified!")
|
||||
return nil, fmt.Errorf("empty path specified")
|
||||
}
|
||||
// A public key may be used to authenticate against the server by using
|
||||
// an unencrypted PEM-encoded private key file. If you have an encrypted
|
||||
@@ -892,7 +892,7 @@ func (obj *Remotes) passwordCallback(user, host string) func() (string, error) {
|
||||
case e := <-failchan:
|
||||
return "", e
|
||||
case <-util.TimeAfterOrBlock(timeout):
|
||||
return "", fmt.Errorf("Interactive timeout reached!")
|
||||
return "", fmt.Errorf("interactive timeout reached")
|
||||
}
|
||||
}
|
||||
return cb
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"log"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
@@ -111,22 +110,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
|
||||
|
||||
// Watch is the primary listener for this resource and it outputs events.
|
||||
func (obj *ExecRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
}
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
var send = false // send event?
|
||||
var exit = false
|
||||
@@ -169,6 +153,11 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
|
||||
bufioch, errch = obj.BufioChanScanner(scanner)
|
||||
}
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
for {
|
||||
obj.SetState(ResStateWatching) // reset
|
||||
select {
|
||||
@@ -199,15 +188,10 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
// it is okay to invalidate the clean state on poke too
|
||||
obj.StateOK(false) // something made state dirty
|
||||
|
||||
@@ -30,7 +30,6 @@ import (
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/recwatch"
|
||||
@@ -142,22 +141,7 @@ func (obj *FileRes) Validate() error {
|
||||
// must be restarted. On a clean exit it returns nil.
|
||||
// FIXME: Also watch the source directory when using obj.Source !!!
|
||||
func (obj *FileRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil // TODO: should this be an error?
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
}
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
var err error
|
||||
obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse)
|
||||
@@ -166,6 +150,11 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
|
||||
}
|
||||
defer obj.recWatcher.Close()
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false // send event?
|
||||
var exit = false
|
||||
|
||||
@@ -200,16 +189,10 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
obj.StateOK(false) // dirty
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
@@ -787,7 +770,7 @@ func (obj *FileRes) Compare(res Res) bool {
|
||||
if obj.Name != res.Name {
|
||||
return false
|
||||
}
|
||||
if obj.path != res.Path {
|
||||
if obj.path != res.path {
|
||||
return false
|
||||
}
|
||||
if (obj.Content == nil) != (res.Content == nil) { // xor
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
@@ -109,22 +108,7 @@ func (obj *HostnameRes) Validate() error {
|
||||
|
||||
// Watch is the primary listener for this resource and it outputs events.
|
||||
func (obj *HostnameRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil // TODO: should this be an error?
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
}
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
// if we share the bus with others, we will get each others messages!!
|
||||
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
|
||||
@@ -142,6 +126,11 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
|
||||
signals := make(chan *dbus.Signal, 10) // closed by dbus package
|
||||
bus.Signal(signals)
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false // send event?
|
||||
|
||||
for {
|
||||
@@ -164,15 +153,10 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"log"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
|
||||
@@ -136,21 +135,11 @@ func (obj *MsgRes) journalPriority() journal.Priority {
|
||||
|
||||
// Watch is the primary listener for this resource and it outputs events.
|
||||
func (obj *MsgRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false // send event?
|
||||
@@ -168,18 +157,11 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
// only do this on certain types of events
|
||||
//obj.isStateOK = false // something made state dirty
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ package resources
|
||||
import (
|
||||
"encoding/gob"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
)
|
||||
@@ -60,21 +59,11 @@ func (obj *NoopRes) Validate() error {
|
||||
|
||||
// Watch is the primary listener for this resource and it outputs events.
|
||||
func (obj *NoopRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil // TODO: should this be an error?
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false // send event?
|
||||
@@ -92,15 +81,10 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
@@ -96,22 +95,7 @@ func (obj *NspawnRes) Validate() error {
|
||||
|
||||
// Watch for state changes and sends a message to the bus if there is a change
|
||||
func (obj *NspawnRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
}
|
||||
// 1/2 the resolution of converged timeout
|
||||
return time.After(time.Duration(500) * time.Millisecond)
|
||||
}
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
// this resource depends on systemd ensure that it's running
|
||||
if !systemdUtil.IsRunningSystemd() {
|
||||
@@ -135,6 +119,11 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
|
||||
buschan := make(chan *dbus.Signal, 10)
|
||||
bus.Signal(buschan)
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false
|
||||
var exit = false
|
||||
|
||||
@@ -165,16 +154,10 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
obj.StateOK(false) // dirty
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
|
||||
@@ -27,7 +27,6 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/recwatch"
|
||||
@@ -169,22 +168,7 @@ Loop:
|
||||
|
||||
// Watch is the primary listener for this resource and it outputs events.
|
||||
func (obj *PasswordRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil // TODO: should this be an error?
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
}
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
var err error
|
||||
obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false)
|
||||
@@ -193,6 +177,11 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
|
||||
}
|
||||
defer obj.recWatcher.Close()
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false // send event?
|
||||
var exit = false
|
||||
for {
|
||||
@@ -220,15 +209,10 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"log"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/resources/packagekit"
|
||||
@@ -110,22 +109,7 @@ func (obj *PkgRes) Validate() error {
|
||||
// TODO: https://github.com/hughsie/PackageKit/issues/109
|
||||
// TODO: https://github.com/hughsie/PackageKit/issues/110
|
||||
func (obj *PkgRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
}
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
bus := packagekit.NewBus()
|
||||
if bus == nil {
|
||||
@@ -138,6 +122,11 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
|
||||
return errwrap.Wrapf(err, "Error adding signal match")
|
||||
}
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false // send event?
|
||||
var exit = false
|
||||
|
||||
@@ -175,16 +164,10 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
obj.StateOK(false) // dirty
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
|
||||
@@ -130,6 +130,9 @@ type Base interface {
|
||||
AssociateData(*Data)
|
||||
IsWatching() bool
|
||||
SetWatching(bool)
|
||||
RegisterConverger()
|
||||
UnregisterConverger()
|
||||
Converger() converger.ConvergerUID
|
||||
GetState() ResState
|
||||
SetState(ResState)
|
||||
DoSend(chan event.Event, string) (bool, error)
|
||||
@@ -147,6 +150,9 @@ type Base interface {
|
||||
GetGroup() []Res // return everyone grouped inside me
|
||||
SetGroup([]Res)
|
||||
VarDir(string) (string, error)
|
||||
Running(chan event.Event) error // notify the engine that Watch started
|
||||
Started() <-chan struct{} // returns when the resource has started
|
||||
Starter(bool)
|
||||
}
|
||||
|
||||
// Res is the minimum interface you need to implement to define a new resource.
|
||||
@@ -171,14 +177,17 @@ type BaseRes struct {
|
||||
kind string
|
||||
events chan event.Event
|
||||
converger converger.Converger // converged tracking
|
||||
prefix string // base prefix for this resource
|
||||
cuid converger.ConvergerUID
|
||||
prefix string // base prefix for this resource
|
||||
debug bool
|
||||
state ResState
|
||||
watching bool // is Watch() loop running ?
|
||||
isStateOK bool // whether the state is okay based on events or not
|
||||
isGrouped bool // am i contained within a group?
|
||||
grouped []Res // list of any grouped resources
|
||||
refresh bool // does this resource have a refresh to run?
|
||||
watching bool // is Watch() loop running ?
|
||||
started chan struct{} // closed when worker is started/running
|
||||
starter bool // does this have indegree == 0 ? XXX: usually?
|
||||
isStateOK bool // whether the state is okay based on events or not
|
||||
isGrouped bool // am i contained within a group?
|
||||
grouped []Res // list of any grouped resources
|
||||
refresh bool // does this resource have a refresh to run?
|
||||
//refreshState StatefulBool // TODO: future stateful bool
|
||||
}
|
||||
|
||||
@@ -229,6 +238,7 @@ func (obj *BaseRes) Init() error {
|
||||
return fmt.Errorf("Resource did not set kind!")
|
||||
}
|
||||
obj.events = make(chan event.Event) // unbuffered chan to avoid stale events
|
||||
obj.started = make(chan struct{}) // closes when started
|
||||
//dir, err := obj.VarDir("")
|
||||
//if err != nil {
|
||||
// return errwrap.Wrapf(err, "VarDir failed in Init()")
|
||||
@@ -275,16 +285,34 @@ func (obj *BaseRes) AssociateData(data *Data) {
|
||||
obj.debug = data.Debug
|
||||
}
|
||||
|
||||
// IsWatching tells us if the Watch() function is running.
|
||||
// IsWatching tells us if the Worker() function is running.
|
||||
func (obj *BaseRes) IsWatching() bool {
|
||||
return obj.watching
|
||||
}
|
||||
|
||||
// SetWatching stores the status of if the Watch() function is running.
|
||||
// SetWatching stores the status of if the Worker() function is running.
|
||||
func (obj *BaseRes) SetWatching(b bool) {
|
||||
obj.watching = b
|
||||
}
|
||||
|
||||
// RegisterConverger sets up the cuid for the resource. This is a helper
|
||||
// function for the engine, and shouldn't be called by the resources directly.
|
||||
func (obj *BaseRes) RegisterConverger() {
|
||||
obj.cuid = obj.converger.Register()
|
||||
}
|
||||
|
||||
// UnregisterConverger tears down the cuid for the resource. This is a helper
|
||||
// function for the engine, and shouldn't be called by the resources directly.
|
||||
func (obj *BaseRes) UnregisterConverger() {
|
||||
obj.cuid.Unregister()
|
||||
}
|
||||
|
||||
// Converger returns the ConvergerUID for the resource. This should be called
|
||||
// by the Watch method of the resource to set the converged state.
|
||||
func (obj *BaseRes) Converger() converger.ConvergerUID {
|
||||
return obj.cuid
|
||||
}
|
||||
|
||||
// GetState returns the state of the resource.
|
||||
func (obj *BaseRes) GetState() ResState {
|
||||
return obj.state
|
||||
@@ -403,6 +431,13 @@ func (obj *BaseRes) VarDir(extra string) (string, error) {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Started returns a channel that closes when the resource has started up.
|
||||
func (obj *BaseRes) Started() <-chan struct{} { return obj.started }
|
||||
|
||||
// Starter sets the starter bool. This defines if a vertex has an indegree of 0.
|
||||
// If we have an indegree of 0, we'll need to be a poke initiator in the graph.
|
||||
func (obj *BaseRes) Starter(b bool) { obj.starter = b }
|
||||
|
||||
// ResToB64 encodes a resource to a base64 encoded string (after serialization)
|
||||
func ResToB64(res Res) (string, error) {
|
||||
b := bytes.Buffer{}
|
||||
|
||||
@@ -60,8 +60,8 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
|
||||
var poke bool
|
||||
// ensure that a CheckApply runs by sending with a dirty state...
|
||||
if ev.GetActivity() { // if previous node did work, and we were notified...
|
||||
obj.StateOK(false) // dirty
|
||||
poke = true // poke!
|
||||
//obj.StateOK(false) // not necessarily
|
||||
poke = true // poke!
|
||||
// XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...)
|
||||
// XXX: unless this is used in our "fallback" polling implementation???
|
||||
//obj.SetRefresh(true) // TODO: is this redundant?
|
||||
@@ -108,6 +108,23 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
|
||||
return true, false // required to keep the stupid go compiler happy
|
||||
}
|
||||
|
||||
// Running is called by the Watch method of the resource once it has started up.
|
||||
// This signals to the engine to kick off the initial CheckApply resource check.
|
||||
func (obj *BaseRes) Running(processChan chan event.Event) error {
|
||||
obj.StateOK(false) // assume we're initially dirty
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
cuid.SetConverged(false) // a reasonable initial assumption
|
||||
close(obj.started) // send started signal
|
||||
|
||||
// FIXME: exit return value is unused atm, so ignore it for now...
|
||||
//if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
var err error
|
||||
if obj.starter { // vertices of indegree == 0 should send initial pokes
|
||||
_, err = obj.DoSend(processChan, "") // trigger a CheckApply
|
||||
}
|
||||
return err // bubble up any possible error (or nil)
|
||||
}
|
||||
|
||||
// Send points to a value that a resource will send.
|
||||
type Send struct {
|
||||
Res Res // a handle to the resource which is sending a value
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
@@ -76,22 +75,7 @@ func (obj *SvcRes) Validate() error {
|
||||
|
||||
// Watch is the primary listener for this resource and it outputs events.
|
||||
func (obj *SvcRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
}
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
// obj.Name: svc name
|
||||
if !systemdUtil.IsRunningSystemd() {
|
||||
@@ -116,6 +100,11 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
|
||||
buschan := make(chan *dbus.Signal, 10)
|
||||
bus.Signal(buschan)
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name
|
||||
var send = false // send event?
|
||||
var exit = false
|
||||
@@ -175,11 +164,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
obj.StateOK(false) // dirty
|
||||
}
|
||||
} else {
|
||||
if !activeSet {
|
||||
@@ -227,16 +211,10 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
obj.StateOK(false) // dirty
|
||||
}
|
||||
}
|
||||
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
|
||||
@@ -74,27 +74,17 @@ func (obj *TimerRes) newTicker() *time.Ticker {
|
||||
|
||||
// Watch is the primary listener for this resource and it outputs events.
|
||||
func (obj *TimerRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
}
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
// create a time.Ticker for the given interval
|
||||
obj.ticker = obj.newTicker()
|
||||
defer obj.ticker.Stop()
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false
|
||||
|
||||
for {
|
||||
@@ -113,13 +103,9 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true)
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
}
|
||||
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
|
||||
@@ -132,22 +132,7 @@ func (obj *VirtRes) connect() (conn libvirt.VirConnection, err error) {
|
||||
|
||||
// Watch is the primary listener for this resource and it outputs events.
|
||||
func (obj *VirtRes) Watch(processChan chan event.Event) error {
|
||||
if obj.IsWatching() {
|
||||
return nil
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
cuid := obj.converger.Register()
|
||||
defer cuid.Unregister()
|
||||
|
||||
var startup bool
|
||||
Startup := func(block bool) <-chan time.Time {
|
||||
if block {
|
||||
return nil // blocks forever
|
||||
//return make(chan time.Time) // blocks forever
|
||||
}
|
||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
||||
}
|
||||
cuid := obj.Converger() // get the converger uid used to report status
|
||||
|
||||
conn, err := obj.connect()
|
||||
if err != nil {
|
||||
@@ -203,6 +188,11 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
|
||||
)
|
||||
defer conn.DomainEventDeregister(callbackID)
|
||||
|
||||
// notify engine that we're running
|
||||
if err := obj.Running(processChan); err != nil {
|
||||
return err // bubble up a NACK...
|
||||
}
|
||||
|
||||
var send = false
|
||||
var exit = false
|
||||
|
||||
@@ -260,15 +250,9 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
|
||||
case <-cuid.ConvergedTimer():
|
||||
cuid.SetConverged(true) // converged!
|
||||
continue
|
||||
|
||||
case <-Startup(startup):
|
||||
cuid.SetConverged(false)
|
||||
send = true
|
||||
obj.StateOK(false) // dirty
|
||||
}
|
||||
|
||||
if send {
|
||||
startup = true // startup finished
|
||||
send = false
|
||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||
return err // we exit or bubble up a NACK...
|
||||
|
||||
@@ -9,7 +9,7 @@ ROOT=$(dirname "${BASH_SOURCE}")/..
|
||||
|
||||
GO_VERSION=($(go version))
|
||||
|
||||
if [[ -z $(echo "${GO_VERSION[2]}" | grep -E 'go1.2|go1.3|go1.4|go1.5|go1.6|go1.7|go1.8') ]]; then
|
||||
if [[ -z $(echo "${GO_VERSION[2]}" | grep -E 'go1.2|go1.3|go1.4|go1.5|go1.6|go1.7|go1.8|devel') ]]; then
|
||||
echo "Unknown go version '${GO_VERSION[2]}', failing gofmt."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -74,8 +74,8 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) {
|
||||
return g, err
|
||||
}
|
||||
|
||||
// SwitchStream returns nil errors every time there could be a new graph.
|
||||
func (obj *GAPI) SwitchStream() chan error {
|
||||
// Next returns nil errors every time there could be a new graph.
|
||||
func (obj *GAPI) Next() chan error {
|
||||
if obj.data.NoWatch {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -34,10 +34,6 @@ import (
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
const (
|
||||
Debug = false // FIXME: integrate with global debug flag
|
||||
)
|
||||
|
||||
type collectorResConfig struct {
|
||||
Kind string `yaml:"kind"`
|
||||
Pattern string `yaml:"pattern"` // XXX: Not Implemented
|
||||
@@ -120,9 +116,6 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world gapi.World, noop
|
||||
slice := reflect.ValueOf(iface)
|
||||
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
|
||||
kind := util.FirstToUpper(name)
|
||||
if Debug {
|
||||
log.Printf("Config: Processing: %v...", kind)
|
||||
}
|
||||
for j := 0; j < slice.Len(); j++ { // loop through resources of same kind
|
||||
x := slice.Index(j).Interface()
|
||||
res, ok := x.(resources.Res) // convert to Res type
|
||||
|
||||
Reference in New Issue
Block a user