Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
19760be0bc | ||
|
|
b3ea33f88d | ||
|
|
5b3425a689 | ||
|
|
a3d157bde6 | ||
|
|
2c8c9264a4 | ||
|
|
0009d9b20e | ||
|
|
dd8d17232f | ||
|
|
6312b9225f | ||
|
|
68cc09fef2 | ||
|
|
0651c9de65 | ||
|
|
38261ec809 | ||
|
|
067932aebf | ||
|
|
af47511d58 | ||
|
|
36b916f27f | ||
|
|
e519811893 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,7 +2,6 @@
|
|||||||
.omv/
|
.omv/
|
||||||
.ssh/
|
.ssh/
|
||||||
.vagrant/
|
.vagrant/
|
||||||
mgmt-documentation.pdf
|
|
||||||
old/
|
old/
|
||||||
tmp/
|
tmp/
|
||||||
*_stringer.go
|
*_stringer.go
|
||||||
|
|||||||
4
Makefile
4
Makefile
@@ -138,8 +138,8 @@ format: gofmt yamlfmt
|
|||||||
|
|
||||||
docs: $(PROGRAM)-documentation.pdf
|
docs: $(PROGRAM)-documentation.pdf
|
||||||
|
|
||||||
$(PROGRAM)-documentation.pdf: DOCUMENTATION.md
|
$(PROGRAM)-documentation.pdf: docs/documentation.md
|
||||||
pandoc DOCUMENTATION.md -o '$(PROGRAM)-documentation.pdf'
|
pandoc docs/documentation.md -o docs/'$(PROGRAM)-documentation.pdf'
|
||||||
|
|
||||||
#
|
#
|
||||||
# build aliases
|
# build aliases
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
|
|
||||||
[](https://goreportcard.com/report/github.com/purpleidea/mgmt)
|
[](https://goreportcard.com/report/github.com/purpleidea/mgmt)
|
||||||
[](http://travis-ci.org/purpleidea/mgmt)
|
[](http://travis-ci.org/purpleidea/mgmt)
|
||||||
[](DOCUMENTATION.md)
|
[](docs/documentation.md)
|
||||||
[](https://godoc.org/github.com/purpleidea/mgmt)
|
[](https://godoc.org/github.com/purpleidea/mgmt)
|
||||||
[](https://webchat.freenode.net/?channels=#mgmtconfig)
|
[](https://webchat.freenode.net/?channels=#mgmtconfig)
|
||||||
[](https://ci.centos.org/job/purpleidea-mgmt/)
|
[](https://ci.centos.org/job/purpleidea-mgmt/)
|
||||||
@@ -23,7 +23,7 @@ With your help you'll be able to influence our design and get us there sooner!
|
|||||||
|
|
||||||
## Questions:
|
## Questions:
|
||||||
Please join the [#mgmtconfig](https://webchat.freenode.net/?channels=#mgmtconfig) IRC community!
|
Please join the [#mgmtconfig](https://webchat.freenode.net/?channels=#mgmtconfig) IRC community!
|
||||||
If you have a well phrased question that might benefit others, consider asking it by sending a patch to the documentation [FAQ](https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#usage-and-frequently-asked-questions) section. I'll merge your question, and a patch with the answer!
|
If you have a well phrased question that might benefit others, consider asking it by sending a patch to the documentation [FAQ](https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md#usage-and-frequently-asked-questions) section. I'll merge your question, and a patch with the answer!
|
||||||
|
|
||||||
## Quick start:
|
## Quick start:
|
||||||
* Make sure you have golang version 1.6 or greater installed.
|
* Make sure you have golang version 1.6 or greater installed.
|
||||||
@@ -49,7 +49,7 @@ cd $GOPATH/src/github.com/purpleidea/mgmt
|
|||||||
Please look in the [examples/](examples/) folder for more examples!
|
Please look in the [examples/](examples/) folder for more examples!
|
||||||
|
|
||||||
## Documentation:
|
## Documentation:
|
||||||
Please see: the manually created [DOCUMENTATION.md](DOCUMENTATION.md) (also available as [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md)) and the automatically generated [GoDoc documentation](https://godoc.org/github.com/purpleidea/mgmt).
|
Please see: the manually created [documentation.md](docs/documentation.md) (also available as [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md)) and the automatically generated [GoDoc documentation](https://godoc.org/github.com/purpleidea/mgmt).
|
||||||
|
|
||||||
## Roadmap:
|
## Roadmap:
|
||||||
Please see: [TODO.md](TODO.md) for a list of upcoming work and TODO items.
|
Please see: [TODO.md](TODO.md) for a list of upcoming work and TODO items.
|
||||||
|
|||||||
1
docs/.gitignore
vendored
Normal file
1
docs/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
mgmt-documentation.pdf
|
||||||
@@ -23,7 +23,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|||||||
####Available from:
|
####Available from:
|
||||||
####[https://github.com/purpleidea/mgmt/](https://github.com/purpleidea/mgmt/)
|
####[https://github.com/purpleidea/mgmt/](https://github.com/purpleidea/mgmt/)
|
||||||
|
|
||||||
####This documentation is available in: [Markdown](https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md) or [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md) format.
|
####This documentation is available in: [Markdown](https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md) or [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/docs/documentation.md) format.
|
||||||
|
|
||||||
####Table of Contents
|
####Table of Contents
|
||||||
|
|
||||||
@@ -14,7 +14,7 @@ This document goes into detail on how this works, and lists
|
|||||||
some pitfalls and limitations.
|
some pitfalls and limitations.
|
||||||
|
|
||||||
For basic instructions on how to use the Puppet support, see
|
For basic instructions on how to use the Puppet support, see
|
||||||
the [main documentation](DOCUMENTATION.md#puppet-support).
|
the [main documentation](documentation.md#puppet-support).
|
||||||
|
|
||||||
##Prerequisites
|
##Prerequisites
|
||||||
|
|
||||||
534
docs/resource-guide.md
Normal file
534
docs/resource-guide.md
Normal file
@@ -0,0 +1,534 @@
|
|||||||
|
#mgmt
|
||||||
|
|
||||||
|
<!--
|
||||||
|
Mgmt
|
||||||
|
Copyright (C) 2013-2016+ James Shubin and the project contributors
|
||||||
|
Written by James Shubin <james@shubin.ca> and the project contributors
|
||||||
|
|
||||||
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU Affero General Public License as published by
|
||||||
|
the Free Software Foundation, either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU Affero General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
-->
|
||||||
|
|
||||||
|
##mgmt resource guide by [James](https://ttboj.wordpress.com/)
|
||||||
|
####Available from:
|
||||||
|
####[https://github.com/purpleidea/mgmt/](https://github.com/purpleidea/mgmt/)
|
||||||
|
|
||||||
|
####This documentation is available in: [Markdown](https://github.com/purpleidea/mgmt/blob/master/docs/resource-guide.md) or [PDF](https://pdfdoc-purpleidea.rhcloud.com/pdf/https://github.com/purpleidea/mgmt/blob/master/docs/resource-guide.md) format.
|
||||||
|
|
||||||
|
####Table of Contents
|
||||||
|
|
||||||
|
1. [Overview](#overview)
|
||||||
|
2. [Theory - Resource theory in mgmt](#theory)
|
||||||
|
3. [Resource API - Getting started with mgmt](#resource-api)
|
||||||
|
* [Init - Initialize the resource](#init)
|
||||||
|
* [CheckApply - Check and apply resource state](#checkapply)
|
||||||
|
* [Watch - Detect resource changes](#watch)
|
||||||
|
* [Compare - Compare resource with another](#compare)
|
||||||
|
4. [Further considerations - More information about resource writing](#further-considerations)
|
||||||
|
5. [Automatic edges - Adding automatic resources dependencies](#automatic-edges)
|
||||||
|
6. [Automatic grouping - Grouping multiple resources into one](#automatic-grouping)
|
||||||
|
7. [Send/Recv - Communication between resources](#send-recv)
|
||||||
|
8. [Composite resources - Importing code from one resource into another](#composite-resources)
|
||||||
|
9. [FAQ - Frequently asked questions](#frequently-asked-questions)
|
||||||
|
10. [Suggestions - API change suggestions](#suggestions)
|
||||||
|
11. [Authors - Authors and contact information](#authors)
|
||||||
|
|
||||||
|
##Overview
|
||||||
|
|
||||||
|
The `mgmt` tool has built-in resource primitives which make up the building
|
||||||
|
blocks of any configuration. Each instance of a resource is mapped to a single
|
||||||
|
vertex in the resource [graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph).
|
||||||
|
This guide is meant to instruct developers on how to write a brand new resource.
|
||||||
|
Since `mgmt` and the core resources are written in golang, some prior golang
|
||||||
|
knowledge is assumed.
|
||||||
|
|
||||||
|
##Theory
|
||||||
|
|
||||||
|
Resources in `mgmt` are similar to resources in other systems in that they are
|
||||||
|
[idempotent](https://en.wikipedia.org/wiki/Idempotence). Our resources are
|
||||||
|
uniquely different in that they can detect when their state has changed, and as
|
||||||
|
a result can run to revert or repair this change instantly. For some background
|
||||||
|
on this design, please read the
|
||||||
|
[original article](https://ttboj.wordpress.com/2016/01/18/next-generation-configuration-mgmt/)
|
||||||
|
on the subject.
|
||||||
|
|
||||||
|
##Resource API
|
||||||
|
|
||||||
|
To implement a resource in `mgmt` it must satisfy the
|
||||||
|
[`Res`](https://github.com/purpleidea/mgmt/blob/master/resources/resources.go)
|
||||||
|
interface. What follows are each of the method signatures and a description of
|
||||||
|
each.
|
||||||
|
|
||||||
|
###Init
|
||||||
|
```golang
|
||||||
|
Init() error
|
||||||
|
```
|
||||||
|
|
||||||
|
This is called to initialize the resource. If something goes wrong, it should
|
||||||
|
return an error. It should set the resource `kind`, do any resource specific
|
||||||
|
work, and finish by calling the `Init` method of the base resource.
|
||||||
|
|
||||||
|
####Example
|
||||||
|
```golang
|
||||||
|
// Init initializes the Foo resource.
|
||||||
|
func (obj *FooRes) Init() error {
|
||||||
|
obj.BaseRes.kind = "Foo" // must set capitalized resource kind
|
||||||
|
// run the resource specific initialization, and error if anything fails
|
||||||
|
if some_error {
|
||||||
|
return err // something went wrong!
|
||||||
|
}
|
||||||
|
return obj.BaseRes.Init() // call the base resource init
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
###CheckApply
|
||||||
|
```golang
|
||||||
|
CheckApply(apply bool) (checkOK bool, err error)
|
||||||
|
```
|
||||||
|
|
||||||
|
`CheckApply` is where the real _work_ is done. Under normal circumstances, this
|
||||||
|
function should check if the state of this resource is correct, and if so, it
|
||||||
|
should return: `(true, nil)`. If the `apply` variable is set to `true`, then
|
||||||
|
this means that we should then proceed to run the changes required to bring the
|
||||||
|
resource into the correct state. If the `apply` variable is set to `false`, then
|
||||||
|
the resource is operating in _noop_ mode and _no operations_ should be executed!
|
||||||
|
|
||||||
|
After having executed the necessary operations to bring the resource back into
|
||||||
|
the desired state, or after having detected that the state was incorrect, but
|
||||||
|
that changes can't be made because `apply` is `false`, you should then return
|
||||||
|
`(false, nil)`.
|
||||||
|
|
||||||
|
You must cause the resource to converge during a single execution of this
|
||||||
|
function. If you cannot, then you must return an error! The exception to this
|
||||||
|
rule is that if an external force changes the state of the resource while it is
|
||||||
|
being remedied, it is possible to return from this function even though the
|
||||||
|
resource isn't now converged. This is not a bug, as the resources `Watch`
|
||||||
|
facility will detect the change, ultimately resulting in a subsequent call to
|
||||||
|
`CheckApply`.
|
||||||
|
|
||||||
|
####Example
|
||||||
|
```golang
|
||||||
|
// CheckApply does the idempotent work of checking and applying resource state.
|
||||||
|
func (obj *FooRes) CheckApply(apply bool) (bool, error) {
|
||||||
|
// check the state
|
||||||
|
if state_is_okay { return true, nil } // done early! :)
|
||||||
|
// state was bad
|
||||||
|
if !apply { return false, nil } // don't apply; !stateok, nil
|
||||||
|
// do the apply!
|
||||||
|
return false, nil // after success applying
|
||||||
|
if any_error { return false, err } // anytime there's an err!
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The `CheckApply` function is called by the `mgmt` engine when it believes a call
|
||||||
|
is necessary. Under certain conditions when a `Watch` call does not invalidate
|
||||||
|
the state of the resource, and no refresh call was sent, its execution might be
|
||||||
|
skipped. This is an engine optimization, and not a bug. It is mentioned here in
|
||||||
|
the documentation in case you are confused as to why a debug message you've
|
||||||
|
added to the code isn't always printed.
|
||||||
|
|
||||||
|
####Refresh notifications
|
||||||
|
Some resources may choose to support receiving refresh notifications. In general
|
||||||
|
these should be avoided if possible, but nevertheless, they do make sense in
|
||||||
|
certain situations. Resources that support these need to verify if one was sent
|
||||||
|
during the CheckApply phase of execution. This is accomplished by calling the
|
||||||
|
`Refresh() bool` method of the resource, and inspecting the return value. This
|
||||||
|
is only necessary if you plan to perform a refresh action. Refresh actions
|
||||||
|
should still respect the `apply` variable, and no system changes should be made
|
||||||
|
if it is `false`. Refresh notifications are generated by any resource when an
|
||||||
|
action is applied by that resource and are transmitted through graph edges which
|
||||||
|
have enabled their propagation. Resources that currently perform some refresh
|
||||||
|
action include `svc`, `timer`, and `password`.
|
||||||
|
|
||||||
|
####Paired execution
|
||||||
|
For many resources it is not uncommon to see `CheckApply` run twice in rapid
|
||||||
|
succession. This is usually not a pathological occurrence, but rather a healthy
|
||||||
|
pattern which is a consequence of the event system. When the state of the
|
||||||
|
resource is incorrect, `CheckApply` will run to remedy the state. In response to
|
||||||
|
having just changed the state, it is usually the case that this repair will
|
||||||
|
trigger the `Watch` code! In response, a second `CheckApply` is triggered, which
|
||||||
|
will likely find the state to now be correct.
|
||||||
|
|
||||||
|
####Summary
|
||||||
|
* Anytime an error occurs during `CheckApply`, you should return `(false, err)`.
|
||||||
|
* If the state is correct and no changes are needed, return `(true, nil)`.
|
||||||
|
* You should only make changes to the system if `apply` is set to `true`.
|
||||||
|
* After checking the state and possibly applying the fix, return `(false, nil)`.
|
||||||
|
* Returning `(true, err)` is a programming error and will cause a `Fatal`.
|
||||||
|
|
||||||
|
###Watch
|
||||||
|
```golang
|
||||||
|
Watch(chan Event) error
|
||||||
|
```
|
||||||
|
|
||||||
|
`Watch` is a main loop that runs and sends messages when it detects that the
|
||||||
|
state of the resource might have changed. To send a message you should write to
|
||||||
|
the input `Event` channel using the `DoSend` helper method. The Watch function
|
||||||
|
should run continuously until a shutdown message is received. If at any time
|
||||||
|
something goes wrong, you should return an error, and the `mgmt` engine will
|
||||||
|
handle possibly restarting the main loop based on the `retry` meta parameters.
|
||||||
|
|
||||||
|
It is better to send an event notification which turns out to be spurious, than
|
||||||
|
to miss a possible event. Resources which can miss events are incorrect and need
|
||||||
|
to be re-engineered so that this isn't the case. If you have an idea for a
|
||||||
|
resource which would fit this criteria, but you can't find a solution, please
|
||||||
|
contact the `mgmt` maintainers so that this problem can be investigated and a
|
||||||
|
possible system level engineering fix can be found.
|
||||||
|
|
||||||
|
You may have trouble deciding how much resource state checking should happen in
|
||||||
|
the `Watch` loop versus deferring it all to the `CheckApply` method. You may
|
||||||
|
want to put some simple fast path checking in `Watch` to avoid generating
|
||||||
|
obviously spurious events, but in general it's best to keep the `Watch` method
|
||||||
|
as simple as possible. Contact the `mgmt` maintainers if you're not sure.
|
||||||
|
|
||||||
|
If the resource is activated in `polling` mode, the `Watch` method will not get
|
||||||
|
executed. As a result, the resource must still work even if the main loop is not
|
||||||
|
running.
|
||||||
|
|
||||||
|
####Select
|
||||||
|
The lifetime of most resources `Watch` method should be spent in an infinite
|
||||||
|
loop that is bounded by a `select` call. The `select` call is the point where
|
||||||
|
our method hands back control to the engine (and the kernel) so that we can
|
||||||
|
sleep until something of interest wakes us up. In this loop we must process
|
||||||
|
events from the engine via the `<-obj.Events()` call, wait for the converged
|
||||||
|
timeout with `<-cuid.ConvergedTimer()`, and receive events for our resource
|
||||||
|
itself!
|
||||||
|
|
||||||
|
####Events
|
||||||
|
If we receive an internal event from the `<-obj.Events()` method, we can read it
|
||||||
|
with the ReadEvent helper function. This function tells us if we should shutdown
|
||||||
|
our resource, and if we should generate an event. When we want to send an event,
|
||||||
|
we use the `DoSend` helper function. It is also important to mark the resource
|
||||||
|
state as `dirty` if we believe it might have changed. We do this with the
|
||||||
|
`StateOK(false)` function.
|
||||||
|
|
||||||
|
####Startup
|
||||||
|
Once the `Watch` function has finished starting up successfully, it is important
|
||||||
|
to generate one event to notify the `mgmt` engine that we're now listening
|
||||||
|
successfully, so that it can run an initial `CheckApply` to ensure we're safely
|
||||||
|
tracking a healthy state and that we didn't miss anything when `Watch` was down
|
||||||
|
or from before `mgmt` was running. It does this by calling the `Running` method.
|
||||||
|
|
||||||
|
####Converged
|
||||||
|
The engine might be asked to shutdown when the entire state of the system has
|
||||||
|
not seen any changes for some duration of time. In order for the engine to be
|
||||||
|
able to make this determination, each resource must report its converged state.
|
||||||
|
To do this, the `Watch` method should get the `ConvergedUID` handle that has
|
||||||
|
been prepared for it by the engine. This is done by calling the `Converger`
|
||||||
|
method on the resource object. The result can be used to set the converged
|
||||||
|
status with `SetConverged`, and to notify when the particular timeout has been
|
||||||
|
reached by waiting on `ConvergedTimer`.
|
||||||
|
|
||||||
|
Instead of interacting with the `ConvergedUID` with these two methods, we can
|
||||||
|
instead use the `StartTimer` and `ResetTimer` methods which accomplish the same
|
||||||
|
thing, but provide a `select`-free interface for different coding situations.
|
||||||
|
|
||||||
|
####Example
|
||||||
|
```golang
|
||||||
|
// Watch is the listener and main loop for this resource.
|
||||||
|
func (obj *FooRes) Watch(processChan chan event.Event) error {
|
||||||
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
|
|
||||||
|
// setup the Foo resource
|
||||||
|
var err error
|
||||||
|
if err, obj.foo = OpenFoo(); err != nil {
|
||||||
|
return err // we couldn't startup
|
||||||
|
}
|
||||||
|
defer obj.whatever.CloseFoo() // shutdown our
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
|
var send = false // send event?
|
||||||
|
var exit = false
|
||||||
|
for {
|
||||||
|
obj.SetState(ResStateWatching) // reset
|
||||||
|
select {
|
||||||
|
case event := <-obj.Events():
|
||||||
|
cuid.SetConverged(false)
|
||||||
|
// we avoid sending events on unpause
|
||||||
|
if exit, send = obj.ReadEvent(&event); exit {
|
||||||
|
return nil // exit
|
||||||
|
}
|
||||||
|
|
||||||
|
// the actual events!
|
||||||
|
case event := <-obj.foo.Events:
|
||||||
|
if is_an_event {
|
||||||
|
send = true // used below
|
||||||
|
cuid.SetConverged(false)
|
||||||
|
obj.StateOK(false) // dirty
|
||||||
|
}
|
||||||
|
|
||||||
|
// event errors
|
||||||
|
case err := <-obj.foo.Errors:
|
||||||
|
cuuid.SetConverged(false)
|
||||||
|
return err // will cause a retry or permanent failure
|
||||||
|
|
||||||
|
case <-cuid.ConvergedTimer():
|
||||||
|
cuid.SetConverged(true) // converged!
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
|
if send {
|
||||||
|
send = false
|
||||||
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
|
return err // we exit or bubble up a NACK...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
####Summary
|
||||||
|
* Remember to call the appropriate `converger` methods throughout the resource.
|
||||||
|
* Remember to call `Startup` when the `Watch` is running successfully.
|
||||||
|
* Remember to process internal events and shutdown promptly if asked to.
|
||||||
|
* Ensure the design of your resource is well thought out.
|
||||||
|
* Have a look at the existing resources for a rough idea of how this all works.
|
||||||
|
|
||||||
|
###Compare
|
||||||
|
```golang
|
||||||
|
Compare(Res) bool
|
||||||
|
```
|
||||||
|
|
||||||
|
Each resource must have a `Compare` method. This takes as input another resource
|
||||||
|
and must return whether they are identical or not. This is used for identifying
|
||||||
|
if an existing resource can be used in place of a new one with a similar set of
|
||||||
|
parameters. In particular, when switching from one graph to a new (possibly
|
||||||
|
identical) graph, this avoids recomputing the state for resources which don't
|
||||||
|
change or that are sufficiently similar that they don't need to be swapped out.
|
||||||
|
|
||||||
|
In general if all the resource properties are identical, then they usually don't
|
||||||
|
need to be changed. On occasion, not all of them need to be compared, in
|
||||||
|
particular if they store some generated state, or if they aren't significant in
|
||||||
|
some way.
|
||||||
|
|
||||||
|
####Example
|
||||||
|
```golang
|
||||||
|
// Compare two resources and return if they are equivalent.
|
||||||
|
func (obj *FooRes) Compare(res Res) bool {
|
||||||
|
switch res.(type) {
|
||||||
|
case *FooRes: // only compare to other resources of the Foo kind!
|
||||||
|
res := res.(*FileRes)
|
||||||
|
if !obj.BaseRes.Compare(res) { // call base Compare
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if obj.Name != res.Name {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if obj.whatever != res.whatever {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if obj.Flag != res.Flag {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return false // different kind of resource
|
||||||
|
}
|
||||||
|
return true // they must match!
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
###Validate
|
||||||
|
```golang
|
||||||
|
Validate() error
|
||||||
|
```
|
||||||
|
|
||||||
|
This method is used to validate if the populated resource struct is a valid
|
||||||
|
representation of the resource kind. If it does not conform to the resource
|
||||||
|
specifications, it should generate an error. If you notice that this method is
|
||||||
|
quite large, it might be an indication that you might want to reconsider the
|
||||||
|
parameter list and interface to this resource.
|
||||||
|
|
||||||
|
###GetUIDs
|
||||||
|
```golang
|
||||||
|
GetUIDs() []ResUID
|
||||||
|
```
|
||||||
|
|
||||||
|
The `GetUIDs` method returns a list of `ResUID` interfaces that represent the
|
||||||
|
particular resource uniquely. This is used with the AutoEdges API to determine
|
||||||
|
if another resource can match a dependency to this one.
|
||||||
|
|
||||||
|
###AutoEdges
|
||||||
|
```golang
|
||||||
|
AutoEdges() AutoEdge
|
||||||
|
```
|
||||||
|
|
||||||
|
This returns a struct that implements the `AutoEdge` interface. This struct
|
||||||
|
is used to match other resources that might be relevant dependencies for this
|
||||||
|
resource.
|
||||||
|
|
||||||
|
###CollectPattern
|
||||||
|
```golang
|
||||||
|
CollectPattern() string
|
||||||
|
```
|
||||||
|
|
||||||
|
This is currently a stub and will be updated once the DSL is further along.
|
||||||
|
|
||||||
|
##Further considerations
|
||||||
|
There is some additional information that any resource writer will need to know.
|
||||||
|
Each issue is listed separately below!
|
||||||
|
|
||||||
|
###Resource struct
|
||||||
|
Each resource will implement methods as pointer receivers on a resource struct.
|
||||||
|
The resource struct must include an anonymous reference to the `BaseRes` struct.
|
||||||
|
The naming convention for resources is that they end with a `Res` suffix. If
|
||||||
|
you'd like your resource to be accessible by the `YAML` graph API (GAPI), then
|
||||||
|
you'll need to include the appropriate YAML fields as shown below.
|
||||||
|
|
||||||
|
####Example
|
||||||
|
```golang
|
||||||
|
type FooRes struct {
|
||||||
|
BaseRes `yaml:",inline"` // base properties
|
||||||
|
|
||||||
|
Whatever string `yaml:"whatever"` // you pick!
|
||||||
|
Bar int // no yaml, used as public output value for send/recv
|
||||||
|
Baz bool `yaml:"baz"` // something else
|
||||||
|
|
||||||
|
something string // some private field
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
###YAML
|
||||||
|
In addition to labelling your resource struct with YAML fields, you must also
|
||||||
|
add an entry to the internal `GraphConfig` struct. It is a fairly straight
|
||||||
|
forward one line patch.
|
||||||
|
|
||||||
|
```golang
|
||||||
|
type GraphConfig struct {
|
||||||
|
// [snip...]
|
||||||
|
Resources struct {
|
||||||
|
Noop []*resources.NoopRes `yaml:"noop"`
|
||||||
|
File []*resources.FileRes `yaml:"file"`
|
||||||
|
// [snip...]
|
||||||
|
Foo []*resources.FooRes `yaml:"foo"` // tada :)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
###Gob registration
|
||||||
|
All resources must be registered with the `golang` _gob_ module so that they can
|
||||||
|
be encoded and decoded. Make sure to include the following code snippet for this
|
||||||
|
to work.
|
||||||
|
|
||||||
|
```golang
|
||||||
|
import "encoding/gob"
|
||||||
|
func init() { // special golang method that runs once
|
||||||
|
gob.Register(&FooRes{}) // substitude your resource here
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
##Automatic edges
|
||||||
|
Automatic edges in `mgmt` are well described in [this article](https://ttboj.wordpress.com/2016/03/14/automatic-edges-in-mgmt/).
|
||||||
|
The best example of this technique can be seen in the `svc` resource.
|
||||||
|
Unfortunately no further documentation about this subject has been written. To
|
||||||
|
expand this section, please send a patch! Please contact us if you'd like to
|
||||||
|
work on a resource that uses this feature, or to add it to an existing one!
|
||||||
|
|
||||||
|
##Automatic grouping
|
||||||
|
Automatic grouping in `mgmt` is well described in [this article](https://ttboj.wordpress.com/2016/03/30/automatic-grouping-in-mgmt/).
|
||||||
|
The best example of this technique can be seen in the `pkg` resource.
|
||||||
|
Unfortunately no further documentation about this subject has been written. To
|
||||||
|
expand this section, please send a patch! Please contact us if you'd like to
|
||||||
|
work on a resource that uses this feature, or to add it to an existing one!
|
||||||
|
|
||||||
|
|
||||||
|
##Send/Recv
|
||||||
|
In `mgmt` there is a novel concept called _Send/Recv_. For some background,
|
||||||
|
please [read the introductory article](https://ttboj.wordpress.com/2016/12/07/sendrecv-in-mgmt/).
|
||||||
|
When using this feature, the engine will automatically send the user specified
|
||||||
|
value to the intended destination without requiring any resource specific code.
|
||||||
|
Any time that one of the destination values is changed, the engine automatically
|
||||||
|
marks the resource state as `dirty`. To detect if a particular value was
|
||||||
|
received, and if it changed (during this invocation of CheckApply) from the
|
||||||
|
previous value, you can query the Recv parameter. It will contain a `map` of all
|
||||||
|
the keys which can be received on, and the value has a `Changed` property which
|
||||||
|
will indicate whether the value was updated on this particular `CheckApply`
|
||||||
|
invocation. The type of the sending key must match that of the receiving one.
|
||||||
|
This can _only_ be done inside of the `CheckApply` function!
|
||||||
|
|
||||||
|
```golang
|
||||||
|
// inside CheckApply, probably near the top
|
||||||
|
if val, exists := obj.Recv["SomeKey"]; exists {
|
||||||
|
log.Printf("SomeKey was sent to us from: %s[%s].%s", val.Res.Kind(), val.Res.GetName(), val.Key)
|
||||||
|
if val.Changed {
|
||||||
|
log.Printf("SomeKey was just updated!")
|
||||||
|
// you may want to invalidate some local cache
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Astute readers will note that there isn't anything that prevents a user from
|
||||||
|
sending an identically typed value to some arbitrary (public) key that the
|
||||||
|
resource author hadn't considered! While this is true, resources should probably
|
||||||
|
work within this problem space anyways. The rule of thumb is that any public
|
||||||
|
parameter which is normally used in a resource can be used safely.
|
||||||
|
|
||||||
|
One subtle scenario is that if a resource creates a local cache or stores a
|
||||||
|
computation that depends on the value of a public parameter and will require
|
||||||
|
invalidation should that public parameter change, then you must detect that
|
||||||
|
scenario and invalidate the cache when it occurs. This *must* be processed
|
||||||
|
before there is a possibility of failure in CheckApply, because if we fail (and
|
||||||
|
possibly run again) the subsequent send->recv transfer might not have a new
|
||||||
|
value to copy, and therefore we won't see this notification of change.
|
||||||
|
Therefore, it is important to process these promptly, if they must not be lost,
|
||||||
|
such as for cache invalidation.
|
||||||
|
|
||||||
|
Remember, `Send/Recv` only changes your resource code if you cache state.
|
||||||
|
|
||||||
|
##Composite resources
|
||||||
|
Composite resources are resources which embed one or more existing resources.
|
||||||
|
This is useful to prevent code duplication in higher level resource scenarios.
|
||||||
|
The best example of this technique can be seen in the `nspawn` resource which
|
||||||
|
can be seen to partially embed a `svc` resource, but without its `Watch`.
|
||||||
|
Unfortunately no further documentation about this subject has been written. To
|
||||||
|
expand this section, please send a patch! Please contact us if you'd like to
|
||||||
|
work on a resource that uses this feature, or to add it to an existing one!
|
||||||
|
|
||||||
|
##Frequently asked questions
|
||||||
|
(Send your questions as a patch to this FAQ! I'll review it, merge it, and
|
||||||
|
respond by commit with the answer.)
|
||||||
|
|
||||||
|
###Can I write resources in a different language?
|
||||||
|
Currently `golang` is the only supported language for built-in resources. We
|
||||||
|
might consider allowing external resources to be imported in the future. This
|
||||||
|
will likely require a language that can expose a C-like API, such as `python` or
|
||||||
|
`ruby`. Custom `golang` resources are already possible when using mgmt as a lib.
|
||||||
|
Higher level resource collections will be possible once the `mgmt` DSL is ready.
|
||||||
|
|
||||||
|
###What new resource primitives need writing?
|
||||||
|
There are still many ideas for new resources that haven't been written yet. If
|
||||||
|
you'd like to contribute one, please contact us and tell us about your idea!
|
||||||
|
|
||||||
|
###Where can I find more information about mgmt?
|
||||||
|
Additional blog posts, videos and other material [is available!](https://github.com/purpleidea/mgmt/#on-the-web).
|
||||||
|
|
||||||
|
##Suggestions
|
||||||
|
If you have any ideas for API changes or other improvements to resource writing,
|
||||||
|
please let us know! We're still pre 1.0 and pre 0.1 and happy to break API in
|
||||||
|
order to get it right!
|
||||||
|
|
||||||
|
##Authors
|
||||||
|
|
||||||
|
Copyright (C) 2013-2016+ James Shubin and the project contributors
|
||||||
|
|
||||||
|
Please see the
|
||||||
|
[AUTHORS](https://github.com/purpleidea/mgmt/tree/master/AUTHORS) file
|
||||||
|
for more information.
|
||||||
|
|
||||||
|
* [github](https://github.com/purpleidea/)
|
||||||
|
* [@purpleidea](https://twitter.com/#!/purpleidea)
|
||||||
|
* [https://ttboj.wordpress.com/](https://ttboj.wordpress.com/)
|
||||||
@@ -316,7 +316,7 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
|
|||||||
if emax > maxClientConnectRetries {
|
if emax > maxClientConnectRetries {
|
||||||
log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir)
|
log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir)
|
||||||
log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean")
|
log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean")
|
||||||
obj.cError = fmt.Errorf("Can't find an available endpoint.")
|
obj.cError = fmt.Errorf("can't find an available endpoint")
|
||||||
return obj.cError
|
return obj.cError
|
||||||
}
|
}
|
||||||
err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff...
|
err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff...
|
||||||
|
|||||||
@@ -85,8 +85,8 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
|||||||
return g, err
|
return g, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// SwitchStream returns nil errors every time there could be a new graph.
|
// Next returns nil errors every time there could be a new graph.
|
||||||
func (obj *MyGAPI) SwitchStream() chan error {
|
func (obj *MyGAPI) Next() chan error {
|
||||||
if obj.data.NoWatch || obj.Interval <= 0 {
|
if obj.data.NoWatch || obj.Interval <= 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -78,8 +78,8 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
|||||||
return g, nil
|
return g, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SwitchStream returns nil errors every time there could be a new graph.
|
// Next returns nil errors every time there could be a new graph.
|
||||||
func (obj *MyGAPI) SwitchStream() chan error {
|
func (obj *MyGAPI) Next() chan error {
|
||||||
if obj.data.NoWatch || obj.Interval <= 0 {
|
if obj.data.NoWatch || obj.Interval <= 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
|||||||
Name: "file1",
|
Name: "file1",
|
||||||
// send->recv!
|
// send->recv!
|
||||||
Recv: map[string]*resources.Send{
|
Recv: map[string]*resources.Send{
|
||||||
"Content": &resources.Send{Res: p1, Key: "Password"},
|
"Content": {Res: p1, Key: "Password"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Path: "/tmp/mgmt/secret",
|
Path: "/tmp/mgmt/secret",
|
||||||
@@ -120,8 +120,8 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
|||||||
return g, nil
|
return g, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SwitchStream returns nil errors every time there could be a new graph.
|
// Next returns nil errors every time there could be a new graph.
|
||||||
func (obj *MyGAPI) SwitchStream() chan error {
|
func (obj *MyGAPI) Next() chan error {
|
||||||
if obj.data.NoWatch || obj.Interval <= 0 {
|
if obj.data.NoWatch || obj.Interval <= 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,6 +45,6 @@ type Data struct {
|
|||||||
type GAPI interface {
|
type GAPI interface {
|
||||||
Init(Data) error // initializes the GAPI and passes in useful data
|
Init(Data) error // initializes the GAPI and passes in useful data
|
||||||
Graph() (*pgraph.Graph, error) // returns the most recent pgraph
|
Graph() (*pgraph.Graph, error) // returns the most recent pgraph
|
||||||
SwitchStream() chan error // returns a stream of switch events
|
Next() chan error // returns a stream of switch events
|
||||||
Close() error // shutdown the GAPI
|
Close() error // shutdown the GAPI
|
||||||
}
|
}
|
||||||
|
|||||||
22
lib/main.go
22
lib/main.go
@@ -23,7 +23,6 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/converger"
|
"github.com/purpleidea/mgmt/converger"
|
||||||
@@ -267,7 +266,6 @@ func (obj *Main) Run() error {
|
|||||||
// TODO: Import admin key
|
// TODO: Import admin key
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
var G, oldGraph *pgraph.Graph
|
var G, oldGraph *pgraph.Graph
|
||||||
|
|
||||||
// exit after `max-runtime` seconds for no reason at all...
|
// exit after `max-runtime` seconds for no reason at all...
|
||||||
@@ -346,7 +344,7 @@ func (obj *Main) Run() error {
|
|||||||
if err := obj.GAPI.Init(data); err != nil {
|
if err := obj.GAPI.Init(data); err != nil {
|
||||||
obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err))
|
obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err))
|
||||||
} else if !obj.NoWatch {
|
} else if !obj.NoWatch {
|
||||||
gapiChan = obj.GAPI.SwitchStream() // stream of graph switch events!
|
gapiChan = obj.GAPI.Next() // stream of graph switch events!
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -412,8 +410,8 @@ func (obj *Main) Run() error {
|
|||||||
log.Printf("Config: Error creating new graph: %v", err)
|
log.Printf("Config: Error creating new graph: %v", err)
|
||||||
// unpause!
|
// unpause!
|
||||||
if !first {
|
if !first {
|
||||||
G.Start(&wg, first) // sync
|
G.Start(first) // sync
|
||||||
converger.Start() // after G.Start()
|
converger.Start() // after G.Start()
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -440,8 +438,8 @@ func (obj *Main) Run() error {
|
|||||||
log.Printf("Config: Error running graph sync: %v", err)
|
log.Printf("Config: Error running graph sync: %v", err)
|
||||||
// unpause!
|
// unpause!
|
||||||
if !first {
|
if !first {
|
||||||
G.Start(&wg, first) // sync
|
G.Start(first) // sync
|
||||||
converger.Start() // after G.Start()
|
converger.Start() // after G.Start()
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -466,8 +464,8 @@ func (obj *Main) Run() error {
|
|||||||
// some are not ready yet and the EtcdWatch
|
// some are not ready yet and the EtcdWatch
|
||||||
// loops, we'll cause G.Pause(...) before we
|
// loops, we'll cause G.Pause(...) before we
|
||||||
// even got going, thus causing nil pointer errors
|
// even got going, thus causing nil pointer errors
|
||||||
G.Start(&wg, first) // sync
|
G.Start(first) // sync
|
||||||
converger.Start() // after G.Start()
|
converger.Start() // after G.Start()
|
||||||
first = false
|
first = false
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -542,11 +540,11 @@ func (obj *Main) Run() error {
|
|||||||
reterr = multierr.Append(reterr, err) // list of errors
|
reterr = multierr.Append(reterr, err) // list of errors
|
||||||
}
|
}
|
||||||
|
|
||||||
G.Exit() // tell all the children to exit
|
|
||||||
|
|
||||||
// tell inner main loop to exit
|
// tell inner main loop to exit
|
||||||
close(exitchan)
|
close(exitchan)
|
||||||
|
|
||||||
|
G.Exit() // tell all the children to exit, and waits for them to do so
|
||||||
|
|
||||||
// cleanup etcd main loop last so it can process everything first
|
// cleanup etcd main loop last so it can process everything first
|
||||||
if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd
|
if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd
|
||||||
err = errwrap.Wrapf(err, "Etcd exited poorly!")
|
err = errwrap.Wrapf(err, "Etcd exited poorly!")
|
||||||
@@ -557,8 +555,6 @@ func (obj *Main) Run() error {
|
|||||||
log.Printf("Main: Graph: %v", G)
|
log.Printf("Main: Graph: %v", G)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait() // wait for primary go routines to exit
|
|
||||||
|
|
||||||
// TODO: wait for each vertex to exit...
|
// TODO: wait for each vertex to exit...
|
||||||
log.Println("Goodbye!")
|
log.Println("Goodbye!")
|
||||||
return reterr
|
return reterr
|
||||||
|
|||||||
@@ -289,6 +289,9 @@ func (g *Graph) Worker(v *Vertex) error {
|
|||||||
// the Watch() function about which graph it is
|
// the Watch() function about which graph it is
|
||||||
// running on, which isolates things nicely...
|
// running on, which isolates things nicely...
|
||||||
obj := v.Res
|
obj := v.Res
|
||||||
|
// TODO: is there a better system for the `Watching` flag?
|
||||||
|
obj.SetWatching(true)
|
||||||
|
defer obj.SetWatching(false)
|
||||||
processChan := make(chan event.Event)
|
processChan := make(chan event.Event)
|
||||||
go func() {
|
go func() {
|
||||||
running := false
|
running := false
|
||||||
@@ -415,7 +418,9 @@ func (g *Graph) Worker(v *Vertex) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: reset the watch retry count after some amount of success
|
// TODO: reset the watch retry count after some amount of success
|
||||||
|
v.Res.RegisterConverger()
|
||||||
e := v.Res.Watch(processChan)
|
e := v.Res.Watch(processChan)
|
||||||
|
v.Res.UnregisterConverger()
|
||||||
if e == nil { // exit signal
|
if e == nil { // exit signal
|
||||||
err = nil // clean exit
|
err = nil // clean exit
|
||||||
break
|
break
|
||||||
@@ -445,31 +450,14 @@ func (g *Graph) Worker(v *Vertex) error {
|
|||||||
|
|
||||||
// Start is a main kick to start the graph. It goes through in reverse topological
|
// Start is a main kick to start the graph. It goes through in reverse topological
|
||||||
// sort order so that events can't hit un-started vertices.
|
// sort order so that events can't hit un-started vertices.
|
||||||
func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
|
func (g *Graph) Start(first bool) { // start or continue
|
||||||
log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
|
log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
|
||||||
defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
|
defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
|
||||||
|
var wg sync.WaitGroup
|
||||||
t, _ := g.TopologicalSort()
|
t, _ := g.TopologicalSort()
|
||||||
// TODO: only calculate indegree if `first` is true to save resources
|
// TODO: only calculate indegree if `first` is true to save resources
|
||||||
indegree := g.InDegree() // compute all of the indegree's
|
indegree := g.InDegree() // compute all of the indegree's
|
||||||
for _, v := range Reverse(t) {
|
for _, v := range Reverse(t) {
|
||||||
|
|
||||||
if !v.Res.IsWatching() { // if Watch() is not running...
|
|
||||||
wg.Add(1)
|
|
||||||
// must pass in value to avoid races...
|
|
||||||
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
|
|
||||||
go func(vv *Vertex) {
|
|
||||||
defer wg.Done()
|
|
||||||
// TODO: if a sufficient number of workers error,
|
|
||||||
// should something be done? Will these restart
|
|
||||||
// after perma-failure if we have a graph change?
|
|
||||||
if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
|
|
||||||
log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Printf("%s[%s]: Exited", vv.Kind(), vv.GetName())
|
|
||||||
}(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
// selective poke: here we reduce the number of initial pokes
|
// selective poke: here we reduce the number of initial pokes
|
||||||
// to the minimum required to activate every vertex in the
|
// to the minimum required to activate every vertex in the
|
||||||
// graph, either by direct action, or by getting poked by a
|
// graph, either by direct action, or by getting poked by a
|
||||||
@@ -483,18 +471,39 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
|
|||||||
// function) we need to poke to *unpause* every graph vertex,
|
// function) we need to poke to *unpause* every graph vertex,
|
||||||
// and not just selectively the subset with no indegree.
|
// and not just selectively the subset with no indegree.
|
||||||
if (!first) || indegree[v] == 0 {
|
if (!first) || indegree[v] == 0 {
|
||||||
// ensure state is started before continuing on to next vertex
|
v.Res.Starter(true) // let the startup code know to poke
|
||||||
for !v.SendEvent(event.EventStart, true, false) {
|
}
|
||||||
if g.Flags.Debug {
|
|
||||||
// if SendEvent fails, we aren't up yet
|
if !v.Res.IsWatching() { // if Watch() is not running...
|
||||||
log.Printf("%s[%s]: Retrying SendEvent(Start)", v.Kind(), v.GetName())
|
g.wg.Add(1)
|
||||||
// sleep here briefly or otherwise cause
|
// must pass in value to avoid races...
|
||||||
// a different goroutine to be scheduled
|
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
|
||||||
time.Sleep(1 * time.Millisecond)
|
go func(vv *Vertex) {
|
||||||
|
defer g.wg.Done()
|
||||||
|
// TODO: if a sufficient number of workers error,
|
||||||
|
// should something be done? Will these restart
|
||||||
|
// after perma-failure if we have a graph change?
|
||||||
|
if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
|
||||||
|
log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
log.Printf("%s[%s]: Exited", vv.Kind(), vv.GetName())
|
||||||
|
}(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// let the vertices run their startup code in parallel
|
||||||
|
wg.Add(1)
|
||||||
|
go func(vv *Vertex) {
|
||||||
|
defer wg.Done()
|
||||||
|
vv.Res.Started() // block until started
|
||||||
|
}(v)
|
||||||
|
|
||||||
|
if !first { // unpause!
|
||||||
|
v.Res.SendEvent(event.EventStart, true, false) // sync!
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wg.Wait() // wait for everyone
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pause sends pause events to the graph in a topological sort order.
|
// Pause sends pause events to the graph in a topological sort order.
|
||||||
@@ -522,4 +531,5 @@ func (g *Graph) Exit() {
|
|||||||
|
|
||||||
v.SendEvent(event.EventExit, true, false)
|
v.SendEvent(event.EventExit, true, false)
|
||||||
}
|
}
|
||||||
|
g.wg.Wait() // for now, this doesn't need to be a separate Wait() method
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ const (
|
|||||||
graphStatePaused
|
graphStatePaused
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Flags contains specific constants used by the graph.
|
||||||
type Flags struct {
|
type Flags struct {
|
||||||
Debug bool
|
Debug bool
|
||||||
}
|
}
|
||||||
@@ -55,7 +56,8 @@ type Graph struct {
|
|||||||
Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge)
|
Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge)
|
||||||
Flags Flags
|
Flags Flags
|
||||||
state graphState
|
state graphState
|
||||||
mutex sync.Mutex // used when modifying graph State variable
|
mutex *sync.Mutex // used when modifying graph State variable
|
||||||
|
wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// Vertex is the primary vertex struct in this library.
|
// Vertex is the primary vertex struct in this library.
|
||||||
@@ -78,6 +80,9 @@ func NewGraph(name string) *Graph {
|
|||||||
Name: name,
|
Name: name,
|
||||||
Adjacency: make(map[*Vertex]map[*Vertex]*Edge),
|
Adjacency: make(map[*Vertex]map[*Vertex]*Edge),
|
||||||
state: graphStateNil,
|
state: graphStateNil,
|
||||||
|
// ptr b/c: Mutex/WaitGroup must not be copied after first use
|
||||||
|
mutex: &sync.Mutex{},
|
||||||
|
wg: &sync.WaitGroup{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,6 +117,8 @@ func (g *Graph) Copy() *Graph {
|
|||||||
Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)),
|
Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)),
|
||||||
Flags: g.Flags,
|
Flags: g.Flags,
|
||||||
state: g.state,
|
state: g.state,
|
||||||
|
mutex: g.mutex,
|
||||||
|
wg: g.wg,
|
||||||
}
|
}
|
||||||
for k, v := range g.Adjacency {
|
for k, v := range g.Adjacency {
|
||||||
newGraph.Adjacency[k] = v // copy
|
newGraph.Adjacency[k] = v // copy
|
||||||
|
|||||||
@@ -74,8 +74,8 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) {
|
|||||||
return g, err
|
return g, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// SwitchStream returns nil errors every time there could be a new graph.
|
// Next returns nil errors every time there could be a new graph.
|
||||||
func (obj *GAPI) SwitchStream() chan error {
|
func (obj *GAPI) Next() chan error {
|
||||||
if obj.data.NoWatch {
|
if obj.data.NoWatch {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,7 +32,8 @@ import (
|
|||||||
const (
|
const (
|
||||||
// PuppetYAMLBufferSize is the maximum buffer size for the yaml input data
|
// PuppetYAMLBufferSize is the maximum buffer size for the yaml input data
|
||||||
PuppetYAMLBufferSize = 65535
|
PuppetYAMLBufferSize = 65535
|
||||||
Debug = false // FIXME: integrate with global debug flag
|
// Debug is a local debug constant used in this module
|
||||||
|
Debug = false // FIXME: integrate with global debug flag
|
||||||
)
|
)
|
||||||
|
|
||||||
func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {
|
func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {
|
||||||
|
|||||||
@@ -156,11 +156,11 @@ func (obj *SSH) Sftp() error {
|
|||||||
var err error
|
var err error
|
||||||
|
|
||||||
if obj.client == nil {
|
if obj.client == nil {
|
||||||
return fmt.Errorf("Not dialed!")
|
return fmt.Errorf("not dialed")
|
||||||
}
|
}
|
||||||
// this check is needed because the golang path.Base function is weird!
|
// this check is needed because the golang path.Base function is weird!
|
||||||
if strings.HasSuffix(obj.file, "/") {
|
if strings.HasSuffix(obj.file, "/") {
|
||||||
return fmt.Errorf("File must not be a directory.")
|
return fmt.Errorf("file must not be a directory")
|
||||||
}
|
}
|
||||||
|
|
||||||
// we run local operations first so that remote clean up is easier...
|
// we run local operations first so that remote clean up is easier...
|
||||||
@@ -254,7 +254,7 @@ func (obj *SSH) Sftp() error {
|
|||||||
// make file executable; don't cache this in case it didn't ever happen
|
// make file executable; don't cache this in case it didn't ever happen
|
||||||
// TODO: do we want the group or other bits set?
|
// TODO: do we want the group or other bits set?
|
||||||
if err := obj.sftp.Chmod(obj.execpath, 0770); err != nil {
|
if err := obj.sftp.Chmod(obj.execpath, 0770); err != nil {
|
||||||
return fmt.Errorf("Can't set file mode bits!")
|
return fmt.Errorf("can't set file mode bits")
|
||||||
}
|
}
|
||||||
|
|
||||||
// copy graph file
|
// copy graph file
|
||||||
@@ -273,7 +273,7 @@ func (obj *SSH) Sftp() error {
|
|||||||
// SftpGraphCopy is a helper function used for re-copying the graph definition.
|
// SftpGraphCopy is a helper function used for re-copying the graph definition.
|
||||||
func (obj *SSH) SftpGraphCopy() (int64, error) {
|
func (obj *SSH) SftpGraphCopy() (int64, error) {
|
||||||
if obj.filepath == "" {
|
if obj.filepath == "" {
|
||||||
return -1, fmt.Errorf("Sftp session isn't ready yet!")
|
return -1, fmt.Errorf("sftp session isn't ready yet")
|
||||||
}
|
}
|
||||||
return obj.SftpCopy(obj.file, obj.filepath)
|
return obj.SftpCopy(obj.file, obj.filepath)
|
||||||
}
|
}
|
||||||
@@ -281,7 +281,7 @@ func (obj *SSH) SftpGraphCopy() (int64, error) {
|
|||||||
// SftpCopy is a simple helper function that runs a local -> remote sftp copy.
|
// SftpCopy is a simple helper function that runs a local -> remote sftp copy.
|
||||||
func (obj *SSH) SftpCopy(src, dst string) (int64, error) {
|
func (obj *SSH) SftpCopy(src, dst string) (int64, error) {
|
||||||
if obj.sftp == nil {
|
if obj.sftp == nil {
|
||||||
return -1, fmt.Errorf("Sftp session is not active!")
|
return -1, fmt.Errorf("sftp session is not active")
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
// TODO: add a check to make sure we don't run two copies of this
|
// TODO: add a check to make sure we don't run two copies of this
|
||||||
@@ -313,7 +313,7 @@ func (obj *SSH) SftpCopy(src, dst string) (int64, error) {
|
|||||||
return n, fmt.Errorf("Can't copy to remote path: %v", err)
|
return n, fmt.Errorf("Can't copy to remote path: %v", err)
|
||||||
}
|
}
|
||||||
if n <= 0 {
|
if n <= 0 {
|
||||||
return n, fmt.Errorf("Zero bytes copied!")
|
return n, fmt.Errorf("zero bytes copied")
|
||||||
}
|
}
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
@@ -391,10 +391,10 @@ func (obj *SSH) Tunnel() error {
|
|||||||
var err error
|
var err error
|
||||||
|
|
||||||
if len(obj.clientURLs) < 1 {
|
if len(obj.clientURLs) < 1 {
|
||||||
return fmt.Errorf("Need at least one client URL to tunnel!")
|
return fmt.Errorf("need at least one client URL to tunnel")
|
||||||
}
|
}
|
||||||
if len(obj.remoteURLs) < 1 {
|
if len(obj.remoteURLs) < 1 {
|
||||||
return fmt.Errorf("Need at least one remote URL to tunnel!")
|
return fmt.Errorf("need at least one remote URL to tunnel")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: do something less arbitrary about which one we pick?
|
// TODO: do something less arbitrary about which one we pick?
|
||||||
@@ -477,10 +477,10 @@ func (obj *SSH) TunnelClose() error {
|
|||||||
// Exec runs the binary on the remote server.
|
// Exec runs the binary on the remote server.
|
||||||
func (obj *SSH) Exec() error {
|
func (obj *SSH) Exec() error {
|
||||||
if obj.execpath == "" {
|
if obj.execpath == "" {
|
||||||
return fmt.Errorf("Must have a binary path to execute!")
|
return fmt.Errorf("must have a binary path to execute")
|
||||||
}
|
}
|
||||||
if obj.filepath == "" {
|
if obj.filepath == "" {
|
||||||
return fmt.Errorf("Must have a graph definition to run!")
|
return fmt.Errorf("must have a graph definition to run")
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
@@ -772,7 +772,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
|
|||||||
}
|
}
|
||||||
host = x[0]
|
host = x[0]
|
||||||
if host == "" {
|
if host == "" {
|
||||||
return nil, fmt.Errorf("Empty hostname!")
|
return nil, fmt.Errorf("empty hostname")
|
||||||
}
|
}
|
||||||
|
|
||||||
user := defaultUser // default
|
user := defaultUser // default
|
||||||
@@ -795,7 +795,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(auth) == 0 {
|
if len(auth) == 0 {
|
||||||
return nil, fmt.Errorf("No authentication methods available!")
|
return nil, fmt.Errorf("no authentication methods available")
|
||||||
}
|
}
|
||||||
|
|
||||||
//hostname := config.Hostname // TODO: optionally specify local hostname somehow
|
//hostname := config.Hostname // TODO: optionally specify local hostname somehow
|
||||||
@@ -804,7 +804,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
|
|||||||
hostname = host // default to above
|
hostname = host // default to above
|
||||||
}
|
}
|
||||||
if util.StrInList(hostname, obj.hostnames) {
|
if util.StrInList(hostname, obj.hostnames) {
|
||||||
return nil, fmt.Errorf("Remote: Hostname `%s` already exists!", hostname)
|
return nil, fmt.Errorf("Remote: Hostname `%s` already exists", hostname)
|
||||||
}
|
}
|
||||||
obj.hostnames = append(obj.hostnames, hostname)
|
obj.hostnames = append(obj.hostnames, hostname)
|
||||||
|
|
||||||
@@ -830,7 +830,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
|
|||||||
// sshKeyAuth is a helper function to get the ssh key auth struct needed
|
// sshKeyAuth is a helper function to get the ssh key auth struct needed
|
||||||
func (obj *Remotes) sshKeyAuth() (ssh.AuthMethod, error) {
|
func (obj *Remotes) sshKeyAuth() (ssh.AuthMethod, error) {
|
||||||
if obj.sshPrivIdRsa == "" {
|
if obj.sshPrivIdRsa == "" {
|
||||||
return nil, fmt.Errorf("Empty path specified!")
|
return nil, fmt.Errorf("empty path specified")
|
||||||
}
|
}
|
||||||
p := ""
|
p := ""
|
||||||
// TODO: this doesn't match strings of the form: ~james/.ssh/id_rsa
|
// TODO: this doesn't match strings of the form: ~james/.ssh/id_rsa
|
||||||
@@ -843,7 +843,7 @@ func (obj *Remotes) sshKeyAuth() (ssh.AuthMethod, error) {
|
|||||||
p = path.Join(usr.HomeDir, obj.sshPrivIdRsa[len("~/"):])
|
p = path.Join(usr.HomeDir, obj.sshPrivIdRsa[len("~/"):])
|
||||||
}
|
}
|
||||||
if p == "" {
|
if p == "" {
|
||||||
return nil, fmt.Errorf("Empty path specified!")
|
return nil, fmt.Errorf("empty path specified")
|
||||||
}
|
}
|
||||||
// A public key may be used to authenticate against the server by using
|
// A public key may be used to authenticate against the server by using
|
||||||
// an unencrypted PEM-encoded private key file. If you have an encrypted
|
// an unencrypted PEM-encoded private key file. If you have an encrypted
|
||||||
@@ -892,7 +892,7 @@ func (obj *Remotes) passwordCallback(user, host string) func() (string, error) {
|
|||||||
case e := <-failchan:
|
case e := <-failchan:
|
||||||
return "", e
|
return "", e
|
||||||
case <-util.TimeAfterOrBlock(timeout):
|
case <-util.TimeAfterOrBlock(timeout):
|
||||||
return "", fmt.Errorf("Interactive timeout reached!")
|
return "", fmt.Errorf("interactive timeout reached")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return cb
|
return cb
|
||||||
|
|||||||
@@ -25,7 +25,6 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
"github.com/purpleidea/mgmt/util"
|
"github.com/purpleidea/mgmt/util"
|
||||||
@@ -111,22 +110,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
|
|||||||
|
|
||||||
// Watch is the primary listener for this resource and it outputs events.
|
// Watch is the primary listener for this resource and it outputs events.
|
||||||
func (obj *ExecRes) Watch(processChan chan event.Event) error {
|
func (obj *ExecRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
|
||||||
Startup := func(block bool) <-chan time.Time {
|
|
||||||
if block {
|
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
var exit = false
|
var exit = false
|
||||||
@@ -169,6 +153,11 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
|
|||||||
bufioch, errch = obj.BufioChanScanner(scanner)
|
bufioch, errch = obj.BufioChanScanner(scanner)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching) // reset
|
obj.SetState(ResStateWatching) // reset
|
||||||
select {
|
select {
|
||||||
@@ -199,15 +188,10 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do all our event sending all together to avoid duplicate msgs
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
// it is okay to invalidate the clean state on poke too
|
// it is okay to invalidate the clean state on poke too
|
||||||
obj.StateOK(false) // something made state dirty
|
obj.StateOK(false) // something made state dirty
|
||||||
|
|||||||
@@ -30,7 +30,6 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
"github.com/purpleidea/mgmt/recwatch"
|
"github.com/purpleidea/mgmt/recwatch"
|
||||||
@@ -142,22 +141,7 @@ func (obj *FileRes) Validate() error {
|
|||||||
// must be restarted. On a clean exit it returns nil.
|
// must be restarted. On a clean exit it returns nil.
|
||||||
// FIXME: Also watch the source directory when using obj.Source !!!
|
// FIXME: Also watch the source directory when using obj.Source !!!
|
||||||
func (obj *FileRes) Watch(processChan chan event.Event) error {
|
func (obj *FileRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil // TODO: should this be an error?
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
|
||||||
Startup := func(block bool) <-chan time.Time {
|
|
||||||
if block {
|
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse)
|
obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse)
|
||||||
@@ -166,6 +150,11 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
|
|||||||
}
|
}
|
||||||
defer obj.recWatcher.Close()
|
defer obj.recWatcher.Close()
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
var exit = false
|
var exit = false
|
||||||
|
|
||||||
@@ -200,16 +189,10 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
obj.StateOK(false) // dirty
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do all our event sending all together to avoid duplicate msgs
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
return err // we exit or bubble up a NACK...
|
return err // we exit or bubble up a NACK...
|
||||||
@@ -787,7 +770,7 @@ func (obj *FileRes) Compare(res Res) bool {
|
|||||||
if obj.Name != res.Name {
|
if obj.Name != res.Name {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if obj.path != res.Path {
|
if obj.path != res.path {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if (obj.Content == nil) != (res.Content == nil) { // xor
|
if (obj.Content == nil) != (res.Content == nil) { // xor
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
"github.com/purpleidea/mgmt/util"
|
"github.com/purpleidea/mgmt/util"
|
||||||
@@ -109,22 +108,7 @@ func (obj *HostnameRes) Validate() error {
|
|||||||
|
|
||||||
// Watch is the primary listener for this resource and it outputs events.
|
// Watch is the primary listener for this resource and it outputs events.
|
||||||
func (obj *HostnameRes) Watch(processChan chan event.Event) error {
|
func (obj *HostnameRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil // TODO: should this be an error?
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
|
||||||
Startup := func(block bool) <-chan time.Time {
|
|
||||||
if block {
|
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we share the bus with others, we will get each others messages!!
|
// if we share the bus with others, we will get each others messages!!
|
||||||
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
|
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
|
||||||
@@ -142,6 +126,11 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
|
|||||||
signals := make(chan *dbus.Signal, 10) // closed by dbus package
|
signals := make(chan *dbus.Signal, 10) // closed by dbus package
|
||||||
bus.Signal(signals)
|
bus.Signal(signals)
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@@ -164,15 +153,10 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do all our event sending all together to avoid duplicate msgs
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
|
|
||||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
|
|
||||||
@@ -136,21 +135,11 @@ func (obj *MsgRes) journalPriority() journal.Priority {
|
|||||||
|
|
||||||
// Watch is the primary listener for this resource and it outputs events.
|
// Watch is the primary listener for this resource and it outputs events.
|
||||||
func (obj *MsgRes) Watch(processChan chan event.Event) error {
|
func (obj *MsgRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
// notify engine that we're running
|
||||||
Startup := func(block bool) <-chan time.Time {
|
if err := obj.Running(processChan); err != nil {
|
||||||
if block {
|
return err // bubble up a NACK...
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
@@ -168,18 +157,11 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do all our event sending all together to avoid duplicate msgs
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
// only do this on certain types of events
|
|
||||||
//obj.isStateOK = false // something made state dirty
|
|
||||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
return err // we exit or bubble up a NACK...
|
return err // we exit or bubble up a NACK...
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ package resources
|
|||||||
import (
|
import (
|
||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
)
|
)
|
||||||
@@ -60,21 +59,11 @@ func (obj *NoopRes) Validate() error {
|
|||||||
|
|
||||||
// Watch is the primary listener for this resource and it outputs events.
|
// Watch is the primary listener for this resource and it outputs events.
|
||||||
func (obj *NoopRes) Watch(processChan chan event.Event) error {
|
func (obj *NoopRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil // TODO: should this be an error?
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
// notify engine that we're running
|
||||||
Startup := func(block bool) <-chan time.Time {
|
if err := obj.Running(processChan); err != nil {
|
||||||
if block {
|
return err // bubble up a NACK...
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
@@ -92,15 +81,10 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do all our event sending all together to avoid duplicate msgs
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
return err // we exit or bubble up a NACK...
|
return err // we exit or bubble up a NACK...
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
"github.com/purpleidea/mgmt/util"
|
"github.com/purpleidea/mgmt/util"
|
||||||
@@ -96,22 +95,7 @@ func (obj *NspawnRes) Validate() error {
|
|||||||
|
|
||||||
// Watch for state changes and sends a message to the bus if there is a change
|
// Watch for state changes and sends a message to the bus if there is a change
|
||||||
func (obj *NspawnRes) Watch(processChan chan event.Event) error {
|
func (obj *NspawnRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
|
||||||
Startup := func(block bool) <-chan time.Time {
|
|
||||||
if block {
|
|
||||||
return nil // blocks forever
|
|
||||||
}
|
|
||||||
// 1/2 the resolution of converged timeout
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
// this resource depends on systemd ensure that it's running
|
// this resource depends on systemd ensure that it's running
|
||||||
if !systemdUtil.IsRunningSystemd() {
|
if !systemdUtil.IsRunningSystemd() {
|
||||||
@@ -135,6 +119,11 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
|
|||||||
buschan := make(chan *dbus.Signal, 10)
|
buschan := make(chan *dbus.Signal, 10)
|
||||||
bus.Signal(buschan)
|
bus.Signal(buschan)
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
var send = false
|
var send = false
|
||||||
var exit = false
|
var exit = false
|
||||||
|
|
||||||
@@ -165,16 +154,10 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
obj.StateOK(false) // dirty
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do all our event sending all together to avoid duplicate msgs
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
return err // we exit or bubble up a NACK...
|
return err // we exit or bubble up a NACK...
|
||||||
|
|||||||
@@ -27,7 +27,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
"github.com/purpleidea/mgmt/recwatch"
|
"github.com/purpleidea/mgmt/recwatch"
|
||||||
@@ -169,22 +168,7 @@ Loop:
|
|||||||
|
|
||||||
// Watch is the primary listener for this resource and it outputs events.
|
// Watch is the primary listener for this resource and it outputs events.
|
||||||
func (obj *PasswordRes) Watch(processChan chan event.Event) error {
|
func (obj *PasswordRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil // TODO: should this be an error?
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
|
||||||
Startup := func(block bool) <-chan time.Time {
|
|
||||||
if block {
|
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false)
|
obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false)
|
||||||
@@ -193,6 +177,11 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
|
|||||||
}
|
}
|
||||||
defer obj.recWatcher.Close()
|
defer obj.recWatcher.Close()
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
var exit = false
|
var exit = false
|
||||||
for {
|
for {
|
||||||
@@ -220,15 +209,10 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do all our event sending all together to avoid duplicate msgs
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
return err // we exit or bubble up a NACK...
|
return err // we exit or bubble up a NACK...
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
"github.com/purpleidea/mgmt/resources/packagekit"
|
"github.com/purpleidea/mgmt/resources/packagekit"
|
||||||
@@ -110,22 +109,7 @@ func (obj *PkgRes) Validate() error {
|
|||||||
// TODO: https://github.com/hughsie/PackageKit/issues/109
|
// TODO: https://github.com/hughsie/PackageKit/issues/109
|
||||||
// TODO: https://github.com/hughsie/PackageKit/issues/110
|
// TODO: https://github.com/hughsie/PackageKit/issues/110
|
||||||
func (obj *PkgRes) Watch(processChan chan event.Event) error {
|
func (obj *PkgRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
|
||||||
Startup := func(block bool) <-chan time.Time {
|
|
||||||
if block {
|
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
bus := packagekit.NewBus()
|
bus := packagekit.NewBus()
|
||||||
if bus == nil {
|
if bus == nil {
|
||||||
@@ -138,6 +122,11 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
|
|||||||
return errwrap.Wrapf(err, "Error adding signal match")
|
return errwrap.Wrapf(err, "Error adding signal match")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
var exit = false
|
var exit = false
|
||||||
|
|
||||||
@@ -175,16 +164,10 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
obj.StateOK(false) // dirty
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do all our event sending all together to avoid duplicate msgs
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
return err // we exit or bubble up a NACK...
|
return err // we exit or bubble up a NACK...
|
||||||
|
|||||||
@@ -130,6 +130,9 @@ type Base interface {
|
|||||||
AssociateData(*Data)
|
AssociateData(*Data)
|
||||||
IsWatching() bool
|
IsWatching() bool
|
||||||
SetWatching(bool)
|
SetWatching(bool)
|
||||||
|
RegisterConverger()
|
||||||
|
UnregisterConverger()
|
||||||
|
Converger() converger.ConvergerUID
|
||||||
GetState() ResState
|
GetState() ResState
|
||||||
SetState(ResState)
|
SetState(ResState)
|
||||||
DoSend(chan event.Event, string) (bool, error)
|
DoSend(chan event.Event, string) (bool, error)
|
||||||
@@ -147,6 +150,9 @@ type Base interface {
|
|||||||
GetGroup() []Res // return everyone grouped inside me
|
GetGroup() []Res // return everyone grouped inside me
|
||||||
SetGroup([]Res)
|
SetGroup([]Res)
|
||||||
VarDir(string) (string, error)
|
VarDir(string) (string, error)
|
||||||
|
Running(chan event.Event) error // notify the engine that Watch started
|
||||||
|
Started() <-chan struct{} // returns when the resource has started
|
||||||
|
Starter(bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Res is the minimum interface you need to implement to define a new resource.
|
// Res is the minimum interface you need to implement to define a new resource.
|
||||||
@@ -171,14 +177,17 @@ type BaseRes struct {
|
|||||||
kind string
|
kind string
|
||||||
events chan event.Event
|
events chan event.Event
|
||||||
converger converger.Converger // converged tracking
|
converger converger.Converger // converged tracking
|
||||||
prefix string // base prefix for this resource
|
cuid converger.ConvergerUID
|
||||||
|
prefix string // base prefix for this resource
|
||||||
debug bool
|
debug bool
|
||||||
state ResState
|
state ResState
|
||||||
watching bool // is Watch() loop running ?
|
watching bool // is Watch() loop running ?
|
||||||
isStateOK bool // whether the state is okay based on events or not
|
started chan struct{} // closed when worker is started/running
|
||||||
isGrouped bool // am i contained within a group?
|
starter bool // does this have indegree == 0 ? XXX: usually?
|
||||||
grouped []Res // list of any grouped resources
|
isStateOK bool // whether the state is okay based on events or not
|
||||||
refresh bool // does this resource have a refresh to run?
|
isGrouped bool // am i contained within a group?
|
||||||
|
grouped []Res // list of any grouped resources
|
||||||
|
refresh bool // does this resource have a refresh to run?
|
||||||
//refreshState StatefulBool // TODO: future stateful bool
|
//refreshState StatefulBool // TODO: future stateful bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -229,6 +238,7 @@ func (obj *BaseRes) Init() error {
|
|||||||
return fmt.Errorf("Resource did not set kind!")
|
return fmt.Errorf("Resource did not set kind!")
|
||||||
}
|
}
|
||||||
obj.events = make(chan event.Event) // unbuffered chan to avoid stale events
|
obj.events = make(chan event.Event) // unbuffered chan to avoid stale events
|
||||||
|
obj.started = make(chan struct{}) // closes when started
|
||||||
//dir, err := obj.VarDir("")
|
//dir, err := obj.VarDir("")
|
||||||
//if err != nil {
|
//if err != nil {
|
||||||
// return errwrap.Wrapf(err, "VarDir failed in Init()")
|
// return errwrap.Wrapf(err, "VarDir failed in Init()")
|
||||||
@@ -275,16 +285,34 @@ func (obj *BaseRes) AssociateData(data *Data) {
|
|||||||
obj.debug = data.Debug
|
obj.debug = data.Debug
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsWatching tells us if the Watch() function is running.
|
// IsWatching tells us if the Worker() function is running.
|
||||||
func (obj *BaseRes) IsWatching() bool {
|
func (obj *BaseRes) IsWatching() bool {
|
||||||
return obj.watching
|
return obj.watching
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetWatching stores the status of if the Watch() function is running.
|
// SetWatching stores the status of if the Worker() function is running.
|
||||||
func (obj *BaseRes) SetWatching(b bool) {
|
func (obj *BaseRes) SetWatching(b bool) {
|
||||||
obj.watching = b
|
obj.watching = b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterConverger sets up the cuid for the resource. This is a helper
|
||||||
|
// function for the engine, and shouldn't be called by the resources directly.
|
||||||
|
func (obj *BaseRes) RegisterConverger() {
|
||||||
|
obj.cuid = obj.converger.Register()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnregisterConverger tears down the cuid for the resource. This is a helper
|
||||||
|
// function for the engine, and shouldn't be called by the resources directly.
|
||||||
|
func (obj *BaseRes) UnregisterConverger() {
|
||||||
|
obj.cuid.Unregister()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Converger returns the ConvergerUID for the resource. This should be called
|
||||||
|
// by the Watch method of the resource to set the converged state.
|
||||||
|
func (obj *BaseRes) Converger() converger.ConvergerUID {
|
||||||
|
return obj.cuid
|
||||||
|
}
|
||||||
|
|
||||||
// GetState returns the state of the resource.
|
// GetState returns the state of the resource.
|
||||||
func (obj *BaseRes) GetState() ResState {
|
func (obj *BaseRes) GetState() ResState {
|
||||||
return obj.state
|
return obj.state
|
||||||
@@ -403,6 +431,13 @@ func (obj *BaseRes) VarDir(extra string) (string, error) {
|
|||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Started returns a channel that closes when the resource has started up.
|
||||||
|
func (obj *BaseRes) Started() <-chan struct{} { return obj.started }
|
||||||
|
|
||||||
|
// Starter sets the starter bool. This defines if a vertex has an indegree of 0.
|
||||||
|
// If we have an indegree of 0, we'll need to be a poke initiator in the graph.
|
||||||
|
func (obj *BaseRes) Starter(b bool) { obj.starter = b }
|
||||||
|
|
||||||
// ResToB64 encodes a resource to a base64 encoded string (after serialization)
|
// ResToB64 encodes a resource to a base64 encoded string (after serialization)
|
||||||
func ResToB64(res Res) (string, error) {
|
func ResToB64(res Res) (string, error) {
|
||||||
b := bytes.Buffer{}
|
b := bytes.Buffer{}
|
||||||
|
|||||||
@@ -60,8 +60,8 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
|
|||||||
var poke bool
|
var poke bool
|
||||||
// ensure that a CheckApply runs by sending with a dirty state...
|
// ensure that a CheckApply runs by sending with a dirty state...
|
||||||
if ev.GetActivity() { // if previous node did work, and we were notified...
|
if ev.GetActivity() { // if previous node did work, and we were notified...
|
||||||
obj.StateOK(false) // dirty
|
//obj.StateOK(false) // not necessarily
|
||||||
poke = true // poke!
|
poke = true // poke!
|
||||||
// XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...)
|
// XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...)
|
||||||
// XXX: unless this is used in our "fallback" polling implementation???
|
// XXX: unless this is used in our "fallback" polling implementation???
|
||||||
//obj.SetRefresh(true) // TODO: is this redundant?
|
//obj.SetRefresh(true) // TODO: is this redundant?
|
||||||
@@ -108,6 +108,23 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
|
|||||||
return true, false // required to keep the stupid go compiler happy
|
return true, false // required to keep the stupid go compiler happy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Running is called by the Watch method of the resource once it has started up.
|
||||||
|
// This signals to the engine to kick off the initial CheckApply resource check.
|
||||||
|
func (obj *BaseRes) Running(processChan chan event.Event) error {
|
||||||
|
obj.StateOK(false) // assume we're initially dirty
|
||||||
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
|
cuid.SetConverged(false) // a reasonable initial assumption
|
||||||
|
close(obj.started) // send started signal
|
||||||
|
|
||||||
|
// FIXME: exit return value is unused atm, so ignore it for now...
|
||||||
|
//if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
|
var err error
|
||||||
|
if obj.starter { // vertices of indegree == 0 should send initial pokes
|
||||||
|
_, err = obj.DoSend(processChan, "") // trigger a CheckApply
|
||||||
|
}
|
||||||
|
return err // bubble up any possible error (or nil)
|
||||||
|
}
|
||||||
|
|
||||||
// Send points to a value that a resource will send.
|
// Send points to a value that a resource will send.
|
||||||
type Send struct {
|
type Send struct {
|
||||||
Res Res // a handle to the resource which is sending a value
|
Res Res // a handle to the resource which is sending a value
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ import (
|
|||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
"github.com/purpleidea/mgmt/util"
|
"github.com/purpleidea/mgmt/util"
|
||||||
@@ -76,22 +75,7 @@ func (obj *SvcRes) Validate() error {
|
|||||||
|
|
||||||
// Watch is the primary listener for this resource and it outputs events.
|
// Watch is the primary listener for this resource and it outputs events.
|
||||||
func (obj *SvcRes) Watch(processChan chan event.Event) error {
|
func (obj *SvcRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
|
||||||
Startup := func(block bool) <-chan time.Time {
|
|
||||||
if block {
|
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
// obj.Name: svc name
|
// obj.Name: svc name
|
||||||
if !systemdUtil.IsRunningSystemd() {
|
if !systemdUtil.IsRunningSystemd() {
|
||||||
@@ -116,6 +100,11 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
|
|||||||
buschan := make(chan *dbus.Signal, 10)
|
buschan := make(chan *dbus.Signal, 10)
|
||||||
bus.Signal(buschan)
|
bus.Signal(buschan)
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name
|
var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name
|
||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
var exit = false
|
var exit = false
|
||||||
@@ -175,11 +164,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
obj.StateOK(false) // dirty
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if !activeSet {
|
if !activeSet {
|
||||||
@@ -227,16 +211,10 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
obj.StateOK(false) // dirty
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
return err // we exit or bubble up a NACK...
|
return err // we exit or bubble up a NACK...
|
||||||
|
|||||||
@@ -74,27 +74,17 @@ func (obj *TimerRes) newTicker() *time.Ticker {
|
|||||||
|
|
||||||
// Watch is the primary listener for this resource and it outputs events.
|
// Watch is the primary listener for this resource and it outputs events.
|
||||||
func (obj *TimerRes) Watch(processChan chan event.Event) error {
|
func (obj *TimerRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
|
||||||
Startup := func(block bool) <-chan time.Time {
|
|
||||||
if block {
|
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
// create a time.Ticker for the given interval
|
// create a time.Ticker for the given interval
|
||||||
obj.ticker = obj.newTicker()
|
obj.ticker = obj.newTicker()
|
||||||
defer obj.ticker.Stop()
|
defer obj.ticker.Stop()
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
var send = false
|
var send = false
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@@ -113,13 +103,9 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true)
|
cuid.SetConverged(true)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil {
|
||||||
return err // we exit or bubble up a NACK...
|
return err // we exit or bubble up a NACK...
|
||||||
|
|||||||
@@ -132,22 +132,7 @@ func (obj *VirtRes) connect() (conn libvirt.VirConnection, err error) {
|
|||||||
|
|
||||||
// Watch is the primary listener for this resource and it outputs events.
|
// Watch is the primary listener for this resource and it outputs events.
|
||||||
func (obj *VirtRes) Watch(processChan chan event.Event) error {
|
func (obj *VirtRes) Watch(processChan chan event.Event) error {
|
||||||
if obj.IsWatching() {
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
return nil
|
|
||||||
}
|
|
||||||
obj.SetWatching(true)
|
|
||||||
defer obj.SetWatching(false)
|
|
||||||
cuid := obj.converger.Register()
|
|
||||||
defer cuid.Unregister()
|
|
||||||
|
|
||||||
var startup bool
|
|
||||||
Startup := func(block bool) <-chan time.Time {
|
|
||||||
if block {
|
|
||||||
return nil // blocks forever
|
|
||||||
//return make(chan time.Time) // blocks forever
|
|
||||||
}
|
|
||||||
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := obj.connect()
|
conn, err := obj.connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -203,6 +188,11 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
|
|||||||
)
|
)
|
||||||
defer conn.DomainEventDeregister(callbackID)
|
defer conn.DomainEventDeregister(callbackID)
|
||||||
|
|
||||||
|
// notify engine that we're running
|
||||||
|
if err := obj.Running(processChan); err != nil {
|
||||||
|
return err // bubble up a NACK...
|
||||||
|
}
|
||||||
|
|
||||||
var send = false
|
var send = false
|
||||||
var exit = false
|
var exit = false
|
||||||
|
|
||||||
@@ -260,15 +250,9 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
|
|||||||
case <-cuid.ConvergedTimer():
|
case <-cuid.ConvergedTimer():
|
||||||
cuid.SetConverged(true) // converged!
|
cuid.SetConverged(true) // converged!
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case <-Startup(startup):
|
|
||||||
cuid.SetConverged(false)
|
|
||||||
send = true
|
|
||||||
obj.StateOK(false) // dirty
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if send {
|
if send {
|
||||||
startup = true // startup finished
|
|
||||||
send = false
|
send = false
|
||||||
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
return err // we exit or bubble up a NACK...
|
return err // we exit or bubble up a NACK...
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ ROOT=$(dirname "${BASH_SOURCE}")/..
|
|||||||
|
|
||||||
GO_VERSION=($(go version))
|
GO_VERSION=($(go version))
|
||||||
|
|
||||||
if [[ -z $(echo "${GO_VERSION[2]}" | grep -E 'go1.2|go1.3|go1.4|go1.5|go1.6|go1.7|go1.8') ]]; then
|
if [[ -z $(echo "${GO_VERSION[2]}" | grep -E 'go1.2|go1.3|go1.4|go1.5|go1.6|go1.7|go1.8|devel') ]]; then
|
||||||
echo "Unknown go version '${GO_VERSION[2]}', failing gofmt."
|
echo "Unknown go version '${GO_VERSION[2]}', failing gofmt."
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|||||||
@@ -74,8 +74,8 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) {
|
|||||||
return g, err
|
return g, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// SwitchStream returns nil errors every time there could be a new graph.
|
// Next returns nil errors every time there could be a new graph.
|
||||||
func (obj *GAPI) SwitchStream() chan error {
|
func (obj *GAPI) Next() chan error {
|
||||||
if obj.data.NoWatch {
|
if obj.data.NoWatch {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,10 +34,6 @@ import (
|
|||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
Debug = false // FIXME: integrate with global debug flag
|
|
||||||
)
|
|
||||||
|
|
||||||
type collectorResConfig struct {
|
type collectorResConfig struct {
|
||||||
Kind string `yaml:"kind"`
|
Kind string `yaml:"kind"`
|
||||||
Pattern string `yaml:"pattern"` // XXX: Not Implemented
|
Pattern string `yaml:"pattern"` // XXX: Not Implemented
|
||||||
@@ -120,9 +116,6 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world gapi.World, noop
|
|||||||
slice := reflect.ValueOf(iface)
|
slice := reflect.ValueOf(iface)
|
||||||
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
|
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
|
||||||
kind := util.FirstToUpper(name)
|
kind := util.FirstToUpper(name)
|
||||||
if Debug {
|
|
||||||
log.Printf("Config: Processing: %v...", kind)
|
|
||||||
}
|
|
||||||
for j := 0; j < slice.Len(); j++ { // loop through resources of same kind
|
for j := 0; j < slice.Len(); j++ { // loop through resources of same kind
|
||||||
x := slice.Index(j).Interface()
|
x := slice.Index(j).Interface()
|
||||||
res, ok := x.(resources.Res) // convert to Res type
|
res, ok := x.(resources.Res) // convert to Res type
|
||||||
|
|||||||
Reference in New Issue
Block a user