misc: Golint fixes

This commit is contained in:
James Shubin
2016-09-02 01:03:40 -04:00
parent 5e3f03df06
commit b46432b5b6
6 changed files with 132 additions and 113 deletions

View File

@@ -43,6 +43,7 @@ type edgeConfig struct {
To vertexConfig `yaml:"to"` To vertexConfig `yaml:"to"`
} }
// GraphConfig is the data structure that describes a single graph to run.
type GraphConfig struct { type GraphConfig struct {
Graph string `yaml:"graph"` Graph string `yaml:"graph"`
Resources struct { Resources struct {
@@ -60,6 +61,7 @@ type GraphConfig struct {
Remote string `yaml:"remote"` Remote string `yaml:"remote"`
} }
// Parse parses a data stream into the graph structure.
func (c *GraphConfig) Parse(data []byte) error { func (c *GraphConfig) Parse(data []byte) error {
if err := yaml.Unmarshal(data, c); err != nil { if err := yaml.Unmarshal(data, c); err != nil {
return err return err
@@ -70,6 +72,7 @@ func (c *GraphConfig) Parse(data []byte) error {
return nil return nil
} }
// ParseConfigFromFile takes a filename and returns the graph config structure.
func ParseConfigFromFile(filename string) *GraphConfig { func ParseConfigFromFile(filename string) *GraphConfig {
data, err := ioutil.ReadFile(filename) data, err := ioutil.ReadFile(filename)
if err != nil { if err != nil {
@@ -295,7 +298,7 @@ func (g *Graph) addEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool {
return result return result
} }
// add auto edges to graph // AutoEdges adds the automatic edges to the graph.
func (g *Graph) AutoEdges() { func (g *Graph) AutoEdges() {
log.Println("Compile: Adding AutoEdges...") log.Println("Compile: Adding AutoEdges...")
for _, v := range g.GetVertices() { // for each vertexes autoedges for _, v := range g.GetVertices() { // for each vertexes autoedges

119
etcd.go
View File

@@ -69,6 +69,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
// constant parameters which may need to be tweaked or customized
const ( const (
NS = "_mgmt" // root namespace for mgmt operations NS = "_mgmt" // root namespace for mgmt operations
seedSentinel = "_seed" // you must not name your hostname this seedSentinel = "_seed" // you must not name your hostname this
@@ -83,7 +84,7 @@ const (
) )
var ( var (
ErrApplyDeltaEventsInconsistent = errors.New("Etcd: ApplyDeltaEvents: Inconsistent key!") errApplyDeltaEventsInconsistent = errors.New("Etcd: ApplyDeltaEvents: Inconsistent key!")
) )
// AW is a struct for the AddWatcher queue // AW is a struct for the AddWatcher queue
@@ -161,7 +162,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
exitTimeout <-chan time.Time exitTimeout <-chan time.Time
hostname string hostname string
memberId uint64 // cluster membership id of server if running memberID uint64 // cluster membership id of server if running
endpoints etcdtypes.URLsMap // map of servers a client could connect to endpoints etcdtypes.URLsMap // map of servers a client could connect to
clientURLs etcdtypes.URLs // locations to listen for clients if i am a server clientURLs etcdtypes.URLs // locations to listen for clients if i am a server
serverURLs etcdtypes.URLs // locations to listen for servers if i am a server (peer) serverURLs etcdtypes.URLs // locations to listen for servers if i am a server (peer)
@@ -283,8 +284,8 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
return nil return nil
} }
} }
var emax uint16 = 0 var emax uint16 // = 0
for { // loop until connect for { // loop until connect
var err error var err error
cfg := obj.GetConfig() cfg := obj.GetConfig()
if eps := obj.endpoints; len(eps) > 0 { if eps := obj.endpoints; len(eps) > 0 {
@@ -516,8 +517,8 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
// tmin <= texp^iter - 1 <= tmax // TODO: check my math // tmin <= texp^iter - 1 <= tmax // TODO: check my math
return time.Duration(math.Min(math.Max(math.Pow(float64(texp), float64(iter))-1.0, float64(tmin)), float64(tmax))) * time.Millisecond return time.Duration(math.Min(math.Max(math.Pow(float64(texp), float64(iter))-1.0, float64(tmin)), float64(tmax))) * time.Millisecond
} }
var isTimeout bool = false var isTimeout = false
var iter int = 0 var iter int // = 0
if ctxerr, ok := ctx.Value(ctxErr).(error); ok { if ctxerr, ok := ctx.Value(ctxErr).(error); ok {
if DEBUG { if DEBUG {
log.Printf("Etcd: CtxError: err(%v), ctxerr(%v)", err, ctxerr) log.Printf("Etcd: CtxError: err(%v), ctxerr(%v)", err, ctxerr)
@@ -1055,7 +1056,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
rch := obj.client.Watcher.Watch(ctx, aw.path, aw.opts...) rch := obj.client.Watcher.Watch(ctx, aw.path, aw.opts...)
obj.rLock.RUnlock() obj.rLock.RUnlock()
var rev int64 var rev int64
var useRev bool = false var useRev = false
var retry, locked bool = false, false var retry, locked bool = false, false
for { for {
response := <-rch // read response := <-rch // read
@@ -1125,7 +1126,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
// rawCallback is the companion to AddWatcher which runs the callback processing // rawCallback is the companion to AddWatcher which runs the callback processing
func rawCallback(ctx context.Context, re *RE) error { func rawCallback(ctx context.Context, re *RE) error {
var err error = re.err // the watch event itself might have had an error var err = re.err // the watch event itself might have had an error
if err == nil { if err == nil {
if callback := re.callback; callback != nil { if callback := re.callback; callback != nil {
// TODO: we could add an async option if needed // TODO: we could add an async option if needed
@@ -1290,47 +1291,46 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
log.Printf("Etcd: Quitters: Shutting down %d members...", lq) log.Printf("Etcd: Quitters: Shutting down %d members...", lq)
} }
for _, quitter := range quitters { for _, quitter := range quitters {
if mID, ok := Uint64KeyFromStrInMap(quitter, membersMap); ok { mID, ok := Uint64KeyFromStrInMap(quitter, membersMap)
EtcdNominate(obj, quitter, nil) // unnominate if !ok {
// once we issue the above unnominate, that peer will
// shutdown, and this might cause us to loose quorum,
// therefore, let that member remove itself, and then
// double check that it did happen in case delinquent
// TODO: get built-in transactional member Add/Remove
// functionality to avoid a separate nominate list...
if quitter == obj.hostname { // remove in unnominate!
log.Printf("Etcd: Quitters: Removing self...")
continue // TODO: CtxDelayErr ?
}
log.Printf("Etcd: Waiting %d seconds for %s to self remove...", selfRemoveTimeout, quitter)
time.Sleep(selfRemoveTimeout * time.Second)
// in case the removed member doesn't remove itself, do it!
removed, err := EtcdMemberRemove(obj, mID)
if err != nil {
return fmt.Errorf("Etcd: Member Remove: Error: %+v", err)
}
if removed {
log.Printf("Etcd: Member Removed (forced): %v(%v)", quitter, mID)
}
// Remove the endpoint from our list to avoid blocking
// future MemberList calls which would try and connect
// to a missing endpoint... The endpoints should get
// updated from the member exiting safely if it doesn't
// crash, but if it did and/or since it's a race to see
// if the update event will get seen before we need the
// new data, just do it now anyways, then update the
// endpoint list and trigger a reconnect.
delete(obj.endpoints, quitter) // proactively delete it
obj.endpointCallback(nil) // update!
log.Printf("Member %s (%d) removed successfully!", quitter, mID)
return &CtxReconnectErr{"a member was removed"} // retry asap and update endpoint list
} else {
// programming error // programming error
log.Fatalf("Etcd: Member Remove: Error: %v(%v) not in members list!", quitter, mID) log.Fatalf("Etcd: Member Remove: Error: %v(%v) not in members list!", quitter, mID)
} }
EtcdNominate(obj, quitter, nil) // unnominate
// once we issue the above unnominate, that peer will
// shutdown, and this might cause us to loose quorum,
// therefore, let that member remove itself, and then
// double check that it did happen in case delinquent
// TODO: get built-in transactional member Add/Remove
// functionality to avoid a separate nominate list...
if quitter == obj.hostname { // remove in unnominate!
log.Printf("Etcd: Quitters: Removing self...")
continue // TODO: CtxDelayErr ?
}
log.Printf("Etcd: Waiting %d seconds for %s to self remove...", selfRemoveTimeout, quitter)
time.Sleep(selfRemoveTimeout * time.Second)
// in case the removed member doesn't remove itself, do it!
removed, err := EtcdMemberRemove(obj, mID)
if err != nil {
return fmt.Errorf("Etcd: Member Remove: Error: %+v", err)
}
if removed {
log.Printf("Etcd: Member Removed (forced): %v(%v)", quitter, mID)
}
// Remove the endpoint from our list to avoid blocking
// future MemberList calls which would try and connect
// to a missing endpoint... The endpoints should get
// updated from the member exiting safely if it doesn't
// crash, but if it did and/or since it's a race to see
// if the update event will get seen before we need the
// new data, just do it now anyways, then update the
// endpoint list and trigger a reconnect.
delete(obj.endpoints, quitter) // proactively delete it
obj.endpointCallback(nil) // update!
log.Printf("Member %s (%d) removed successfully!", quitter, mID)
return &CtxReconnectErr{"a member was removed"} // retry asap and update endpoint list
} }
return nil return nil
@@ -1344,7 +1344,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
defer log.Printf("Trace: Etcd: nominateCallback(): Finished!") defer log.Printf("Trace: Etcd: nominateCallback(): Finished!")
} }
bootstrapping := len(obj.endpoints) == 0 bootstrapping := len(obj.endpoints) == 0
var revision int64 = 0 var revision int64 // = 0
if re != nil { if re != nil {
revision = re.response.Header.Revision revision = re.response.Header.Revision
} }
@@ -1366,7 +1366,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
nominated := obj.nominated nominated := obj.nominated
if nominated, err := ApplyDeltaEvents(re, nominated); err == nil { if nominated, err := ApplyDeltaEvents(re, nominated); err == nil {
obj.nominated = nominated obj.nominated = nominated
} else if !re.retryHint || err != ErrApplyDeltaEventsInconsistent { } else if !re.retryHint || err != errApplyDeltaEventsInconsistent {
log.Fatal(err) log.Fatal(err)
} }
@@ -1442,13 +1442,13 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
if len(obj.nominated) != 0 { // don't call if nobody left but me! if len(obj.nominated) != 0 { // don't call if nobody left but me!
// this works around: https://github.com/coreos/etcd/issues/5482, // this works around: https://github.com/coreos/etcd/issues/5482,
// and it probably makes sense to avoid calling if we're the last // and it probably makes sense to avoid calling if we're the last
log.Printf("Etcd: Member Remove: Removing self: %v", obj.memberId) log.Printf("Etcd: Member Remove: Removing self: %v", obj.memberID)
removed, err := EtcdMemberRemove(obj, obj.memberId) removed, err := EtcdMemberRemove(obj, obj.memberID)
if err != nil { if err != nil {
return fmt.Errorf("Etcd: Member Remove: Error: %+v", err) return fmt.Errorf("Etcd: Member Remove: Error: %+v", err)
} }
if removed { if removed {
log.Printf("Etcd: Member Removed (self): %v(%v)", obj.hostname, obj.memberId) log.Printf("Etcd: Member Removed (self): %v(%v)", obj.hostname, obj.memberID)
} }
} }
@@ -1512,7 +1512,7 @@ func (obj *EmbdEtcd) endpointCallback(re *RE) error {
} }
// change detection // change detection
var changed bool = false // do we need to update? var changed = false // do we need to update?
if len(obj.endpoints) != len(endpoints) { if len(obj.endpoints) != len(endpoints) {
changed = true changed = true
} }
@@ -1662,7 +1662,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
} }
//log.Fatal(<-obj.server.Err()) XXX //log.Fatal(<-obj.server.Err()) XXX
log.Printf("Etcd: StartServer: Server running...") log.Printf("Etcd: StartServer: Server running...")
obj.memberId = uint64(obj.server.Server.ID()) // store member id for internal use obj.memberID = uint64(obj.server.Server.ID()) // store member id for internal use
obj.serverwg.Add(1) obj.serverwg.Add(1)
return nil return nil
@@ -1677,7 +1677,7 @@ func (obj *EmbdEtcd) DestroyServer() error {
} }
log.Printf("Etcd: DestroyServer: Done closing...") log.Printf("Etcd: DestroyServer: Done closing...")
obj.memberId = 0 obj.memberID = 0
if obj.server == nil { // skip the .Done() below because we didn't .Add(1) it. if obj.server == nil { // skip the .Done() below because we didn't .Add(1) it.
return err return err
} }
@@ -1901,11 +1901,11 @@ func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]b
internalCbFn := func(re *RE) error { internalCbFn := func(re *RE) error {
// TODO: get the value from the response, and apply delta... // TODO: get the value from the response, and apply delta...
// for now, just run a get operation which is easier to code! // for now, just run a get operation which is easier to code!
if m, err := EtcdHostnameConverged(obj); err == nil { m, err := EtcdHostnameConverged(obj)
return callbackFn(m) // call my function if err != nil {
} else {
return err return err
} }
return callbackFn(m) // call my function
} }
return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset
} }
@@ -1944,6 +1944,7 @@ func EtcdGetClusterSize(obj *EmbdEtcd) (uint16, error) {
return uint16(v), nil return uint16(v), nil
} }
// EtcdMemberAdd adds a member to the cluster.
func EtcdMemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error) { func EtcdMemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error) {
//obj.Connect(false) // TODO ? //obj.Connect(false) // TODO ?
ctx := context.Background() ctx := context.Background()
@@ -1967,7 +1968,7 @@ func EtcdMemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddRespo
} }
// EtcdMemberRemove removes a member by mID and returns if it worked, and also // EtcdMemberRemove removes a member by mID and returns if it worked, and also
// if there was an error. This is because It might have run without error, but // if there was an error. This is because it might have run without error, but
// the member wasn't found, for example. // the member wasn't found, for example.
func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) { func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) {
//obj.Connect(false) // TODO ? //obj.Connect(false) // TODO ?
@@ -2260,7 +2261,7 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err
if DEBUG { if DEBUG {
log.Printf("Etcd: ApplyDeltaEvents: Inconsistent key: %v", key) log.Printf("Etcd: ApplyDeltaEvents: Inconsistent key: %v", key)
} }
return nil, ErrApplyDeltaEventsInconsistent return nil, errApplyDeltaEventsInconsistent
} }
delete(urlsmap, key) delete(urlsmap, key)

View File

@@ -29,8 +29,10 @@ const (
eventBackPoke eventBackPoke
) )
// Resp is a channel to be used for boolean responses.
type Resp chan bool type Resp chan bool
// Event is the main struct that stores event information and responses.
type Event struct { type Event struct {
Name eventName Name eventName
Resp Resp // channel to send an ack response on, nil to skip Resp Resp // channel to send an ack response on, nil to skip
@@ -39,45 +41,46 @@ type Event struct {
Activity bool // did something interesting happen? Activity bool // did something interesting happen?
} }
// send a single acknowledgement on the channel if one was requested // ACK sends a single acknowledgement on the channel if one was requested.
func (event *Event) ACK() { func (event *Event) ACK() {
if event.Resp != nil { // if they've requested an ACK if event.Resp != nil { // if they've requested an ACK
event.Resp.ACK() event.Resp.ACK()
} }
} }
// NACK sends a negative acknowledgement message on the channel if one was requested.
func (event *Event) NACK() { func (event *Event) NACK() {
if event.Resp != nil { // if they've requested a NACK if event.Resp != nil { // if they've requested a NACK
event.Resp.NACK() event.Resp.NACK()
} }
} }
// Resp is just a helper to return the right type of response channel // NewResp is just a helper to return the right type of response channel.
func NewResp() Resp { func NewResp() Resp {
resp := make(chan bool) resp := make(chan bool)
return resp return resp
} }
// ACK sends a true value to resp // ACK sends a true value to resp.
func (resp Resp) ACK() { func (resp Resp) ACK() {
if resp != nil { if resp != nil {
resp <- true resp <- true
} }
} }
// NACK sends a false value to resp // NACK sends a false value to resp.
func (resp Resp) NACK() { func (resp Resp) NACK() {
if resp != nil { if resp != nil {
resp <- false resp <- false
} }
} }
// Wait waits for any response from a Resp channel and returns it // Wait waits for any response from a Resp channel and returns it.
func (resp Resp) Wait() bool { func (resp Resp) Wait() bool {
return <-resp return <-resp
} }
// ACKWait waits for a +ive Ack from a Resp channel // ACKWait waits for a +ive Ack from a Resp channel.
func (resp Resp) ACKWait() { func (resp Resp) ACKWait() {
for { for {
// wait until true value // wait until true value
@@ -87,7 +90,7 @@ func (resp Resp) ACKWait() {
} }
} }
// get the activity value // GetActivity returns the activity value.
func (event *Event) GetActivity() bool { func (event *Event) GetActivity() bool {
return event.Activity return event.Activity
} }

View File

@@ -38,6 +38,7 @@ var (
prefix = fmt.Sprintf("/var/lib/%s/", program) prefix = fmt.Sprintf("/var/lib/%s/", program)
) )
// variables controlling verbosity
const ( const (
DEBUG = false // add additional log messages DEBUG = false // add additional log messages
TRACE = false // add execution flow log messages TRACE = false // add execution flow log messages

102
pgraph.go
View File

@@ -43,6 +43,7 @@ const (
graphStatePaused graphStatePaused
) )
// Graph is the graph structure in this library.
// The graph abstract data type (ADT) is defined as follows: // The graph abstract data type (ADT) is defined as follows:
// * the directed graph arrows point from left to right ( -> ) // * the directed graph arrows point from left to right ( -> )
// * the arrows point away from their dependencies (eg: arrows mean "before") // * the arrows point away from their dependencies (eg: arrows mean "before")
@@ -55,15 +56,18 @@ type Graph struct {
mutex sync.Mutex // used when modifying graph State variable mutex sync.Mutex // used when modifying graph State variable
} }
// Vertex is the primary vertex struct in this library.
type Vertex struct { type Vertex struct {
Res // anonymous field Res // anonymous field
timestamp int64 // last updated timestamp ? timestamp int64 // last updated timestamp ?
} }
// Edge is the primary edge struct in this library.
type Edge struct { type Edge struct {
Name string Name string
} }
// NewGraph builds a new graph.
func NewGraph(name string) *Graph { func NewGraph(name string) *Graph {
return &Graph{ return &Graph{
Name: name, Name: name,
@@ -72,12 +76,14 @@ func NewGraph(name string) *Graph {
} }
} }
// NewVertex returns a new graph vertex struct with a contained resource.
func NewVertex(r Res) *Vertex { func NewVertex(r Res) *Vertex {
return &Vertex{ return &Vertex{
Res: r, Res: r,
} }
} }
// NewEdge returns a new graph edge struct.
func NewEdge(name string) *Edge { func NewEdge(name string) *Edge {
return &Edge{ return &Edge{
Name: name, Name: name,
@@ -97,27 +103,30 @@ func (g *Graph) Copy() *Graph {
return newGraph return newGraph
} }
// returns the name of the graph // GetName returns the name of the graph.
func (g *Graph) GetName() string { func (g *Graph) GetName() string {
return g.Name return g.Name
} }
// set name of the graph // SetName sets the name of the graph.
func (g *Graph) SetName(name string) { func (g *Graph) SetName(name string) {
g.Name = name g.Name = name
} }
func (g *Graph) GetState() graphState { // getState returns the state of the graph. This state is used for optimizing
// certain algorithms by knowing what part of processing the graph is currently
// undergoing.
func (g *Graph) getState() graphState {
//g.mutex.Lock() //g.mutex.Lock()
//defer g.mutex.Unlock() //defer g.mutex.Unlock()
return g.state return g.state
} }
// set graph state and return previous state // setState sets the graph state and returns the previous state.
func (g *Graph) SetState(state graphState) graphState { func (g *Graph) setState(state graphState) graphState {
g.mutex.Lock() g.mutex.Lock()
defer g.mutex.Unlock() defer g.mutex.Unlock()
prev := g.GetState() prev := g.getState()
g.state = state g.state = state
return prev return prev
} }
@@ -131,6 +140,7 @@ func (g *Graph) AddVertex(xv ...*Vertex) {
} }
} }
// DeleteVertex deletes a particular vertex from the graph.
func (g *Graph) DeleteVertex(v *Vertex) { func (g *Graph) DeleteVertex(v *Vertex) {
delete(g.Adjacency, v) delete(g.Adjacency, v)
for k := range g.Adjacency { for k := range g.Adjacency {
@@ -138,7 +148,7 @@ func (g *Graph) DeleteVertex(v *Vertex) {
} }
} }
// adds a directed edge to the graph from v1 to v2 // AddEdge adds a directed edge to the graph from v1 to v2.
func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) { func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) {
// NOTE: this doesn't allow more than one edge between two vertexes... // NOTE: this doesn't allow more than one edge between two vertexes...
g.AddVertex(v1, v2) // supports adding N vertices now g.AddVertex(v1, v2) // supports adding N vertices now
@@ -147,6 +157,8 @@ func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) {
g.Adjacency[v1][v2] = e g.Adjacency[v1][v2] = e
} }
// GetVertexMatch searches for an equivalent resource in the graph and returns
// the vertex it is found in, or nil if not found.
func (g *Graph) GetVertexMatch(obj Res) *Vertex { func (g *Graph) GetVertexMatch(obj Res) *Vertex {
for k := range g.Adjacency { for k := range g.Adjacency {
if k.Res.Compare(obj) { if k.Res.Compare(obj) {
@@ -156,6 +168,7 @@ func (g *Graph) GetVertexMatch(obj Res) *Vertex {
return nil return nil
} }
// HasVertex returns if the input vertex exists in the graph.
func (g *Graph) HasVertex(v *Vertex) bool { func (g *Graph) HasVertex(v *Vertex) bool {
if _, exists := g.Adjacency[v]; exists { if _, exists := g.Adjacency[v]; exists {
return true return true
@@ -163,12 +176,12 @@ func (g *Graph) HasVertex(v *Vertex) bool {
return false return false
} }
// number of vertices in the graph // NumVertices returns the number of vertices in the graph.
func (g *Graph) NumVertices() int { func (g *Graph) NumVertices() int {
return len(g.Adjacency) return len(g.Adjacency)
} }
// number of edges in the graph // NumEdges returns the number of edges in the graph.
func (g *Graph) NumEdges() int { func (g *Graph) NumEdges() int {
count := 0 count := 0
for k := range g.Adjacency { for k := range g.Adjacency {
@@ -187,7 +200,7 @@ func (g *Graph) GetVertices() []*Vertex {
return vertices return vertices
} }
// returns a channel of all vertices in the graph // GetVerticesChan returns a channel of all vertices in the graph.
func (g *Graph) GetVerticesChan() chan *Vertex { func (g *Graph) GetVerticesChan() chan *Vertex {
ch := make(chan *Vertex) ch := make(chan *Vertex)
go func(ch chan *Vertex) { go func(ch chan *Vertex) {
@@ -199,6 +212,7 @@ func (g *Graph) GetVerticesChan() chan *Vertex {
return ch return ch
} }
// VertexSlice is a linear list of vertices. It can be sorted.
type VertexSlice []*Vertex type VertexSlice []*Vertex
func (vs VertexSlice) Len() int { return len(vs) } func (vs VertexSlice) Len() int { return len(vs) }
@@ -216,7 +230,7 @@ func (g *Graph) GetVerticesSorted() []*Vertex {
return vertices return vertices
} }
// make the graph pretty print // String makes the graph pretty print.
func (g *Graph) String() string { func (g *Graph) String() string {
return fmt.Sprintf("Vertices(%d), Edges(%d)", g.NumVertices(), g.NumEdges()) return fmt.Sprintf("Vertices(%d), Edges(%d)", g.NumVertices(), g.NumEdges())
} }
@@ -226,7 +240,7 @@ func (v *Vertex) String() string {
return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName()) return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName())
} }
// output the graph in graphviz format // Graphviz outputs the graph in graphviz format.
// https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29 // https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29
func (g *Graph) Graphviz() (out string) { func (g *Graph) Graphviz() (out string) {
//digraph g { //digraph g {
@@ -258,7 +272,8 @@ func (g *Graph) Graphviz() (out string) {
return return
} }
// write out the graphviz data and run the correct graphviz filter command // ExecGraphviz writes out the graphviz data and runs the correct graphviz
// filter command.
func (g *Graph) ExecGraphviz(program, filename string) error { func (g *Graph) ExecGraphviz(program, filename string) error {
switch program { switch program {
@@ -308,8 +323,8 @@ func (g *Graph) ExecGraphviz(program, filename string) error {
return nil return nil
} }
// return an array (slice) of all directed vertices to vertex v (??? -> v) // IncomingGraphEdges returns an array (slice) of all directed vertices to
// OKTimestamp should use this // vertex v (??? -> v). OKTimestamp should probably use this.
func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex {
// TODO: we might be able to implement this differently by reversing // TODO: we might be able to implement this differently by reversing
// the Adjacency graph and then looping through it again... // the Adjacency graph and then looping through it again...
@@ -324,8 +339,8 @@ func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex {
return s return s
} }
// return an array (slice) of all vertices that vertex v points to (v -> ???) // OutgoingGraphEdges returns an array (slice) of all vertices that vertex v
// poke should use this // points to (v -> ???). Poke should probably use this.
func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex { func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex {
var s []*Vertex var s []*Vertex
for k := range g.Adjacency[v] { // forward paths for k := range g.Adjacency[v] { // forward paths
@@ -334,7 +349,8 @@ func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex {
return s return s
} }
// return an array (slice) of all vertices that connect to vertex v // GraphEdges returns an array (slice) of all vertices that connect to vertex v.
// This is the union of IncomingGraphEdges and OutgoingGraphEdges.
func (g *Graph) GraphEdges(v *Vertex) []*Vertex { func (g *Graph) GraphEdges(v *Vertex) []*Vertex {
var s []*Vertex var s []*Vertex
s = append(s, g.IncomingGraphEdges(v)...) s = append(s, g.IncomingGraphEdges(v)...)
@@ -342,6 +358,7 @@ func (g *Graph) GraphEdges(v *Vertex) []*Vertex {
return s return s
} }
// DFS returns a depth first search for the graph, starting at the input vertex.
func (g *Graph) DFS(start *Vertex) []*Vertex { func (g *Graph) DFS(start *Vertex) []*Vertex {
var d []*Vertex // discovered var d []*Vertex // discovered
var s []*Vertex // stack var s []*Vertex // stack
@@ -364,7 +381,7 @@ func (g *Graph) DFS(start *Vertex) []*Vertex {
return d return d
} }
// build a new graph containing only vertices from the list... // FilterGraph builds a new graph containing only vertices from the list.
func (g *Graph) FilterGraph(name string, vertices []*Vertex) *Graph { func (g *Graph) FilterGraph(name string, vertices []*Vertex) *Graph {
newgraph := NewGraph(name) newgraph := NewGraph(name)
for k1, x := range g.Adjacency { for k1, x := range g.Adjacency {
@@ -378,8 +395,8 @@ func (g *Graph) FilterGraph(name string, vertices []*Vertex) *Graph {
return newgraph return newgraph
} }
// return a channel containing the N disconnected graphs in our main graph // GetDisconnectedGraphs returns a channel containing the N disconnected graphs
// we can then process each of these in parallel // in our main graph. We can then process each of these in parallel.
func (g *Graph) GetDisconnectedGraphs() chan *Graph { func (g *Graph) GetDisconnectedGraphs() chan *Graph {
ch := make(chan *Graph) ch := make(chan *Graph)
go func() { go func() {
@@ -414,8 +431,7 @@ func (g *Graph) GetDisconnectedGraphs() chan *Graph {
return ch return ch
} }
// return the indegree for the graph, IOW the count of vertices that point to me // InDegree returns the count of vertices that point to me in one big lookup map.
// NOTE: this returns the values for all vertices in one big lookup table
func (g *Graph) InDegree() map[*Vertex]int { func (g *Graph) InDegree() map[*Vertex]int {
result := make(map[*Vertex]int) result := make(map[*Vertex]int)
for k := range g.Adjacency { for k := range g.Adjacency {
@@ -430,8 +446,7 @@ func (g *Graph) InDegree() map[*Vertex]int {
return result return result
} }
// return the outdegree for the graph, IOW the count of vertices that point away // OutDegree returns the count of vertices that point away in one big lookup map.
// NOTE: this returns the values for all vertices in one big lookup table
func (g *Graph) OutDegree() map[*Vertex]int { func (g *Graph) OutDegree() map[*Vertex]int {
result := make(map[*Vertex]int) result := make(map[*Vertex]int)
@@ -444,7 +459,7 @@ func (g *Graph) OutDegree() map[*Vertex]int {
return result return result
} }
// returns a topological sort for the graph // TopologicalSort returns the sort of graph vertices in that order.
// based on descriptions and code from wikipedia and rosetta code // based on descriptions and code from wikipedia and rosetta code
// TODO: add memoization, and cache invalidation to speed this up :) // TODO: add memoization, and cache invalidation to speed this up :)
func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algorithm func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algorithm
@@ -626,15 +641,6 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex)
return nil // success return nil // success
} }
func HeisenbergCount(ch chan *Vertex) int {
c := 0
for x := range ch {
_ = x
c++
}
return c
}
// GetTimestamp returns the timestamp of a vertex // GetTimestamp returns the timestamp of a vertex
func (v *Vertex) GetTimestamp() int64 { func (v *Vertex) GetTimestamp() int64 {
return v.timestamp return v.timestamp
@@ -646,7 +652,7 @@ func (v *Vertex) UpdateTimestamp() int64 {
return v.timestamp return v.timestamp
} }
// can this element run right now? // OKTimestamp returns true if this element can run right now?
func (g *Graph) OKTimestamp(v *Vertex) bool { func (g *Graph) OKTimestamp(v *Vertex) bool {
// these are all the vertices pointing TO v, eg: ??? -> v // these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphEdges(v) { for _, n := range g.IncomingGraphEdges(v) {
@@ -665,14 +671,14 @@ func (g *Graph) OKTimestamp(v *Vertex) bool {
return true return true
} }
// notify nodes after me in the dependency graph that they need refreshing... // Poke notifies nodes after me in the dependency graph that they need refreshing...
// NOTE: this assumes that this can never fail or need to be rescheduled // NOTE: this assumes that this can never fail or need to be rescheduled
func (g *Graph) Poke(v *Vertex, activity bool) { func (g *Graph) Poke(v *Vertex, activity bool) {
// these are all the vertices pointing AWAY FROM v, eg: v -> ??? // these are all the vertices pointing AWAY FROM v, eg: v -> ???
for _, n := range g.OutgoingGraphEdges(v) { for _, n := range g.OutgoingGraphEdges(v) {
// XXX: if we're in state event and haven't been cancelled by // XXX: if we're in state event and haven't been cancelled by
// apply, then we can cancel a poke to a child, right? XXX // apply, then we can cancel a poke to a child, right? XXX
// XXX: if n.Res.GetState() != resStateEvent { // is this correct? // XXX: if n.Res.getState() != resStateEvent { // is this correct?
if true { // XXX if true { // XXX
if DEBUG { if DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
@@ -686,7 +692,7 @@ func (g *Graph) Poke(v *Vertex, activity bool) {
} }
} }
// poke the pre-requisites that are stale and need to run before I can run... // BackPoke pokes the pre-requisites that are stale and need to run before I can run.
func (g *Graph) BackPoke(v *Vertex) { func (g *Graph) BackPoke(v *Vertex) {
// these are all the vertices pointing TO v, eg: ??? -> v // these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphEdges(v) { for _, n := range g.IncomingGraphEdges(v) {
@@ -710,6 +716,7 @@ func (g *Graph) BackPoke(v *Vertex) {
} }
} }
// Process is the primary function to execute for a particular vertex in the graph.
// XXX: rename this function // XXX: rename this function
func (g *Graph) Process(v *Vertex) { func (g *Graph) Process(v *Vertex) {
obj := v.Res obj := v.Res
@@ -764,10 +771,11 @@ func (g *Graph) Process(v *Vertex) {
} }
} }
// main kick to start the graph // Start is a main kick to start the graph. It goes through in reverse topological
// sort order so that events can't hit un-started vertices.
func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
log.Printf("State: %v -> %v", g.SetState(graphStateStarting), g.GetState()) log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
defer log.Printf("State: %v -> %v", g.SetState(graphStateStarted), g.GetState()) defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
t, _ := g.TopologicalSort() t, _ := g.TopologicalSort()
// TODO: only calculate indegree if `first` is true to save resources // TODO: only calculate indegree if `first` is true to save resources
indegree := g.InDegree() // compute all of the indegree's indegree := g.InDegree() // compute all of the indegree's
@@ -829,15 +837,17 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
} }
} }
// Pause sends pause events to the graph in a topological sort order.
func (g *Graph) Pause() { func (g *Graph) Pause() {
log.Printf("State: %v -> %v", g.SetState(graphStatePausing), g.GetState()) log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState())
defer log.Printf("State: %v -> %v", g.SetState(graphStatePaused), g.GetState()) defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState())
t, _ := g.TopologicalSort() t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events... for _, v := range t { // squeeze out the events...
v.SendEvent(eventPause, true, false) v.SendEvent(eventPause, true, false)
} }
} }
// Exit sends exit events to the graph in a topological sort order.
func (g *Graph) Exit() { func (g *Graph) Exit() {
if g == nil { if g == nil {
return return
@@ -860,7 +870,7 @@ func (g *Graph) AssociateData(converger Converger) {
} }
} }
// in array function to test *Vertex in a slice of *Vertices // VertexContains is an "in array" function to test for a vertex in a slice of vertices.
func VertexContains(needle *Vertex, haystack []*Vertex) bool { func VertexContains(needle *Vertex, haystack []*Vertex) bool {
for _, v := range haystack { for _, v := range haystack {
if needle == v { if needle == v {
@@ -870,7 +880,7 @@ func VertexContains(needle *Vertex, haystack []*Vertex) bool {
return false return false
} }
// reverse a list of vertices // Reverse reverses a list of vertices.
func Reverse(vs []*Vertex) []*Vertex { func Reverse(vs []*Vertex) []*Vertex {
//var out []*Vertex // XXX: golint suggests, but it fails testing //var out []*Vertex // XXX: golint suggests, but it fails testing
out := make([]*Vertex, 0) // empty list out := make([]*Vertex, 0) // empty list

View File

@@ -1060,6 +1060,7 @@ func cleanURL(s string) string {
// Semaphore is a counting semaphore. // Semaphore is a counting semaphore.
type Semaphore chan struct{} type Semaphore chan struct{}
// NewSemaphore creates a new semaphore.
func NewSemaphore(size int) Semaphore { func NewSemaphore(size int) Semaphore {
return make(Semaphore, size) return make(Semaphore, size)
} }