// Mgmt
// Copyright (C) James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// Additional permission under GNU GPL version 3 section 7
//
// If you modify this program, or any covered work, by linking or combining it
// with embedded mcl code and modules (and that the embedded mcl code and
// modules which link with this program, contain a copy of their source code in
// the authoritative form) containing parts covered by the terms of any other
// license, the licensors of this program grant you additional permission to
// convey the resulting work. Furthermore, the licensors of this program grant
// the original author, James Shubin, additional permission to update this
// additional permission if he deems it necessary to achieve the goals of this
// additional permission.

package etcd

import (
	"context"
	"fmt"
	"sync"

	"github.com/purpleidea/mgmt/etcd/interfaces"
	etcdUtil "github.com/purpleidea/mgmt/etcd/util"
	"github.com/purpleidea/mgmt/util"
	"github.com/purpleidea/mgmt/util/errwrap"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	etcd "go.etcd.io/etcd/client/v3"
)

// nominateApply applies the changed watcher data onto our local caches.
func (obj *EmbdEtcd) nominateApply(data *interfaces.WatcherData) error {
	if data == nil { // ignore empty data
		return nil
	}

	// If we tried to lookup the nominated members here (in etcd v3) this
	// would sometimes block because we would lose the cluster leader once
	// the current leader calls the MemberAdd API and it steps down trying
	// to form a two host cluster. Instead, we can look at the event
	// response data to read the nominated values! Since we only see what
	// has *changed* in the response data, we have to keep track of the
	// original state and apply the deltas. This must be idempotent in case
	// it errors and is called again. If we're retrying and we get a data
	// format error, it's probably not the end of the world.
	nominated, err := applyDeltaEvents(data, obj.nominated) // map[hostname]URLs (URLsMap)
	if err != nil && err != errInconsistentApply { // allow missing deletes
		return err // unexpected error, fail
	}
	// TODO: do we want to sort this if it becomes a list instead of a map?
	//sort.Strings(nominated) // deterministic order
	obj.nominated = nominated
	return nil
}
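
// The delta-application pattern described above can be shown with a minimal,
// hypothetical sketch. This is NOT the real applyDeltaEvents (which lives
// elsewhere in this package and operates on the WatcherData event format); it
// assumes a plain map[string]string in place of the URLsMap type, with a nil
// value standing in for a delete event. The input cache is copied first so a
// failed call can safely be retried (idempotency), and a delete for a missing
// key reports an error but still applies, mirroring how the callers above
// tolerate errInconsistentApply.
func applyDeltaSketch(deltas map[string]*string, cache map[string]string) (map[string]string, error) {
	out := make(map[string]string, len(cache))
	for k, v := range cache { // copy so retries stay idempotent
		out[k] = v
	}
	var err error
	for k, v := range deltas {
		if v == nil { // a delete event
			if _, exists := out[k]; !exists {
				err = fmt.Errorf("inconsistent delete: %s", k) // like errInconsistentApply
			}
			delete(out, k)
			continue
		}
		out[k] = *v // a put event upserts the value
	}
	return out, err
}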

// volunteerApply applies the changed watcher data onto our local caches.
func (obj *EmbdEtcd) volunteerApply(data *interfaces.WatcherData) error {
	if data == nil { // ignore empty data
		return nil
	}
	volunteers, err := applyDeltaEvents(data, obj.volunteers) // map[hostname]URLs (URLsMap)
	if err != nil && err != errInconsistentApply { // allow missing deletes
		return err // unexpected error, fail
	}
	// TODO: do we want to sort this if it becomes a list instead of a map?
	//sort.Strings(volunteers) // deterministic order
	obj.volunteers = volunteers
	return nil
}

// endpointApply applies the changed watcher data onto our local caches. In
// this particular apply function, it also sets our client with the new
// endpoints.
func (obj *EmbdEtcd) endpointApply(data *interfaces.WatcherData) error {
	if data == nil { // ignore empty data
		return nil
	}
	endpoints, err := applyDeltaEvents(data, obj.endpoints) // map[hostname]URLs (URLsMap)
	if err != nil && err != errInconsistentApply { // allow missing deletes
		return err // unexpected error, fail
	}

	// is the endpoint list different?
	// TODO: do we want to use the skipEndpointApply here too?
	skipEndpointApply := obj.NoServer && len(endpoints) == 0 && len(obj.endpoints) > 0
	if err := etcdUtil.CmpURLsMap(obj.endpoints, endpoints); err != nil && !skipEndpointApply {
		obj.endpoints = endpoints // set
		// can happen if a server drops out for example
		obj.Logf("endpoint list changed to: %+v", endpoints)
		obj.setEndpoints()
	} else if err != nil && skipEndpointApply {
		obj.Logf("skipping endpoints apply")
	}
	return nil
}
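
// A minimal sketch of the guard logic in endpointApply above, with plain
// booleans and a map[string]string standing in for the URLsMap comparison.
// The skip case appears to cover a client-only (NoServer) member: if the
// watched data momentarily reports an empty endpoint list while we still hold
// a non-empty one, we keep our current endpoints rather than stranding the
// client; any other difference triggers the apply.
func shouldApplyEndpointsSketch(noServer, changed bool, oldList, newList map[string]string) bool {
	skip := noServer && len(newList) == 0 && len(oldList) > 0
	return changed && !skip
}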

// nominateCb runs to respond to the nomination list change events.
// Functionally, it controls the starting and stopping of the server process.
// If a nominate message is received for this machine, then it means it is
// already being added to the cluster with member add and the cluster is now
// waiting for it to start up. When a nominate entry is removed, it's up to
// this function to run the member remove right before it shuts its server
// down.
func (obj *EmbdEtcd) nominateCb(ctx context.Context) error {
	// Ensure that only one copy of this function is run simultaneously.
	// This is because we don't want to cause runServer to race with
	// destroyServer. Let us completely start up before we can cancel it. As
	// a special case, destroyServer itself can race against itself. I don't
	// think it's possible for contention on this mutex, but we'll leave it
	// in for safety.
	obj.nominatedMutex.Lock()
	defer obj.nominatedMutex.Unlock()

	// This ordering mutex is being added for safety, since there is no good
	// reason for this function and volunteerCb to run simultaneously, and
	// it might be preventing a race condition that was happening.
	obj.orderingMutex.Lock()
	defer obj.orderingMutex.Unlock()
	if obj.Debug {
		obj.Logf("nominateCb")
		defer obj.Logf("nominateCb: done!")
	}

	// check if i have actually volunteered first of all...
	if obj.NoServer || len(obj.ServerURLs) == 0 {
		obj.Logf("inappropriately nominated, rogue or stale server?")
		// TODO: should we un-nominate ourself?
		return nil // we've done our job successfully
	}

	// This can happen when we're shutting down; build the nominated value.
	if len(obj.nominated) == 0 {
		obj.Logf("list of nominations is empty")
		//return nil // don't exit, we might want to shutdown the server
	} else {
		obj.Logf("nominated: %v", obj.nominated)
	}

	// if there are no other peers, we create a new server
	// TODO: do we need an || len(obj.nominated) == 0 if we're the first?
	_, exists := obj.nominated[obj.Hostname] // am i nominated?
	newCluster := len(obj.nominated) == 1 && exists
	if obj.Debug {
		obj.Logf("nominateCb: newCluster: %t; exists: %t; obj.server == nil: %t", newCluster, exists, obj.server == nil)
	}

	// TODO: server start retries should be handled inside of runServer...
	if obj.serverAction(serverActionStart) { // start
		// no server is running, but it should be
		wg := &sync.WaitGroup{}
		serverReady, ackReady := obj.ServerReady()    // must call ack!
		serverExited, ackExited := obj.ServerExited() // must call ack!
		var sendError = false
		var serverErr error
		obj.Logf("waiting for server...")
		nominated, err := etcdUtil.CopyURLsMap(obj.nominated)
		if err != nil {
			return err
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			obj.errExitN = make(chan struct{})
			defer close(obj.errExitN) // multi-signal for errChan close op
			// blocks until server exits
			serverErr = obj.runServer(newCluster, nominated)
			if serverErr != nil {
				// TODO: why isn't this error seen elsewhere?
				// TODO: shouldn't it get propagated somewhere?
				obj.Logf("runServer exited with: %+v", serverErr)
			}
			// in case this exits on its own instead of with destroy
			defer obj.destroyServer() // run to reset some values
			if sendError && serverErr != nil { // exited with an error
				select {
				case obj.errChan <- errwrap.Wrapf(serverErr, "runServer errored"):
				}
			}
		}()

		// block until either server is ready or an early exit occurs
		select {
		case <-serverReady:
			// detach from our local return of errors from an early
			// server exit (pre server ready) and switch to channel
			sendError = true // gets set before the ackReady() does
			ackReady()       // must be called
			ackExited()      // must be called
			// pass

		case <-serverExited:
			ackExited() // must be called
			ackReady()  // must be called
			wg.Wait()   // wait for server to finish to get early err
			return serverErr
		}

		// Once the server is online, we *must* publish this information
		// so that (1) others know where to connect to us (2) we provide
		// an "event" for member add since there is not any event that's
		// currently built-in to etcd and (3) so we have a key to expire
		// when we shutdown or crash to give us the member remove event.
		// please see issue: https://github.com/etcd-io/etcd/issues/5277

	} else if obj.serverAction(serverActionStop) { // stop?
		// server is running, but it should not be

		// i have been un-nominated, remove self and shutdown server!
		// we don't need to do a member remove if i'm the last one...
		if len(obj.nominated) != 0 { // don't call if nobody left but me!
			// work around: https://github.com/etcd-io/etcd/issues/5482
			// and it might make sense to avoid it if we're the last
			obj.Logf("member remove: removing self: %d", obj.serverID)
			resp, err := obj.memberRemove(ctx, obj.serverID)
			if err != nil {
				if obj.Debug {
					obj.Logf("error with member remove: %v", err)
				}
				return errwrap.Wrapf(err, "member self remove error")
			}
			if resp != nil {
				obj.Logf("member removed (self): %s (%d)", obj.Hostname, obj.serverID)
				if err := obj.updateMemberState(resp.Members); err != nil {
					return err
				}
			}
		}

		// FIXME: if we fail on destroy should we try to run some of the
		// other cleanup tasks that usually run afterwards (below) anyway?
		if err := obj.destroyServer(); err != nil { // sync until exited
			return errwrap.Wrapf(err, "destroyServer errored")
		}

		// We close with this special sentinel only during destroy/exit.
		if obj.closing {
			return interfaces.ErrShutdown
		}
	}

	return nil
}
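
// The ready/exited handshake used in nominateCb above can be modeled with a
// small hypothetical sketch. The real ServerReady/ServerExited pair also
// returns ack functions which must be called (omitted here for brevity). The
// essential shape: run the server in a goroutine, then let whichever signal
// fires first decide whether a startup error is returned directly (early
// exit) or handed off to the error channel instead (post-ready).
func readyOrExitedSketch(run func(ready chan<- struct{}) error) error {
	ready := make(chan struct{})  // run closes this once it is serving
	exited := make(chan struct{}) // closed when run returns
	var runErr error
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(exited)
		runErr = run(ready) // blocks until the server exits
	}()
	select {
	case <-ready:
		return nil // started; later errors must be reported elsewhere
	case <-exited:
		wg.Wait() // safe: runErr is settled before exited is closed
		return runErr
	}
}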

// volunteerCb runs to respond to the volunteer list change events.
// Functionally, it controls the nominating and adding of members. It typically
// nominates a peer so that it knows it will get to be a server, which causes
// it to start up its server. It also runs the member add operation so that the
// cluster gets quorum safely. The member remove operation is typically run in
// the nominateCb of that server when it is asked to shutdown. This occurs when
// the nominate entry for that server is removed. If a server removes its
// volunteer entry we must respond by removing the nomination so that it can
// receive that message and shutdown.
// FIXME: we might need to respond to member change/disconnect/shutdown events,
// see: https://github.com/etcd-io/etcd/issues/5277
// XXX: Don't allow this function to partially run if it is canceled part way
// through... We don't want an inconsistent state where we did unnominate, but
// didn't remove a member...
// XXX: If the leader changes, do we need to kick the volunteerCb or anything
// else that might have required a leader and which returned because it did not
// have one, thus losing an event?
func (obj *EmbdEtcd) volunteerCb(ctx context.Context) error {
	// Ensure that only one copy of this function is run simultaneously.
	// It's not entirely clear if this can ever happen or if it's needed,
	// but it's an inexpensive safety check that we can add in for now.
	obj.volunteerMutex.Lock()
	defer obj.volunteerMutex.Unlock()

	// This ordering mutex is being added for safety, since there is no good
	// reason for this function and nominateCb to run simultaneously, and it
	// might be preventing a race condition that was happening.
	obj.orderingMutex.Lock()
	defer obj.orderingMutex.Unlock()
	if obj.Debug {
		obj.Logf("volunteerCb")
		defer obj.Logf("volunteerCb: done!")
	}

	// FIXME: are there any situations where we don't want to short circuit
	// here, such as if i'm the last node?
	if obj.server == nil {
		if obj.Debug {
			obj.Logf("i'm not a server yet...")
		}
		return nil // if i'm not a server, i'm not a leader, return
	}

	// FIXME: Instead of checking this, assume yes, and use the
	// `WithRequireLeader` wrapper, and just ignore the error from that if
	// it's wrong... Combined with events that poke this volunteerCb when
	// the leader changes, we shouldn't miss any events...
	if isLeader, err := obj.isLeader(ctx); err != nil { // XXX: race!
		return errwrap.Wrapf(err, "error determining leader")
	} else if !isLeader {
		if obj.Debug {
			obj.Logf("we are not the leader...")
		}
		return nil
	}
	// i am the leader!

	// Remember that the member* operations return the membership, so this
	// means we don't need to run an extra memberList in those scenarios...
	// However, this can get out of sync easily, so ensure that our member
	// information is very recent.
	if err := obj.memberStateFromList(ctx); err != nil {
		return errwrap.Wrapf(err, "error during state sync")
	}
	// XXX: If we have any unstarted members here, do we want to reschedule
	// this volunteerCb in a moment? Or will we get another event anyways?

	// NOTE: There used to be an is_leader check right here...
	// FIXME: Should we use WithRequireLeader instead? Here? Elsewhere?
	// https://godoc.org/github.com/etcd-io/etcd/clientv3#WithRequireLeader

	// FIXME: can this happen, and if so, is it an error or a pass-through?
	if len(obj.volunteers) == 0 {
		obj.Logf("list of volunteers is empty")
		//return fmt.Errorf("volunteer list is empty")
	} else {
		obj.Logf("volunteers: %+v", obj.volunteers)
	}

	// TODO: do we really need to check these errors?
	m, err := etcdUtil.CopyURLsMap(obj.membermap) // list of members...
	if err != nil {
		return err
	}
	v, err := etcdUtil.CopyURLsMap(obj.volunteers)
	if err != nil {
		return err
	}

	// Unnominate anyone that unvolunteers, so they can shutdown cleanly...
	// FIXME: one step at a time... do we trigger subsequent steps somehow?
	obj.Logf("chooser: (%+v)/(%+v)", m, v)
	nominate, unnominate, err := obj.Chooser.Choose(m, v)
	if err != nil {
		return errwrap.Wrapf(err, "chooser error")
	}
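
	// As a hypothetical example of what a chooser might decide: with
	// members {h1} and volunteers {h1, h2}, h2 would land in the nominate
	// list; with members {h1, h2} and volunteers {h1}, h2 would land in
	// the unnominate list instead. The exact policy is up to the Chooser
	// implementation.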

	// Ensure that we are the *last* in the list if we're unnominating, and
	// the *first* in the list if we're nominating. This way, we self-remove
	// last, and we self-add first. This is least likely to hurt quorum.
	headFn := func(x string) bool {
		return x != obj.Hostname
	}
	tailFn := func(x string) bool {
		return x == obj.Hostname
	}
	nominate = util.PriorityStrSliceSort(nominate, headFn)
	unnominate = util.PriorityStrSliceSort(unnominate, tailFn)
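
	// For example (hypothetically, with Hostname "h1"): after these two
	// sorts, "h1" would appear first in the nominate list and last in the
	// unnominate list, so we self-add first and self-remove last.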
obj.Logf("chooser result(+/-): %+v/%+v", nominate, unnominate)
var reterr error
leaderCtx := ctx // default ctx to use
if RequireLeaderCtx {
leaderCtx = etcd.WithRequireLeader(ctx) // FIXME: Is this correct?
}
for i := range nominate {
member := nominate[i]
peerURLs, exists := obj.volunteers[member] // comma separated list of urls
if !exists {
// if this happens, do we have an update race?
return fmt.Errorf("could not find member `%s` in volunteers map", member)
}
// NOTE: storing peerURLs when they're already in volunteers/ is
// redundant, but it seems to be necessary for a sane algorithm.
// nominate before we call the API so that members see it first!
if err := obj.nominate(leaderCtx, member, peerURLs); err != nil {
return errwrap.Wrapf(err, "error nominating: %s", member)
}
// XXX: can we add a ttl here, because once we nominate someone,
// we need to give them up to N seconds to start up after we run
// the MemberAdd API because if they don't, in some situations
// such as if we're adding the second node to the cluster, then
// we've lost quorum until a second member joins! If the TTL
// expires, we need to MemberRemove! In this special case, we
// need to forcefully remove the second member if we don't add
// them, because we'll be in a lack of quorum state and unable
// to do anything... As a result, we should always only add ONE
// member at a time!
// XXX: After we memberAdd, can we wait a timeout, and then undo
// the add if the member doesn't come up? We'd also need to run
// an unnominate too, and mark the node as temporarily failed...
obj.Logf("member add: %s: %v", member, peerURLs)
resp, err := obj.memberAdd(leaderCtx, peerURLs)
if err != nil {
// FIXME: On on error this function needs to run again,
// because we need to make sure to add the member here!
return errwrap.Wrapf(err, "member add error")
}
if resp != nil { // if we're already the right state, we get nil
obj.Logf("member added: %s (%d): %v", member, resp.Member.ID, peerURLs)
if err := obj.updateMemberState(resp.Members); err != nil {
return err
}
if resp.Member.Name == "" { // not started instantly ;)
obj.addMemberState(member, resp.Member.ID, peerURLs, nil)
}
// TODO: would this ever happen or be necessary?
//if member == obj.Hostname {
// obj.addSelfState()
//}
}
}
// we must remove them from the members API or it will look like a crash
if l := len(unnominate); l > 0 {
obj.Logf("unnominated: shutting down %d members...", l)
}
for i := range unnominate {
member := unnominate[i]
memberID, exists := obj.memberIDs[member] // map[string]uint64
if !exists {
// if this happens, do we have an update race?
return fmt.Errorf("could not find member `%s` in memberIDs map", member)
}
// start a watcher to know if member was added
cancelCtx, cancel := context.WithCancel(leaderCtx)
defer cancel()
timeout := util.CloseAfter(cancelCtx, SelfRemoveTimeout) // chan closes
fn := func(members []*pb.Member) error {
for _, m := range members {
if m.Name == member || m.ID == memberID {
return fmt.Errorf("still present")
}
}
return nil // not found!
}
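		// As used at this call site, fn is the predicate: it errors
		// while the member still appears in the membership and returns
		// nil once it's gone; memberChange polls it on
		// MemberChangeInterval and reports on ch. (Inferred from this
		// usage, not from separate API documentation.)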
		ch, err := obj.memberChange(cancelCtx, fn, MemberChangeInterval)
		if err != nil {
			return errwrap.Wrapf(err, "error watching for change of: %s", member)
		}

		if err := obj.nominate(leaderCtx, member, nil); err != nil { // unnominate
			return errwrap.Wrapf(err, "error unnominating: %s", member)
		}
		// Once we issue the above unnominate, that peer will
		// shutdown, and this might cause us to lose quorum,
		// therefore, let that member remove itself, and then
		// double check that it did happen in case it's delinquent.
		// TODO: get built-in transactional member Add/Remove
		// functionality to avoid a separate nominate list...

		// If we're removing ourself, then let the (un)nominate callback
		// do it. That way it removes itself cleanly on server shutdown.
		if member == obj.Hostname { // remove in unnominate!
			cancel()
			obj.Logf("unnominate: removing self...")
			continue
		}

		// cancel remove sleep and unblock early on event...
		obj.Logf("waiting %s for %s to self remove...", SelfRemoveTimeout.String(), member)
		select {
		case <-timeout:
			// pass
		case err, ok := <-ch:
			if ok {
				select {
				case <-timeout:
					// wait until timeout finishes
				}
				reterr = errwrap.Append(reterr, err)
			}
			// removed quickly!
		}
		cancel()

		// In case the removed member doesn't remove itself, do it!
		resp, err := obj.memberRemove(leaderCtx, memberID)
		if err != nil {
			return errwrap.Wrapf(err, "member remove error")
		}
		if resp != nil {
			obj.Logf("member removed (forced): %s (%d)", member, memberID)
			if err := obj.updateMemberState(resp.Members); err != nil {
				return err
			}
			// Do this I guess, but the TTL will eventually get it.
			// Remove the other member to avoid client connections.
			if err := obj.advertise(leaderCtx, member, nil); err != nil {
				return err
			}
		}

		// Remove the member from our lists to avoid blocking future
		// possible MemberList calls which would try and connect to a
		// missing member... The lists should get updated from the
		// member exiting safely if it doesn't crash, but if it did,
		// and/or since it's a race to see whether the update event will
		// get seen before we need the new data, just do it now anyways.
		// TODO: Is the above comment still true?
		obj.rmMemberState(member) // proactively delete it
		obj.Logf("member %s (%d) removed successfully!", member, memberID)
	}

	// NOTE: We could ensure that etcd reconnects here, but we can just wait
	// for the endpoints callback which should see the state change instead.
	obj.setEndpoints() // sync client with new endpoints

	return reterr
}
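
// A condensed, hypothetical model of the self-remove grace period used in the
// unnominate loop above: wait for either the timeout channel (util.CloseAfter
// in the real code) or the membership watch (memberChange) to report, then
// run the forced removal regardless, since memberRemove appears to return a
// nil response when the member is already gone (see the resp != nil checks
// above).
func removeWithGraceSketch(timeout <-chan struct{}, watch <-chan error, force func() error) error {
	var reterr error
	select {
	case <-timeout:
		// grace period expired without seeing the member leave
	case err, ok := <-watch:
		if ok && err != nil {
			reterr = errwrap.Append(reterr, err) // the watch errored
		}
		// otherwise the member removed itself quickly
	}
	if err := force(); err != nil { // effectively a no-op if it already left
		return errwrap.Append(reterr, err)
	}
	return reterr
}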