diff --git a/config.go b/config.go index 1e507efb..54bf1efd 100644 --- a/config.go +++ b/config.go @@ -43,6 +43,7 @@ type edgeConfig struct { To vertexConfig `yaml:"to"` } +// GraphConfig is the data structure that describes a single graph to run. type GraphConfig struct { Graph string `yaml:"graph"` Resources struct { @@ -60,6 +61,7 @@ type GraphConfig struct { Remote string `yaml:"remote"` } +// Parse parses a data stream into the graph structure. func (c *GraphConfig) Parse(data []byte) error { if err := yaml.Unmarshal(data, c); err != nil { return err @@ -70,6 +72,7 @@ func (c *GraphConfig) Parse(data []byte) error { return nil } +// ParseConfigFromFile takes a filename and returns the graph config structure. func ParseConfigFromFile(filename string) *GraphConfig { data, err := ioutil.ReadFile(filename) if err != nil { @@ -295,7 +298,7 @@ func (g *Graph) addEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool { return result } -// add auto edges to graph +// AutoEdges adds the automatic edges to the graph. func (g *Graph) AutoEdges() { log.Println("Compile: Adding AutoEdges...") for _, v := range g.GetVertices() { // for each vertexes autoedges diff --git a/etcd.go b/etcd.go index f27d4e8c..6778c189 100644 --- a/etcd.go +++ b/etcd.go @@ -69,6 +69,7 @@ import ( "google.golang.org/grpc" ) +// constant parameters which may need to be tweaked or customized const ( NS = "_mgmt" // root namespace for mgmt operations seedSentinel = "_seed" // you must not name your hostname this @@ -83,7 +84,7 @@ const ( ) var ( - ErrApplyDeltaEventsInconsistent = errors.New("Etcd: ApplyDeltaEvents: Inconsistent key!") + errApplyDeltaEventsInconsistent = errors.New("Etcd: ApplyDeltaEvents: Inconsistent key!") ) // AW is a struct for the AddWatcher queue @@ -161,7 +162,7 @@ type EmbdEtcd struct { // EMBeddeD etcd exitTimeout <-chan time.Time hostname string - memberId uint64 // cluster membership id of server if running + memberID uint64 // cluster membership id of server if running endpoints etcdtypes.URLsMap // map of servers a client could connect to clientURLs etcdtypes.URLs // locations to listen for clients if i am a server serverURLs etcdtypes.URLs // locations to listen for servers if i am a server (peer) @@ -283,8 +284,8 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error { return nil } } - var emax uint16 = 0 - for { // loop until connect + var emax uint16 // = 0 + for { // loop until connect var err error cfg := obj.GetConfig() if eps := obj.endpoints; len(eps) > 0 { @@ -516,8 +517,8 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, // tmin <= texp^iter - 1 <= tmax // TODO: check my math return time.Duration(math.Min(math.Max(math.Pow(float64(texp), float64(iter))-1.0, float64(tmin)), float64(tmax))) * time.Millisecond } - var isTimeout bool = false - var iter int = 0 + var isTimeout = false + var iter int // = 0 if ctxerr, ok := ctx.Value(ctxErr).(error); ok { if DEBUG { log.Printf("Etcd: CtxError: err(%v), ctxerr(%v)", err, ctxerr) @@ -1055,7 +1056,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) rch := obj.client.Watcher.Watch(ctx, aw.path, aw.opts...) obj.rLock.RUnlock() var rev int64 - var useRev bool = false + var useRev = false var retry, locked bool = false, false for { response := <-rch // read @@ -1125,7 +1126,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) // rawCallback is the companion to AddWatcher which runs the callback processing func rawCallback(ctx context.Context, re *RE) error { - var err error = re.err // the watch event itself might have had an error + var err = re.err // the watch event itself might have had an error if err == nil { if callback := re.callback; callback != nil { // TODO: we could add an async option if needed @@ -1290,47 +1291,46 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error { log.Printf("Etcd: Quitters: Shutting down %d members...", lq) } for _, quitter := range quitters { - if mID, ok := Uint64KeyFromStrInMap(quitter, membersMap); ok { - EtcdNominate(obj, quitter, nil) // unnominate - // once we issue the above unnominate, that peer will - // shutdown, and this might cause us to loose quorum, - // therefore, let that member remove itself, and then - // double check that it did happen in case delinquent - // TODO: get built-in transactional member Add/Remove - // functionality to avoid a separate nominate list... - if quitter == obj.hostname { // remove in unnominate! - log.Printf("Etcd: Quitters: Removing self...") - continue // TODO: CtxDelayErr ? - } - - log.Printf("Etcd: Waiting %d seconds for %s to self remove...", selfRemoveTimeout, quitter) - time.Sleep(selfRemoveTimeout * time.Second) - // in case the removed member doesn't remove itself, do it! - removed, err := EtcdMemberRemove(obj, mID) - if err != nil { - return fmt.Errorf("Etcd: Member Remove: Error: %+v", err) - } - if removed { - log.Printf("Etcd: Member Removed (forced): %v(%v)", quitter, mID) - } - - // Remove the endpoint from our list to avoid blocking - // future MemberList calls which would try and connect - // to a missing endpoint... The endpoints should get - // updated from the member exiting safely if it doesn't - // crash, but if it did and/or since it's a race to see - // if the update event will get seen before we need the - // new data, just do it now anyways, then update the - // endpoint list and trigger a reconnect. - delete(obj.endpoints, quitter) // proactively delete it - obj.endpointCallback(nil) // update! - log.Printf("Member %s (%d) removed successfully!", quitter, mID) - return &CtxReconnectErr{"a member was removed"} // retry asap and update endpoint list - - } else { + mID, ok := Uint64KeyFromStrInMap(quitter, membersMap) + if !ok { // programming error log.Fatalf("Etcd: Member Remove: Error: %v(%v) not in members list!", quitter, mID) } + EtcdNominate(obj, quitter, nil) // unnominate + // once we issue the above unnominate, that peer will + // shutdown, and this might cause us to loose quorum, + // therefore, let that member remove itself, and then + // double check that it did happen in case delinquent + // TODO: get built-in transactional member Add/Remove + // functionality to avoid a separate nominate list... + if quitter == obj.hostname { // remove in unnominate! + log.Printf("Etcd: Quitters: Removing self...") + continue // TODO: CtxDelayErr ? + } + + log.Printf("Etcd: Waiting %d seconds for %s to self remove...", selfRemoveTimeout, quitter) + time.Sleep(selfRemoveTimeout * time.Second) + // in case the removed member doesn't remove itself, do it! + removed, err := EtcdMemberRemove(obj, mID) + if err != nil { + return fmt.Errorf("Etcd: Member Remove: Error: %+v", err) + } + if removed { + log.Printf("Etcd: Member Removed (forced): %v(%v)", quitter, mID) + } + + // Remove the endpoint from our list to avoid blocking + // future MemberList calls which would try and connect + // to a missing endpoint... The endpoints should get + // updated from the member exiting safely if it doesn't + // crash, but if it did and/or since it's a race to see + // if the update event will get seen before we need the + // new data, just do it now anyways, then update the + // endpoint list and trigger a reconnect. + delete(obj.endpoints, quitter) // proactively delete it + obj.endpointCallback(nil) // update! + log.Printf("Member %s (%d) removed successfully!", quitter, mID) + return &CtxReconnectErr{"a member was removed"} // retry asap and update endpoint list } return nil @@ -1344,7 +1344,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error { defer log.Printf("Trace: Etcd: nominateCallback(): Finished!") } bootstrapping := len(obj.endpoints) == 0 - var revision int64 = 0 + var revision int64 // = 0 if re != nil { revision = re.response.Header.Revision } @@ -1366,7 +1366,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error { nominated := obj.nominated if nominated, err := ApplyDeltaEvents(re, nominated); err == nil { obj.nominated = nominated - } else if !re.retryHint || err != ErrApplyDeltaEventsInconsistent { + } else if !re.retryHint || err != errApplyDeltaEventsInconsistent { log.Fatal(err) } @@ -1442,13 +1442,13 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error { if len(obj.nominated) != 0 { // don't call if nobody left but me! // this works around: https://github.com/coreos/etcd/issues/5482, // and it probably makes sense to avoid calling if we're the last - log.Printf("Etcd: Member Remove: Removing self: %v", obj.memberId) - removed, err := EtcdMemberRemove(obj, obj.memberId) + log.Printf("Etcd: Member Remove: Removing self: %v", obj.memberID) + removed, err := EtcdMemberRemove(obj, obj.memberID) if err != nil { return fmt.Errorf("Etcd: Member Remove: Error: %+v", err) } if removed { - log.Printf("Etcd: Member Removed (self): %v(%v)", obj.hostname, obj.memberId) + log.Printf("Etcd: Member Removed (self): %v(%v)", obj.hostname, obj.memberID) } } @@ -1512,7 +1512,7 @@ func (obj *EmbdEtcd) endpointCallback(re *RE) error { } // change detection - var changed bool = false // do we need to update? + var changed = false // do we need to update? if len(obj.endpoints) != len(endpoints) { changed = true } @@ -1662,7 +1662,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) } //log.Fatal(<-obj.server.Err()) XXX log.Printf("Etcd: StartServer: Server running...") - obj.memberId = uint64(obj.server.Server.ID()) // store member id for internal use + obj.memberID = uint64(obj.server.Server.ID()) // store member id for internal use obj.serverwg.Add(1) return nil @@ -1677,7 +1677,7 @@ func (obj *EmbdEtcd) DestroyServer() error { } log.Printf("Etcd: DestroyServer: Done closing...") - obj.memberId = 0 + obj.memberID = 0 if obj.server == nil { // skip the .Done() below because we didn't .Add(1) it. return err } @@ -1901,11 +1901,11 @@ func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]b internalCbFn := func(re *RE) error { // TODO: get the value from the response, and apply delta... // for now, just run a get operation which is easier to code! - if m, err := EtcdHostnameConverged(obj); err == nil { - return callbackFn(m) // call my function - } else { + m, err := EtcdHostnameConverged(obj) + if err != nil { return err } + return callbackFn(m) // call my function } return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset } @@ -1944,6 +1944,7 @@ func EtcdGetClusterSize(obj *EmbdEtcd) (uint16, error) { return uint16(v), nil } +// EtcdMemberAdd adds a member to the cluster. func EtcdMemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error) { //obj.Connect(false) // TODO ? ctx := context.Background() @@ -1967,7 +1968,7 @@ func EtcdMemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddRespo } // EtcdMemberRemove removes a member by mID and returns if it worked, and also -// if there was an error. This is because It might have run without error, but +// if there was an error. This is because it might have run without error, but // the member wasn't found, for example. func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) { //obj.Connect(false) // TODO ? @@ -2260,7 +2261,7 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err if DEBUG { log.Printf("Etcd: ApplyDeltaEvents: Inconsistent key: %v", key) } - return nil, ErrApplyDeltaEventsInconsistent + return nil, errApplyDeltaEventsInconsistent } delete(urlsmap, key) diff --git a/event.go b/event.go index 0a93edec..b4f58433 100644 --- a/event.go +++ b/event.go @@ -29,8 +29,10 @@ const ( eventBackPoke ) +// Resp is a channel to be used for boolean responses. type Resp chan bool +// Event is the main struct that stores event information and responses. type Event struct { Name eventName Resp Resp // channel to send an ack response on, nil to skip @@ -39,45 +41,46 @@ type Event struct { Activity bool // did something interesting happen? } -// send a single acknowledgement on the channel if one was requested +// ACK sends a single acknowledgement on the channel if one was requested. func (event *Event) ACK() { if event.Resp != nil { // if they've requested an ACK event.Resp.ACK() } } +// NACK sends a negative acknowledgement message on the channel if one was requested. func (event *Event) NACK() { if event.Resp != nil { // if they've requested a NACK event.Resp.NACK() } } -// Resp is just a helper to return the right type of response channel +// NewResp is just a helper to return the right type of response channel. func NewResp() Resp { resp := make(chan bool) return resp } -// ACK sends a true value to resp +// ACK sends a true value to resp. func (resp Resp) ACK() { if resp != nil { resp <- true } } -// NACK sends a false value to resp +// NACK sends a false value to resp. func (resp Resp) NACK() { if resp != nil { resp <- false } } -// Wait waits for any response from a Resp channel and returns it +// Wait waits for any response from a Resp channel and returns it. func (resp Resp) Wait() bool { return <-resp } -// ACKWait waits for a +ive Ack from a Resp channel +// ACKWait waits for a +ive Ack from a Resp channel. func (resp Resp) ACKWait() { for { // wait until true value @@ -87,7 +90,7 @@ func (resp Resp) ACKWait() { } } -// get the activity value +// GetActivity returns the activity value. func (event *Event) GetActivity() bool { return event.Activity } diff --git a/main.go b/main.go index cd7b9fe5..3450b19a 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,7 @@ var ( prefix = fmt.Sprintf("/var/lib/%s/", program) ) +// variables controlling verbosity const ( DEBUG = false // add additional log messages TRACE = false // add execution flow log messages diff --git a/pgraph.go b/pgraph.go index 79375bee..4f463242 100644 --- a/pgraph.go +++ b/pgraph.go @@ -43,6 +43,7 @@ const ( graphStatePaused ) +// Graph is the graph structure in this library. // The graph abstract data type (ADT) is defined as follows: // * the directed graph arrows point from left to right ( -> ) // * the arrows point away from their dependencies (eg: arrows mean "before") @@ -55,15 +56,18 @@ type Graph struct { mutex sync.Mutex // used when modifying graph State variable } +// Vertex is the primary vertex struct in this library. type Vertex struct { Res // anonymous field timestamp int64 // last updated timestamp ? } +// Edge is the primary edge struct in this library. type Edge struct { Name string } +// NewGraph builds a new graph. func NewGraph(name string) *Graph { return &Graph{ Name: name, @@ -72,12 +76,14 @@ func NewGraph(name string) *Graph { } } +// NewVertex returns a new graph vertex struct with a contained resource. func NewVertex(r Res) *Vertex { return &Vertex{ Res: r, } } +// NewEdge returns a new graph edge struct. func NewEdge(name string) *Edge { return &Edge{ Name: name, @@ -97,27 +103,30 @@ func (g *Graph) Copy() *Graph { return newGraph } -// returns the name of the graph +// GetName returns the name of the graph. func (g *Graph) GetName() string { return g.Name } -// set name of the graph +// SetName sets the name of the graph. func (g *Graph) SetName(name string) { g.Name = name } -func (g *Graph) GetState() graphState { +// getState returns the state of the graph. This state is used for optimizing +// certain algorithms by knowing what part of processing the graph is currently +// undergoing. +func (g *Graph) getState() graphState { //g.mutex.Lock() //defer g.mutex.Unlock() return g.state } -// set graph state and return previous state -func (g *Graph) SetState(state graphState) graphState { +// setState sets the graph state and returns the previous state. +func (g *Graph) setState(state graphState) graphState { g.mutex.Lock() defer g.mutex.Unlock() - prev := g.GetState() + prev := g.getState() g.state = state return prev } @@ -131,6 +140,7 @@ func (g *Graph) AddVertex(xv ...*Vertex) { } } +// DeleteVertex deletes a particular vertex from the graph. func (g *Graph) DeleteVertex(v *Vertex) { delete(g.Adjacency, v) for k := range g.Adjacency { @@ -138,7 +148,7 @@ func (g *Graph) DeleteVertex(v *Vertex) { } } -// adds a directed edge to the graph from v1 to v2 +// AddEdge adds a directed edge to the graph from v1 to v2. func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) { // NOTE: this doesn't allow more than one edge between two vertexes... g.AddVertex(v1, v2) // supports adding N vertices now @@ -147,6 +157,8 @@ func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) { g.Adjacency[v1][v2] = e } +// GetVertexMatch searches for an equivalent resource in the graph and returns +// the vertex it is found in, or nil if not found. func (g *Graph) GetVertexMatch(obj Res) *Vertex { for k := range g.Adjacency { if k.Res.Compare(obj) { @@ -156,6 +168,7 @@ func (g *Graph) GetVertexMatch(obj Res) *Vertex { return nil } +// HasVertex returns if the input vertex exists in the graph. func (g *Graph) HasVertex(v *Vertex) bool { if _, exists := g.Adjacency[v]; exists { return true @@ -163,12 +176,12 @@ func (g *Graph) HasVertex(v *Vertex) bool { return false } -// number of vertices in the graph +// NumVertices returns the number of vertices in the graph. func (g *Graph) NumVertices() int { return len(g.Adjacency) } -// number of edges in the graph +// NumEdges returns the number of edges in the graph. func (g *Graph) NumEdges() int { count := 0 for k := range g.Adjacency { @@ -187,7 +200,7 @@ func (g *Graph) GetVertices() []*Vertex { return vertices } -// returns a channel of all vertices in the graph +// GetVerticesChan returns a channel of all vertices in the graph. func (g *Graph) GetVerticesChan() chan *Vertex { ch := make(chan *Vertex) go func(ch chan *Vertex) { @@ -199,6 +212,7 @@ func (g *Graph) GetVerticesChan() chan *Vertex { return ch } +// VertexSlice is a linear list of vertices. It can be sorted. type VertexSlice []*Vertex func (vs VertexSlice) Len() int { return len(vs) } @@ -216,7 +230,7 @@ func (g *Graph) GetVerticesSorted() []*Vertex { return vertices } -// make the graph pretty print +// String makes the graph pretty print. func (g *Graph) String() string { return fmt.Sprintf("Vertices(%d), Edges(%d)", g.NumVertices(), g.NumEdges()) } @@ -226,7 +240,7 @@ func (v *Vertex) String() string { return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName()) } -// output the graph in graphviz format +// Graphviz outputs the graph in graphviz format. // https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29 func (g *Graph) Graphviz() (out string) { //digraph g { @@ -258,7 +272,8 @@ func (g *Graph) Graphviz() (out string) { return } -// write out the graphviz data and run the correct graphviz filter command +// ExecGraphviz writes out the graphviz data and runs the correct graphviz +// filter command. func (g *Graph) ExecGraphviz(program, filename string) error { switch program { @@ -308,8 +323,8 @@ func (g *Graph) ExecGraphviz(program, filename string) error { return nil } -// return an array (slice) of all directed vertices to vertex v (??? -> v) -// OKTimestamp should use this +// IncomingGraphEdges returns an array (slice) of all directed vertices to +// vertex v (??? -> v). OKTimestamp should probably use this. func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { // TODO: we might be able to implement this differently by reversing // the Adjacency graph and then looping through it again... @@ -324,8 +339,8 @@ func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { return s } -// return an array (slice) of all vertices that vertex v points to (v -> ???) -// poke should use this +// OutgoingGraphEdges returns an array (slice) of all vertices that vertex v +// points to (v -> ???). Poke should probably use this. func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex { var s []*Vertex for k := range g.Adjacency[v] { // forward paths @@ -334,7 +349,8 @@ func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex { return s } -// return an array (slice) of all vertices that connect to vertex v +// GraphEdges returns an array (slice) of all vertices that connect to vertex v. +// This is the union of IncomingGraphEdges and OutgoingGraphEdges. func (g *Graph) GraphEdges(v *Vertex) []*Vertex { var s []*Vertex s = append(s, g.IncomingGraphEdges(v)...) @@ -342,6 +358,7 @@ func (g *Graph) GraphEdges(v *Vertex) []*Vertex { return s } +// DFS returns a depth first search for the graph, starting at the input vertex. func (g *Graph) DFS(start *Vertex) []*Vertex { var d []*Vertex // discovered var s []*Vertex // stack @@ -364,7 +381,7 @@ func (g *Graph) DFS(start *Vertex) []*Vertex { return d } -// build a new graph containing only vertices from the list... +// FilterGraph builds a new graph containing only vertices from the list. func (g *Graph) FilterGraph(name string, vertices []*Vertex) *Graph { newgraph := NewGraph(name) for k1, x := range g.Adjacency { @@ -378,8 +395,8 @@ func (g *Graph) FilterGraph(name string, vertices []*Vertex) *Graph { return newgraph } -// return a channel containing the N disconnected graphs in our main graph -// we can then process each of these in parallel +// GetDisconnectedGraphs returns a channel containing the N disconnected graphs +// in our main graph. We can then process each of these in parallel. func (g *Graph) GetDisconnectedGraphs() chan *Graph { ch := make(chan *Graph) go func() { @@ -414,8 +431,7 @@ func (g *Graph) GetDisconnectedGraphs() chan *Graph { return ch } -// return the indegree for the graph, IOW the count of vertices that point to me -// NOTE: this returns the values for all vertices in one big lookup table +// InDegree returns the count of vertices that point to me in one big lookup map. func (g *Graph) InDegree() map[*Vertex]int { result := make(map[*Vertex]int) for k := range g.Adjacency { @@ -430,8 +446,7 @@ func (g *Graph) InDegree() map[*Vertex]int { return result } -// return the outdegree for the graph, IOW the count of vertices that point away -// NOTE: this returns the values for all vertices in one big lookup table +// OutDegree returns the count of vertices that point away in one big lookup map. func (g *Graph) OutDegree() map[*Vertex]int { result := make(map[*Vertex]int) @@ -444,7 +459,7 @@ func (g *Graph) OutDegree() map[*Vertex]int { return result } -// returns a topological sort for the graph +// TopologicalSort returns the sort of graph vertices in that order. // based on descriptions and code from wikipedia and rosetta code // TODO: add memoization, and cache invalidation to speed this up :) func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algorithm @@ -626,15 +641,6 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) return nil // success } -func HeisenbergCount(ch chan *Vertex) int { - c := 0 - for x := range ch { - _ = x - c++ - } - return c -} - // GetTimestamp returns the timestamp of a vertex func (v *Vertex) GetTimestamp() int64 { return v.timestamp @@ -646,7 +652,7 @@ func (v *Vertex) UpdateTimestamp() int64 { return v.timestamp } -// can this element run right now? +// OKTimestamp returns true if this element can run right now? func (g *Graph) OKTimestamp(v *Vertex) bool { // these are all the vertices pointing TO v, eg: ??? -> v for _, n := range g.IncomingGraphEdges(v) { @@ -665,14 +671,14 @@ func (g *Graph) OKTimestamp(v *Vertex) bool { return true } -// notify nodes after me in the dependency graph that they need refreshing... +// Poke notifies nodes after me in the dependency graph that they need refreshing... // NOTE: this assumes that this can never fail or need to be rescheduled func (g *Graph) Poke(v *Vertex, activity bool) { // these are all the vertices pointing AWAY FROM v, eg: v -> ??? for _, n := range g.OutgoingGraphEdges(v) { // XXX: if we're in state event and haven't been cancelled by // apply, then we can cancel a poke to a child, right? XXX - // XXX: if n.Res.GetState() != resStateEvent { // is this correct? + // XXX: if n.Res.getState() != resStateEvent { // is this correct? if true { // XXX if DEBUG { log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) @@ -686,7 +692,7 @@ func (g *Graph) Poke(v *Vertex, activity bool) { } } -// poke the pre-requisites that are stale and need to run before I can run... +// BackPoke pokes the pre-requisites that are stale and need to run before I can run. func (g *Graph) BackPoke(v *Vertex) { // these are all the vertices pointing TO v, eg: ??? -> v for _, n := range g.IncomingGraphEdges(v) { @@ -710,6 +716,7 @@ func (g *Graph) BackPoke(v *Vertex) { } } +// Process is the primary function to execute for a particular vertex in the graph. // XXX: rename this function func (g *Graph) Process(v *Vertex) { obj := v.Res @@ -764,10 +771,11 @@ func (g *Graph) Process(v *Vertex) { } } -// main kick to start the graph +// Start is a main kick to start the graph. It goes through in reverse topological +// sort order so that events can't hit un-started vertices. func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue - log.Printf("State: %v -> %v", g.SetState(graphStateStarting), g.GetState()) - defer log.Printf("State: %v -> %v", g.SetState(graphStateStarted), g.GetState()) + log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState()) + defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState()) t, _ := g.TopologicalSort() // TODO: only calculate indegree if `first` is true to save resources indegree := g.InDegree() // compute all of the indegree's @@ -829,15 +837,17 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue } } +// Pause sends pause events to the graph in a topological sort order. func (g *Graph) Pause() { - log.Printf("State: %v -> %v", g.SetState(graphStatePausing), g.GetState()) - defer log.Printf("State: %v -> %v", g.SetState(graphStatePaused), g.GetState()) + log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState()) + defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState()) t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... v.SendEvent(eventPause, true, false) } } +// Exit sends exit events to the graph in a topological sort order. func (g *Graph) Exit() { if g == nil { return @@ -860,7 +870,7 @@ func (g *Graph) AssociateData(converger Converger) { } } -// in array function to test *Vertex in a slice of *Vertices +// VertexContains is an "in array" function to test for a vertex in a slice of vertices. func VertexContains(needle *Vertex, haystack []*Vertex) bool { for _, v := range haystack { if needle == v { @@ -870,7 +880,7 @@ func VertexContains(needle *Vertex, haystack []*Vertex) bool { return false } -// reverse a list of vertices +// Reverse reverses a list of vertices. func Reverse(vs []*Vertex) []*Vertex { //var out []*Vertex // XXX: golint suggests, but it fails testing out := make([]*Vertex, 0) // empty list diff --git a/remote.go b/remote.go index 26db7e43..0c5452f8 100644 --- a/remote.go +++ b/remote.go @@ -1060,6 +1060,7 @@ func cleanURL(s string) string { // Semaphore is a counting semaphore. type Semaphore chan struct{} +// NewSemaphore creates a new semaphore. func NewSemaphore(size int) Semaphore { return make(Semaphore, size) }