miscellaneous cleanups and fixes

This commit is contained in:
James Shubin
2016-08-30 04:41:13 -04:00
parent 636f2a36b1
commit 6794aff77c
13 changed files with 35 additions and 26 deletions

View File

@@ -72,13 +72,13 @@ func (c *GraphConfig) Parse(data []byte) error {
func ParseConfigFromFile(filename string) *GraphConfig { func ParseConfigFromFile(filename string) *GraphConfig {
data, err := ioutil.ReadFile(filename) data, err := ioutil.ReadFile(filename)
if err != nil { if err != nil {
log.Printf("Error: Config: ParseConfigFromFile: File: %v", err) log.Printf("Config: Error: ParseConfigFromFile: File: %v", err)
return nil return nil
} }
var config GraphConfig var config GraphConfig
if err := config.Parse(data); err != nil { if err := config.Parse(data); err != nil {
log.Printf("Error: Config: ParseConfigFromFile: Parse: %v", err) log.Printf("Config: Error: ParseConfigFromFile: Parse: %v", err)
return nil return nil
} }
@@ -122,7 +122,7 @@ func (g *Graph) NewGraphFromConfig(config *GraphConfig, embdEtcd *EmbdEtcd, host
x := slice.Index(j).Interface() x := slice.Index(j).Interface()
res, ok := x.(Res) // convert to Res type res, ok := x.(Res) // convert to Res type
if !ok { if !ok {
return nil, fmt.Errorf("Error: Config: Can't convert: %v of type: %T to Res.", x, x) return nil, fmt.Errorf("Config: Error: Can't convert: %v of type: %T to Res.", x, x)
} }
if noop { if noop {
res.Meta().Noop = noop res.Meta().Noop = noop

View File

@@ -209,7 +209,7 @@ func (obj *converger) Loop(startPaused bool) {
continue continue
} }
case _ = <-obj.channel: case <-obj.channel:
if !obj.isConverged() { if !obj.isConverged() {
if obj.converged { // we're doing a state change if obj.converged { // we're doing a state change
if obj.stateFn != nil { if obj.stateFn != nil {

View File

@@ -180,8 +180,8 @@ type EmbdEtcd struct { // EMBeddeD etcd
delq chan *DL // delete queue delq chan *DL // delete queue
txnq chan *TN // txn queue txnq chan *TN // txn queue
converger Converger // converged tracking
prefix string // folder prefix to use for misc storage prefix string // folder prefix to use for misc storage
converger Converger // converged tracking
// etcd server related // etcd server related
serverwg sync.WaitGroup // wait for server to shutdown serverwg sync.WaitGroup // wait for server to shutdown
@@ -190,7 +190,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
} }
// NewEmbdEtcd creates the top level embedded etcd struct client and server obj // NewEmbdEtcd creates the top level embedded etcd struct client and server obj
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, converger Converger, prefix string) *EmbdEtcd { func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, prefix string, converger Converger) *EmbdEtcd {
endpoints := make(etcdtypes.URLsMap) endpoints := make(etcdtypes.URLsMap)
if hostname == seedSentinel { // safety if hostname == seedSentinel { // safety
return nil return nil
@@ -1557,8 +1557,7 @@ func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error {
} }
path := fmt.Sprintf("/%s/idealClusterSize", NS) path := fmt.Sprintf("/%s/idealClusterSize", NS)
for _, event := range re.response.Events { for _, event := range re.response.Events {
key := bytes.NewBuffer(event.Kv.Key).String() if key := bytes.NewBuffer(event.Kv.Key).String(); key != path {
if key != path {
continue continue
} }
if event.Type != etcd.EventTypePut { if event.Type != etcd.EventTypePut {

View File

@@ -184,7 +184,7 @@ func (obj *ExecRes) Watch(processChan chan Event) {
return // exit return // exit
} }
case _ = <-cuuid.ConvergedTimer(): case <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) // converged! cuuid.SetConverged(true) // converged!
continue continue
} }

View File

@@ -248,7 +248,7 @@ func (obj *FileRes) Watch(processChan chan Event) {
} }
//dirty = false // these events don't invalidate state //dirty = false // these events don't invalidate state
case _ = <-cuuid.ConvergedTimer(): case <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) // converged! cuuid.SetConverged(true) // converged!
continue continue
} }

11
main.go
View File

@@ -63,6 +63,7 @@ func waitForSignal(exit chan bool) {
} }
} }
// run is the main run target.
func run(c *cli.Context) error { func run(c *cli.Context) error {
var start = time.Now().UnixNano() var start = time.Now().UnixNano()
log.Printf("This is: %v, version: %v", program, version) log.Printf("This is: %v, version: %v", program, version)
@@ -184,10 +185,14 @@ func run(c *cli.Context) error {
serverURLs, serverURLs,
c.Bool("no-server"), c.Bool("no-server"),
idealClusterSize, idealClusterSize,
converger,
prefix, prefix,
converger,
) )
if err := EmbdEtcd.Startup(); err != nil { // startup (returns when etcd main loop is running) if EmbdEtcd == nil {
// TODO: verify EmbdEtcd is not nil below...
log.Printf("Main: Etcd: Creation failed!")
exit <- true
} else if err := EmbdEtcd.Startup(); err != nil { // startup (returns when etcd main loop is running)
log.Printf("Main: Etcd: Startup failed: %v", err) log.Printf("Main: Etcd: Startup failed: %v", err)
exit <- true exit <- true
} }
@@ -241,7 +246,7 @@ func run(c *cli.Context) error {
config = ParseConfigFromPuppet(c.String("puppet"), c.String("puppet-conf")) config = ParseConfigFromPuppet(c.String("puppet"), c.String("puppet-conf"))
} }
if config == nil { if config == nil {
log.Printf("Config parse failure") log.Printf("Config: Parse failure")
continue continue
} }

View File

@@ -75,7 +75,7 @@ func (obj *NoopRes) Watch(processChan chan Event) {
return // exit return // exit
} }
case _ = <-cuuid.ConvergedTimer(): case <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) // converged! cuuid.SetConverged(true) // converged!
continue continue
} }

