engine: Split out the etcd cluster size options
This is clean up work so that it's easier to generalize the world backends.
This commit is contained in:
@@ -65,6 +65,8 @@ type ConfigEtcdRes struct {
|
|||||||
// IdealClusterSize to zero.
|
// IdealClusterSize to zero.
|
||||||
AllowSizeShutdown bool `lang:"allow_size_shutdown"`
|
AllowSizeShutdown bool `lang:"allow_size_shutdown"`
|
||||||
|
|
||||||
|
world engine.EtcdWorld
|
||||||
|
|
||||||
// sizeFlag determines whether sizeCheckApply already ran or not.
|
// sizeFlag determines whether sizeCheckApply already ran or not.
|
||||||
sizeFlag bool
|
sizeFlag bool
|
||||||
|
|
||||||
@@ -93,6 +95,12 @@ func (obj *ConfigEtcdRes) Validate() error {
|
|||||||
func (obj *ConfigEtcdRes) Init(init *engine.Init) error {
|
func (obj *ConfigEtcdRes) Init(init *engine.Init) error {
|
||||||
obj.init = init // save for later
|
obj.init = init // save for later
|
||||||
|
|
||||||
|
world, ok := obj.init.World.(engine.EtcdWorld)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("world backend does not support the EtcdWorld interface")
|
||||||
|
}
|
||||||
|
obj.world = world
|
||||||
|
|
||||||
obj.interruptChan = make(chan struct{})
|
obj.interruptChan = make(chan struct{})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -109,7 +117,7 @@ func (obj *ConfigEtcdRes) Watch(ctx context.Context) error {
|
|||||||
defer wg.Wait()
|
defer wg.Wait()
|
||||||
innerCtx, cancel := context.WithCancel(ctx)
|
innerCtx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
ch, err := obj.init.World.IdealClusterSizeWatch(util.CtxWithWg(innerCtx, wg))
|
ch, err := obj.world.IdealClusterSizeWatch(util.CtxWithWg(innerCtx, wg))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errwrap.Wrapf(err, "could not watch ideal cluster size")
|
return errwrap.Wrapf(err, "could not watch ideal cluster size")
|
||||||
}
|
}
|
||||||
@@ -158,7 +166,7 @@ func (obj *ConfigEtcdRes) sizeCheckApply(ctx context.Context, apply bool) (bool,
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
val, err := obj.init.World.IdealClusterSizeGet(ctx)
|
val, err := obj.world.IdealClusterSizeGet(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errwrap.Wrapf(err, "could not get ideal cluster size")
|
return false, errwrap.Wrapf(err, "could not get ideal cluster size")
|
||||||
}
|
}
|
||||||
@@ -181,7 +189,7 @@ func (obj *ConfigEtcdRes) sizeCheckApply(ctx context.Context, apply bool) (bool,
|
|||||||
|
|
||||||
// set!
|
// set!
|
||||||
// This is run as a transaction so we detect if we needed to change it.
|
// This is run as a transaction so we detect if we needed to change it.
|
||||||
changed, err := obj.init.World.IdealClusterSizeSet(ctx, obj.IdealClusterSize)
|
changed, err := obj.world.IdealClusterSizeSet(ctx, obj.IdealClusterSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errwrap.Wrapf(err, "could not set ideal cluster size")
|
return false, errwrap.Wrapf(err, "could not set ideal cluster size")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,10 +45,6 @@ type World interface { // TODO: is there a better name for this interface?
|
|||||||
// FIXME: should this method take a "filter" data struct instead of many args?
|
// FIXME: should this method take a "filter" data struct instead of many args?
|
||||||
ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]Res, error)
|
ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]Res, error)
|
||||||
|
|
||||||
IdealClusterSizeWatch(context.Context) (chan error, error)
|
|
||||||
IdealClusterSizeGet(context.Context) (uint16, error)
|
|
||||||
IdealClusterSizeSet(context.Context, uint16) (bool, error)
|
|
||||||
|
|
||||||
StrWatch(ctx context.Context, namespace string) (chan error, error)
|
StrWatch(ctx context.Context, namespace string) (chan error, error)
|
||||||
StrIsNotExist(error) bool
|
StrIsNotExist(error) bool
|
||||||
StrGet(ctx context.Context, namespace string) (string, error)
|
StrGet(ctx context.Context, namespace string) (string, error)
|
||||||
@@ -75,3 +71,13 @@ type World interface { // TODO: is there a better name for this interface?
|
|||||||
// WatchMembers returns a channel of changing members in the cluster.
|
// WatchMembers returns a channel of changing members in the cluster.
|
||||||
WatchMembers(context.Context) (<-chan *interfaces.MembersResult, error)
|
WatchMembers(context.Context) (<-chan *interfaces.MembersResult, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EtcdWorld is a world interface that should be implemented if the world
|
||||||
|
// backend is implementing etcd, and if it supports dynamically resizing things.
|
||||||
|
// TODO: In theory we could generalize this to support other backends, but lets
|
||||||
|
// assume it's specific to etcd only for now.
|
||||||
|
type EtcdWorld interface {
|
||||||
|
IdealClusterSizeWatch(context.Context) (chan error, error)
|
||||||
|
IdealClusterSizeGet(context.Context) (uint16, error)
|
||||||
|
IdealClusterSizeSet(context.Context, uint16) (bool, error)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user