// Mgmt
// Copyright (C) 2013-2019+ James Shubin and the project contributors
// Written by James Shubin and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package etcd

import (
	"context"
	"fmt"
	"sync"

	"github.com/purpleidea/mgmt/etcd/interfaces"
	"github.com/purpleidea/mgmt/util"
	"github.com/purpleidea/mgmt/util/errwrap"

	etcd "go.etcd.io/etcd/clientv3" // "clientv3"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
)

// nominateApply applies the changed watcher data onto our local caches.
func (obj *EmbdEtcd) nominateApply(data *interfaces.WatcherData) error {
	if data == nil { // ignore empty data
		return nil
	}

	// If we tried to look up the nominated members here (in etcd v3) this
	// would sometimes block, because we would lose the cluster leader once
	// the current leader calls the MemberAdd API and it steps down trying
	// to form a two host cluster. Instead, we can look at the event
	// response data to read the nominated values! Since we only see what
	// has *changed* in the response data, we have to keep track of the
	// original state and apply the deltas. This must be idempotent in case
	// it errors and is called again. If we're retrying and we get a data
	// format error, it's probably not the end of the world.
	nominated, err := applyDeltaEvents(data, obj.nominated) // map[hostname]URLs (URLsMap)
	if err != nil && err != errInconsistentApply { // allow missing deletes
		return err // unexpected error, fail
	}
	// TODO: do we want to sort this if it becomes a list instead of a map?
	//sort.Strings(nominated) // deterministic order
	obj.nominated = nominated
	return nil
}

// volunteerApply applies the changed watcher data onto our local caches.
func (obj *EmbdEtcd) volunteerApply(data *interfaces.WatcherData) error {
	if data == nil { // ignore empty data
		return nil
	}
	volunteers, err := applyDeltaEvents(data, obj.volunteers) // map[hostname]URLs (URLsMap)
	if err != nil && err != errInconsistentApply { // allow missing deletes
		return err // unexpected error, fail
	}
	// TODO: do we want to sort this if it becomes a list instead of a map?
	//sort.Strings(volunteers) // deterministic order
	obj.volunteers = volunteers
	return nil
}
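// The block below is an editorial sketch, NOT part of the original package
// API: it illustrates the idempotent delta-application idea that
// nominateApply and volunteerApply rely on, using a simplified, hypothetical
// event type (exampleEvent) instead of the real interfaces.WatcherData. Puts
// add or overwrite a key, deletes remove one, and a delete for a missing key
// is reported (compare errInconsistentApply above) so callers can decide to
// tolerate it.
type exampleEvent struct {
	isDelete bool   // does this event remove the key?
	key      string // hostname in the real maps
	value    string // comma separated URLs in the real code
}

// exampleApplyDeltas is a hypothetical, simplified analogue of
// applyDeltaEvents. It builds and returns a new map so that a retry can
// safely re-apply the same events to the untouched input.
func exampleApplyDeltas(events []exampleEvent, in map[string]string) (map[string]string, error) {
	out := make(map[string]string, len(in))
	for k, v := range in { // copy, so retries stay idempotent
		out[k] = v
	}
	var err error
	for _, ev := range events {
		if ev.isDelete {
			if _, exists := out[ev.key]; !exists {
				// missing delete; remember it, but keep going
				err = fmt.Errorf("inconsistent apply: missing delete for `%s`", ev.key)
			}
			delete(out, ev.key)
			continue
		}
		out[ev.key] = ev.value // put: add or overwrite
	}
	return out, err
}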
// endpointApply applies the changed watcher data onto our local caches. In
// this particular apply function, it also sets our client with the new
// endpoints.
func (obj *EmbdEtcd) endpointApply(data *interfaces.WatcherData) error {
	if data == nil { // ignore empty data
		return nil
	}
	endpoints, err := applyDeltaEvents(data, obj.endpoints) // map[hostname]URLs (URLsMap)
	if err != nil && err != errInconsistentApply { // allow missing deletes
		return err // unexpected error, fail
	}

	// is the endpoint list different?
	if err := cmpURLsMap(obj.endpoints, endpoints); err != nil {
		obj.endpoints = endpoints // set
		// can happen if a server drops out for example
		obj.Logf("endpoint list changed to: %+v", endpoints)
		obj.setEndpoints()
	}
	return nil
}
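// The helper below is an editorial sketch, NOT the real setEndpoints: it
// shows the gist of what syncing the client amounts to, with a plain
// map[string][]string standing in for the URLsMap used above. The client is
// handed a flat list, and future RPCs balance over the current members.
func exampleSetEndpoints(client *etcd.Client, urls map[string][]string) {
	eps := []string{}
	for _, list := range urls {
		eps = append(eps, list...)
	}
	client.SetEndpoints(eps...) // point the client at the new member set
}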
// nominateCb runs to respond to the nomination list change events.
// Functionally, it controls the starting and stopping of the server process.
// If a nominate message is received for this machine, then it means it is
// already being added to the cluster with member add, and the cluster is now
// waiting for it to start up. When a nominate entry is removed, it's up to
// this function to run the member remove right before it shuts its server
// down.
func (obj *EmbdEtcd) nominateCb(ctx context.Context) error {
	// Ensure that only one copy of this function is run simultaneously.
	// This is because we don't want to cause runServer to race with
	// destroyServer. Let us completely start up before we can cancel it.
	// As a special case, destroyServer itself can race against itself. I
	// don't think contention on this mutex is possible, but we'll leave it
	// in for safety.
	obj.nominatedMutex.Lock()
	defer obj.nominatedMutex.Unlock()
	// This ordering mutex is being added for safety, since there is no
	// good reason for this function and volunteerCb to run simultaneously,
	// and it might be preventing a race condition that was happening.
	obj.orderingMutex.Lock()
	defer obj.orderingMutex.Unlock()
	if obj.Debug {
		obj.Logf("nominateCb")
		defer obj.Logf("nominateCb: done!")
	}

	// check if i have actually volunteered first of all...
	if obj.NoServer || len(obj.ServerURLs) == 0 {
		obj.Logf("inappropriately nominated, rogue or stale server?")
		// TODO: should we un-nominate ourself?
		return nil // we've done our job successfully
	}

	// This can happen when we're shutting down; build the nominated value.
	if len(obj.nominated) == 0 {
		obj.Logf("list of nominations is empty")
		//return nil // don't exit, we might want to shutdown the server
	} else {
		obj.Logf("nominated: %v", obj.nominated)
	}

	// if there are no other peers, we create a new server
	// TODO: do we need an || len(obj.nominated) == 0 if we're the first?
	_, exists := obj.nominated[obj.Hostname] // am i nominated?
	newCluster := len(obj.nominated) == 1 && exists
	if obj.Debug {
		obj.Logf("nominateCb: newCluster: %t; exists: %t; obj.server == nil: %t", newCluster, exists, obj.server == nil)
	}

	// TODO: server start retries should be handled inside of runServer...
	if obj.serverAction(serverActionStart) { // start
		// no server is running, but it should be
		wg := &sync.WaitGroup{}
		serverReady, ackReady := obj.ServerReady()    // must call ack!
		serverExited, ackExited := obj.ServerExited() // must call ack!
		var sendError = false
		var serverErr error
		obj.Logf("waiting for server...")
		nominated, err := copyURLsMap(obj.nominated)
		if err != nil {
			return err
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			obj.errExitN = make(chan struct{})
			defer close(obj.errExitN) // multi-signal for errChan close op
			// blocks until server exits
			serverErr = obj.runServer(newCluster, nominated)
			// in case this exits on its own instead of with destroy
			defer obj.destroyServer() // run to reset some values
			if sendError && serverErr != nil { // exited with an error
				select {
				case obj.errChan <- errwrap.Wrapf(serverErr, "runServer errored"):
				}
			}
		}()

		// block until either server is ready or an early exit occurs
		select {
		case <-serverReady:
			// detach from our local return of errors from an early
			// server exit (pre server ready) and switch to channel
			sendError = true // gets set before the ackReady() does
			ackReady()       // must be called
			ackExited()      // must be called
			// pass

		case <-serverExited:
			ackExited() // must be called
			ackReady()  // must be called
			wg.Wait()   // wait for server to finish to get early err
			return serverErr
		}
		// Once the server is online, we *must* publish this information
		// so that (1) others know where to connect to us, (2) we
		// provide an "event" for member add, since there isn't any
		// such event currently built-in to etcd, and (3) we have a key
		// to expire when we shutdown or crash, to give us the member
		// remove event.
		// please see issue: https://github.com/etcd-io/etcd/issues/5277

	} else if obj.serverAction(serverActionStop) { // stop?
		// server is running, but it should not be

		// i have been un-nominated, remove self and shutdown server!
		// we don't need to do a member remove if i'm the last one...
		if len(obj.nominated) != 0 { // don't call if nobody left but me!
			// work around: https://github.com/etcd-io/etcd/issues/5482
			// and it might make sense to avoid it if we're the last
			obj.Logf("member remove: removing self: %d", obj.serverID)
			resp, err := obj.memberRemove(ctx, obj.serverID)
			if err != nil {
				if obj.Debug {
					obj.Logf("error with member remove: %v", err)
				}
				return errwrap.Wrapf(err, "member self remove error")
			}
			if resp != nil {
				obj.Logf("member removed (self): %s (%d)", obj.Hostname, obj.serverID)
				if err := obj.updateMemberState(resp.Members); err != nil {
					return err
				}
			}
		}

		// FIXME: if we fail on destroy, should we try to run some of
		// the other cleanup tasks that usually run afterwards (below)
		// anyways?
		if err := obj.destroyServer(); err != nil { // sync until exited
			return errwrap.Wrapf(err, "destroyServer errored")
		}

		// We close with this special sentinel only during destroy/exit.
		if obj.closing {
			return interfaces.ErrShutdown
		}
	}
	return nil
}
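// The helper below is an editorial sketch, NOT part of this package: it shows
// one way to get the "key to expire when we shutdown or crash" behaviour that
// the comment in nominateCb describes, using the real clientv3 lease API. The
// helper name and signature are hypothetical; the mgmt code does this with
// its own plumbing elsewhere.
func examplePublishWithLease(ctx context.Context, client *etcd.Client, key, value string, ttl int64) error {
	lease, err := client.Grant(ctx, ttl) // ttl is in seconds
	if err != nil {
		return errwrap.Wrapf(err, "could not grant lease")
	}
	// if we crash and keepalives stop, the key expires and watchers see a
	// delete event, which stands in for the missing member remove event
	if _, err := client.Put(ctx, key, value, etcd.WithLease(lease.ID)); err != nil {
		return errwrap.Wrapf(err, "could not put key")
	}
	ch, err := client.KeepAlive(ctx, lease.ID) // refresh until ctx cancels
	if err != nil {
		return errwrap.Wrapf(err, "could not keep lease alive")
	}
	go func() {
		for range ch { // drain keepalive responses
		}
	}()
	return nil
}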
// volunteerCb runs to respond to the volunteer list change events.
// Functionally, it controls the nominating and adding of members. It
// typically nominates a peer so that it knows it will get to be a server,
// which causes it to start up its server. It also runs the member add
// operation so that the cluster gets quorum safely. The member remove
// operation is typically run in the nominateCb of that server when it is
// asked to shutdown. This occurs when the nominate entry for that server is
// removed. If a server removes its volunteer entry, we must respond by
// removing the nomination so that it can receive that message and shutdown.
// FIXME: we might need to respond to member change/disconnect/shutdown events,
// see: https://github.com/etcd-io/etcd/issues/5277
// XXX: Don't allow this function to partially run if it is canceled part way
// through... We don't want an inconsistent state where we did unnominate, but
// didn't remove a member...
// XXX: If the leader changes, do we need to kick the volunteerCb or anything
// else that might have required a leader and which returned because it did
// not have one, thus losing an event?
func (obj *EmbdEtcd) volunteerCb(ctx context.Context) error {
	// Ensure that only one copy of this function is run simultaneously.
	// It's not entirely clear if this can ever happen or if it's needed,
	// but it's an inexpensive safety check that we can add in for now.
	obj.volunteerMutex.Lock()
	defer obj.volunteerMutex.Unlock()
	// This ordering mutex is being added for safety, since there is no
	// good reason for this function and nominateCb to run simultaneously,
	// and it might be preventing a race condition that was happening.
	obj.orderingMutex.Lock()
	defer obj.orderingMutex.Unlock()
	if obj.Debug {
		obj.Logf("volunteerCb")
		defer obj.Logf("volunteerCb: done!")
	}

	// FIXME: are there any situations where we don't want to short circuit
	// here, such as if i'm the last node?
	if obj.server == nil {
		if obj.Debug {
			obj.Logf("i'm not a server yet...")
		}
		return nil // if i'm not a server, i'm not a leader, return
	}

	// FIXME: Instead of checking this, assume yes, and use the
	// `WithRequireLeader` wrapper, and just ignore the error from that if
	// it's wrong... Combined with events that poke this volunteerCb when
	// the leader changes, we shouldn't miss any events...
	if isLeader, err := obj.isLeader(ctx); err != nil { // XXX: race!
		return errwrap.Wrapf(err, "error determining leader")
	} else if !isLeader {
		if obj.Debug {
			obj.Logf("we are not the leader...")
		}
		return nil
	}
	// i am the leader!

	// Remember that the member* operations return the membership, so this
	// means we don't need to run an extra memberList in those scenarios...
	// However, this can get out of sync easily, so ensure that our member
	// information is very recent.
	if err := obj.memberStateFromList(ctx); err != nil {
		return errwrap.Wrapf(err, "error during state sync")
	}
	// XXX: If we have any unstarted members here, do we want to reschedule
	// this volunteerCb in a moment? Or will we get another event anyways?

	// NOTE: There used to be an is_leader check right here...
	// FIXME: Should we use WithRequireLeader instead? Here? Elsewhere?
	// https://godoc.org/github.com/etcd-io/etcd/clientv3#WithRequireLeader

	// FIXME: can this happen, and if so, is it an error or a pass-through?
	if len(obj.volunteers) == 0 {
		obj.Logf("list of volunteers is empty")
		//return fmt.Errorf("volunteer list is empty")
	} else {
		obj.Logf("volunteers: %+v", obj.volunteers)
	}

	// TODO: do we really need to check these errors?
	m, err := copyURLsMap(obj.membermap) // list of members...
	if err != nil {
		return err
	}
	v, err := copyURLsMap(obj.volunteers)
	if err != nil {
		return err
	}
	// Unnominate anyone that unvolunteers, so they can shutdown cleanly...
	// FIXME: one step at a time... do we trigger subsequent steps somehow?
	obj.Logf("chooser: (%+v)/(%+v)", m, v)
	nominate, unnominate, err := obj.Chooser.Choose(m, v)
	if err != nil {
		return errwrap.Wrapf(err, "chooser error")
	}
	// Ensure that we are the *last* in the list if we're unnominating, and
	// the *first* in the list if we're nominating. This way, we
	// self-remove last, and we self-add first. This is least likely to
	// hurt quorum.
	headFn := func(x string) bool {
		return x != obj.Hostname
	}
	tailFn := func(x string) bool {
		return x == obj.Hostname
	}
	nominate = util.PriorityStrSliceSort(nominate, headFn)
	unnominate = util.PriorityStrSliceSort(unnominate, tailFn)
	obj.Logf("chooser result(+/-): %+v/%+v", nominate, unnominate)
	var reterr error
	leaderCtx := ctx // default ctx to use
	if RequireLeaderCtx {
		leaderCtx = etcd.WithRequireLeader(ctx) // FIXME: Is this correct?
	}

	for i := range nominate {
		member := nominate[i]
		peerURLs, exists := obj.volunteers[member] // comma separated list of urls
		if !exists {
			// if this happens, do we have an update race?
			return fmt.Errorf("could not find member `%s` in volunteers map", member)
		}

		// NOTE: storing peerURLs when they're already in volunteers/
		// is redundant, but it seems to be necessary for a sane
		// algorithm.
		// nominate before we call the API so that members see it first!
		if err := obj.nominate(leaderCtx, member, peerURLs); err != nil {
			return errwrap.Wrapf(err, "error nominating: %s", member)
		}
		// XXX: can we add a ttl here, because once we nominate
		// someone, we need to give them up to N seconds to start up
		// after we run the MemberAdd API, because if they don't, in
		// some situations, such as if we're adding the second node to
		// the cluster, then we've lost quorum until a second member
		// joins! If the TTL expires, we need to MemberRemove! In this
		// special case, we need to forcefully remove the second member
		// if we don't add them, because we'll be in a lack of quorum
		// state and unable to do anything... As a result, we should
		// always only add ONE member at a time!

		// XXX: After we memberAdd, can we wait a timeout, and then
		// undo the add if the member doesn't come up? We'd also need
		// to run an unnominate too, and mark the node as temporarily
		// failed...
		obj.Logf("member add: %s: %v", member, peerURLs)
		resp, err := obj.memberAdd(leaderCtx, peerURLs)
		if err != nil {
			// FIXME: On error, this function needs to run again,
			// because we need to make sure to add the member here!
			return errwrap.Wrapf(err, "member add error")
		}
		if resp != nil { // if we're already in the right state, we get nil
			obj.Logf("member added: %s (%d): %v", member, resp.Member.ID, peerURLs)
			if err := obj.updateMemberState(resp.Members); err != nil {
				return err
			}
			if resp.Member.Name == "" { // not started instantly ;)
				obj.addMemberState(member, resp.Member.ID, peerURLs, nil)
			}
			// TODO: would this ever happen or be necessary?
			//if member == obj.Hostname {
			//	obj.addSelfState()
			//}
		}
	}
	// we must remove them from the members API or it will look like a crash
	if l := len(unnominate); l > 0 {
		obj.Logf("unnominated: shutting down %d members...", l)
	}
	for i := range unnominate {
		member := unnominate[i]
		memberID, exists := obj.memberIDs[member] // map[string]uint64
		if !exists {
			// if this happens, do we have an update race?
			return fmt.Errorf("could not find member `%s` in memberIDs map", member)
		}

		// start a watcher to know if the member was removed
		cancelCtx, cancel := context.WithCancel(leaderCtx)
		defer cancel()
		timeout := util.CloseAfter(cancelCtx, SelfRemoveTimeout) // chan closes
		fn := func(members []*pb.Member) error {
			for _, m := range members {
				if m.Name == member || m.ID == memberID {
					return fmt.Errorf("still present")
				}
			}
			return nil // not found!
		}
		ch, err := obj.memberChange(cancelCtx, fn, MemberChangeInterval)
		if err != nil {
			return errwrap.Wrapf(err, "error watching for change of: %s", member)
		}
		if err := obj.nominate(leaderCtx, member, nil); err != nil { // unnominate
			return errwrap.Wrapf(err, "error unnominating: %s", member)
		}
		// Once we issue the above unnominate, that peer will shutdown,
		// and this might cause us to lose quorum. Therefore, let that
		// member remove itself, and then double check that it actually
		// happened in case it's delinquent.
		// TODO: get built-in transactional member Add/Remove
		// functionality to avoid a separate nominate list...

		// If we're removing ourselves, then let the (un)nominate
		// callback do it. That way it removes itself cleanly on server
		// shutdown.
		if member == obj.Hostname { // remove in unnominate!
			cancel()
			obj.Logf("unnominate: removing self...")
			continue
		}

		// cancel remove sleep and unblock early on event...
		obj.Logf("waiting %s for %s to self remove...", SelfRemoveTimeout.String(), member)
		select {
		case <-timeout:
			// pass

		case err, ok := <-ch:
			if ok {
				select {
				case <-timeout: // wait until the timeout finishes
				}
				reterr = errwrap.Append(reterr, err)
			}
			// removed quickly!
		}
		cancel()

		// In case the removed member doesn't remove itself, do it!
		resp, err := obj.memberRemove(leaderCtx, memberID)
		if err != nil {
			return errwrap.Wrapf(err, "member remove error")
		}
		if resp != nil {
			obj.Logf("member removed (forced): %s (%d)", member, memberID)
			if err := obj.updateMemberState(resp.Members); err != nil {
				return err
			}
			// Do this I guess, but the TTL will eventually get it.
			// Remove the other member to avoid client connections.
			if err := obj.advertise(leaderCtx, member, nil); err != nil {
				return err
			}
		}

		// Remove the member from our lists to avoid blocking future
		// possible MemberList calls which would try and connect to a
		// missing member... The lists should get updated from the
		// member exiting safely if it doesn't crash, but if it did,
		// and/or since it's a race to see if the update event will get
		// seen before we need the new data, just do it now anyways.
		// TODO: Is the above comment still true?
		obj.rmMemberState(member) // proactively delete it

		obj.Logf("member %s (%d) removed successfully!", member, memberID)
	}

	// NOTE: We could ensure that etcd reconnects here, but we can just
	// wait for the endpoints callback, which should see the state change
	// instead.
	obj.setEndpoints() // sync client with new endpoints

	return reterr
}
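// The helper below is an editorial sketch, NOT the real
// util.PriorityStrSliceSort: it only captures the ordering idea used in
// volunteerCb above. Entries matched by the predicate are pushed to the end,
// so with headFn we self-add first, and with tailFn we self-remove last,
// which is least likely to hurt quorum. The real helper may differ.
func examplePrioritySort(in []string, last func(string) bool) []string {
	out := []string{}
	for _, s := range in { // keep non-matching entries early, in order
		if !last(s) {
			out = append(out, s)
		}
	}
	for _, s := range in { // matching entries are de-prioritized to the end
		if last(s) {
			out = append(out, s)
		}
	}
	return out
}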
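// The helper below is an editorial sketch, NOT part of this package: it shows
// the shape of the double check performed in volunteerCb above, polling the
// real clientv3 members API until the named member is gone or the context
// expires. The real code uses memberChange with an interval; pacing here is
// left to the caller via ticks (for example fed from a time.Ticker) to keep
// this sketch free of extra imports.
func exampleWaitRemoved(ctx context.Context, client *etcd.Client, name string, ticks <-chan struct{}) error {
	for {
		resp, err := client.MemberList(ctx)
		if err != nil {
			return errwrap.Wrapf(err, "member list error")
		}
		found := false
		for _, m := range resp.Members {
			if m.Name == name {
				found = true // still present...
			}
		}
		if !found {
			return nil // it removed itself cleanly!
		}
		select {
		case <-ticks: // wait for the next poll tick
		case <-ctx.Done():
			return errwrap.Wrapf(ctx.Err(), "gave up waiting for %s", name)
		}
	}
}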