View File

@@ -415,7 +415,7 @@ loop:
} else { } else {
return fmt.Errorf("PackageKit: Error: %v", signal.Body) return fmt.Errorf("PackageKit: Error: %v", signal.Body)
} }
case _ = <-TimeAfterOrBlock(timeout): case <-TimeAfterOrBlock(timeout):
if finished { if finished {
log.Println("PackageKit: Timeout: InstallPackages: Waiting for 'Destroy'") log.Println("PackageKit: Timeout: InstallPackages: Waiting for 'Destroy'")
return nil // got tired of waiting for Destroy return nil // got tired of waiting for Destroy

2
pkg.go
View File

@@ -161,7 +161,7 @@ func (obj *PkgRes) Watch(processChan chan Event) {
} }
dirty = false // these events don't invalidate state dirty = false // these events don't invalidate state
case _ = <-cuuid.ConvergedTimer(): case <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) // converged! cuuid.SetConverged(true) // converged!
continue continue
} }

View File

@@ -342,8 +342,9 @@ func (obj *SSH) SftpClean() error {
// TODO: fix this possible? panic if we ever end up caring about it... // TODO: fix this possible? panic if we ever end up caring about it...
// close any copy operations that are in progress... // close any copy operations that are in progress...
obj.f1.Close() // TODO: we probably only need to shutdown one of them, obj.f1.Close() // TODO: we probably only need to shutdown one of them,
obj.f2.Close() // but which one should we shutdown? close both for now if obj.f2 != nil {
obj.f2.Close() // but which one should we shutdown? close both for now
}
// clean up the graph definition in obj.remotewd // clean up the graph definition in obj.remotewd
err := obj.sftp.Remove(obj.filepath) err := obj.sftp.Remove(obj.filepath)
@@ -567,6 +568,9 @@ func (obj *SSH) ExecExit() error {
// Go kicks off the entire sequence of one SSH connection. // Go kicks off the entire sequence of one SSH connection.
func (obj *SSH) Go() error { func (obj *SSH) Go() error {
defer func() {
obj.exiting = true // bonus: set this as a bonus on exit...
}()
if obj.exitCheck() { if obj.exitCheck() {
return nil return nil
} }
@@ -852,6 +856,7 @@ func (obj *Remotes) Run() {
} }
obj.lock.Lock() obj.lock.Lock()
sshobj, exists := obj.sshmap[f] sshobj, exists := obj.sshmap[f]
obj.lock.Unlock()
if !exists || sshobj == nil { if !exists || sshobj == nil {
continue // skip, this hasn't happened yet continue // skip, this hasn't happened yet
} }
@@ -862,7 +867,6 @@ func (obj *Remotes) Run() {
if _, err := sshobj.SftpGraphCopy(); err == nil { // push new copy if _, err := sshobj.SftpGraphCopy(); err == nil { // push new copy
log.Printf("Remote: Copied over new graph definition: %s", f) log.Printf("Remote: Copied over new graph definition: %s", f)
} // ignore errors } // ignore errors
obj.lock.Unlock()
} }
}() }()
} }
@@ -884,7 +888,7 @@ func (obj *Remotes) Run() {
obj.sshmap[f] = sshobj // save a reference obj.sshmap[f] = sshobj // save a reference
obj.wg.Add(1) obj.wg.Add(1)
go func() { go func(sshobj *SSH, f string) {
if obj.cConns != 0 { if obj.cConns != 0 {
defer obj.semaphore.V(1) defer obj.semaphore.V(1)
} }
@@ -892,7 +896,7 @@ func (obj *Remotes) Run() {
if err := sshobj.Go(); err != nil { if err := sshobj.Go(); err != nil {
log.Printf("Remote: Error: %s", err) log.Printf("Remote: Error: %s", err)
} }
}() }(sshobj, f)
obj.lock.Unlock() obj.lock.Unlock()
} }
} }

6
svc.go
View File

@@ -146,7 +146,7 @@ func (obj *SvcRes) Watch(processChan chan Event) {
obj.SetState(resStateWatching) // reset obj.SetState(resStateWatching) // reset
select { select {
case _ = <-buschan: // XXX wait for new units event to unstick case <-buschan: // XXX wait for new units event to unstick
cuuid.SetConverged(false) cuuid.SetConverged(false)
// loop so that we can see the changed invalid signal // loop so that we can see the changed invalid signal
log.Printf("Svc[%v]->DaemonReload()", svc) log.Printf("Svc[%v]->DaemonReload()", svc)
@@ -160,7 +160,7 @@ func (obj *SvcRes) Watch(processChan chan Event) {
dirty = true dirty = true
} }
case _ = <-cuuid.ConvergedTimer(): case <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) // converged! cuuid.SetConverged(true) // converged!
continue continue
} }
@@ -208,7 +208,7 @@ func (obj *SvcRes) Watch(processChan chan Event) {
dirty = true dirty = true
} }
case _ = <-cuuid.ConvergedTimer(): case <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) // converged! cuuid.SetConverged(true) // converged!
continue continue
} }

View File

@@ -5,4 +5,5 @@ ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && cd .. && pwd )" # dir!
cd "${ROOT}" cd "${ROOT}"
go vet && echo PASS || exit 1 # since it doesn't output an ok message on pass go vet && echo PASS || exit 1 # since it doesn't output an ok message on pass
grep 'log.' *.go | grep '\\n"' && exit 1 || echo PASS # no \n needed in log.Printf() grep 'log.' *.go | grep '\\n"' && echo 'no \n needed in log.Printf()' && exit 1 || echo PASS # no \n needed in log.Printf()
grep 'case _ = <-' *.go && echo 'case _ = <- can be simplified to: case <-' && exit 1 || echo PASS # this can be simplified

View File

@@ -90,7 +90,7 @@ func (obj *TimerRes) Watch(processChan chan Event) {
if exit, _ := obj.ReadEvent(&event); exit { if exit, _ := obj.ReadEvent(&event); exit {
return return
} }
case _ = <-cuuid.ConvergedTimer(): case <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) cuuid.SetConverged(true)
continue continue
} }