Vanlightly commented on a change in pull request #1690:
URL: https://github.com/apache/zookeeper/pull/1690#discussion_r705458717



##########
File path: zookeeper-specifications/zab-1.0/ZabWithFLE.tla
##########
@@ -0,0 +1,1020 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *)
+
+----------------------------- MODULE ZabWithFLE -----------------------------
+(* This is the formal specification for the Zab consensus algorithm,
+   which means Zookeeper Atomic Broadcast.*)
+
+(* Reference:
+   FLE: FastLeaderElection.java, Vote.java, QuorumPeer.java in 
https://github.com/apache/zookeeper.
+   ZAB: QuorumPeer.java, Learner.java, Follower.java, LearnerHandler.java, 
Leader.java 
+        in https://github.com/apache/zookeeper.
+        https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zab1.0.
+ *)
+EXTENDS FastLeaderElection
+
+-----------------------------------------------------------------------------
+(* Defined in FastLeaderElection.tla:
+\* The set of server identifiers
+CONSTANT Server
+
+\* Server states
+CONSTANTS LOOKING, FOLLOWING, LEADING
+
+\* Message types
+CONSTANTS NOTIFICATION
+
+\* Timeout signal
+CONSTANT NONE
+*)
+\* The set of requests that can go into history
+CONSTANT Value
+
+\* Zab states
+CONSTANTS ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST
+
+\* Message types
+CONSTANTS FOLLOWERINFO, LEADERINFO, ACKEPOCH, NEWLEADER, ACKLD, UPTODATE, 
PROPOSAL, ACK, COMMIT
+(* Additional message types used for recovery in 
synchronization(TRUNC/DIFF/SNAP) are not needed
+   since we abstract this part.(see action RECOVERYSYNC) *)
+   
+-----------------------------------------------------------------------------
+(* Defined in FastLeaderElection.tla: Quorums, NullPoint *)
+\* Return the maximum value from the set S
+Maximum(S) == IF S = {} THEN -1
+                        ELSE CHOOSE n \in S: \A m \in S: n >= m
+
+\* Return the minimum value from the set S
+Minimum(S) == IF S = {} THEN -1
+                        ELSE CHOOSE n \in S: \A m \in S: n <= m
+
+MAXEPOCH == 10
+
+-----------------------------------------------------------------------------
+(* Defined in FastLeaderElection.tla:
+   serverVars: <<state, currentEpoch, lastZxid>>,
+   electionVars: <<currentVote, logicalClock, receiveVotes, outOfElection, 
recvQueue, waitNotmsg>>,
+   leaderVars: <<leadingVoteSet>>,
+   electionMsgs,
+   idTable *)
+\* The current phase of server(ELECTION,DISCOVERY,SYNCHRONIZATION,BROADCAST)
+VARIABLE zabState
+
+\* The epoch number of the last NEWEPOCH(LEADERINFO) packet accepted
+\* namely f.p in paper, and currentEpoch in Zab.tla.
+VARIABLE acceptedEpoch
+
+\* The history of servers as the sequence of transactions.
+VARIABLE history
+
+\* commitIndex[i]: The maximum index of transactions that have been saved in a 
quorum of servers
+\*                 in the perspective of server i.(increases monotonically 
before restarting)
+VARIABLE commitIndex
+
+(* These transactions whose index \le commitIndex[i] can be applied to state 
machine immediately.
+   So if we have a variable applyIndex, we can suppose that applyIndex[i] = 
commitIndex[i] when verifying properties.
+   But in phase SYNC, follower will apply all queued proposals to state 
machine when receiving NEWLEADER.
+   But follower only serves traffic after receiving UPTODATE, so sequential 
consistency is not violated.
+   
+   So when we verify properties, we still suppose applyIndex[i] = 
commitIndex[i], because this is an engineering detail.*)
+
+\* learners[i]: The set of servers which leader i think are connected wich i.
+VARIABLE learners
+
+\* The messages representing requests and responses sent from one server to 
another.
+\* msgs[i][j] means the input buffer of server j from server i.
+VARIABLE msgs
+
+\* The set of followers who has successfully sent CEPOCH(FOLLOWERINFO) to 
leader.(equals to connectingFollowers in code)
+VARIABLE cepochRecv
+
+\* The set of followers who has successfully sent ACK-E to leader.(equals to 
electingFollowers in code)
+VARIABLE ackeRecv
+
+\* The set of followers who has successfully sent ACK-LD to leader in 
leader.(equals to newLeaderProposal in code)
+VARIABLE ackldRecv
+
+\* The set of servers which leader i broadcasts PROPOSAL and COMMIT to.(equals 
to forwardingFollowers in code)
+VARIABLE forwarding
+
+\* ackIndex[i][j]: The latest index that leader i has received from follower j 
via ACK.
+VARIABLE ackIndex
+
+\* currentCounter[i]: The count of transactions that clients request leader i.
+VARIABLE currentCounter
+
+\* sendCounter[i]: The count of transactions that leader i has broadcast in 
PROPOSAL.
+VARIABLE sendCounter
+
+\* committedIndex[i]: The maximum index of trasactions that leader i has 
broadcast in COMMIT.
+VARIABLE committedIndex
+
+\* committedCounter[i][j]: The latest counter of transaction that leader i has 
confirmed that follower j has committed.
+VARIABLE committedCounter
+
+\* initialHistory[i]: The initial history if leader i in epoch 
acceptedEpoch[i].
+VARIABLE initialHistory
+
+\* the maximum epoch in CEPOCH the prospective leader received from followers.
+VARIABLE tempMaxEpoch
+
+\* cepochSent[i] = TRUE means follower i has sent CEPOCH(FOLLOWERINFO) to 
leader.
+VARIABLE cepochSent
+
+\* leaderAddr[i]: The leader id of follower i. We use leaderAddr to express 
whether follower i has connected or lost connection.
+VARIABLE leaderAddr
+
+\* synced[i] = TRUE: follower i has completed sync with leader.
+VARIABLE synced
+
+\* The set of leaders in every epoch, only used in verifying properties.
+VARIABLE epochLeader
+
+\* The set of all broadcast messages, only used in verifying properties.
+VARIABLE proposalMsgsLog
+
+\* A variable used to check whether there are conditions contrary to the facts.
+VARIABLE inherentViolated
+
+serverVarsZ == <<state, currentEpoch, lastZxid, zabState, acceptedEpoch, 
history, commitIndex>>         \* 7 variables
+
+electionVarsZ == electionVars  \* 6 variables
+
+leaderVarsZ == <<leadingVoteSet, learners, cepochRecv, ackeRecv, ackldRecv, 
forwarding, 
+                 ackIndex, currentCounter, sendCounter, committedIndex, 
committedCounter>>              \* 11 variables
+
+tempVarsZ == <<initialHistory, tempMaxEpoch>>     \* 2 variables
+  
+followerVarsZ == <<cepochSent, leaderAddr, synced>>                 \* 3 
variables
+
+verifyVarsZ == <<proposalMsgsLog, epochLeader, inherentViolated>>   \* 3 
variables
+
+msgVarsZ == <<msgs, electionMsgs>>                \* 2 variables
+
+vars == <<serverVarsZ, electionVarsZ, leaderVarsZ, tempVarsZ, followerVarsZ, 
verifyVarsZ, msgVarsZ, idTable>>  \* 35 variables
+-----------------------------------------------------------------------------
+\* Add a message to msgs - add a message m to msgs[i][j].
+Send(i, j, m) == msgs' = [msgs EXCEPT ![i][j] = Append(msgs[i][j], m)]
+
+\* Remove a message from msgs - discard head of msgs[i][j].
+Discard(i, j) == msgs' = IF msgs[i][j] /= << >> THEN [msgs EXCEPT ![i][j] = 
Tail(msgs[i][j])]
+                                                ELSE msgs
+
+\* Leader broadcasts a message(PROPOSAL/COMMIT) to all other servers in 
forwardingFollowers.
+Broadcast(i, m) == msgs' = [msgs EXCEPT ![i] = [v \in Server |-> IF /\ v \in 
forwarding[i]
+                                                                    /\ v /= i
+                                                                    /\ \/ /\ 
m.mtype = PROPOSAL
+                                                                          /\ 
ackIndex[i][v] < Len(initialHistory[i]) + m.mproposal.counter
+                                                                       \/ /\ 
m.mtype = COMMIT
+                                                                          /\ 
committedCounter[i][v] < m.mzxid[2]
+                                                                  THEN 
Append(msgs[i][v], m)
+                                                                  ELSE 
msgs[i][v]]]
+
+BroadcastLEADERINFO(i, m) == msgs' = [msgs EXCEPT ![i] = [v \in Server |-> IF 
/\ v \in cepochRecv[i]
+                                                                              
/\ v \in learners[i]
+                                                                              
/\ v /= i THEN Append(msgs[i][v], m)
+                                                                               
         ELSE msgs[i][v]]]
+
+BroadcastUPTODATE(i, m) == msgs' = [msgs EXCEPT ![i] = [v \in Server |-> IF /\ 
v \in ackldRecv[i]
+                                                                            /\ 
v \in learners[i]
+                                                                            /\ 
v /= i THEN Append(msgs[i][v], m)
+                                                                               
       ELSE msgs[i][v]]]
+
+\* Combination of Send and Discard - discard head of msgs[j][i] and add m into 
msgs[i][j].
+Reply(i, j, m) == msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
+                                       ![i][j] = Append(msgs[i][j], m)]
+
+\* shuffle the input buffer from server j(i) in server i(j).
+Clean(i, j) == msgs' = [msgs EXCEPT ![j][i] = << >>, ![i][j] = << >>]
+
+-----------------------------------------------------------------------------
+PZxidEqual(p, z) == p.epoch = z[1] /\ p.counter = z[2]
+
+TransactionEqual(t1, t2) == /\ t1.epoch   = t2.epoch
+                            /\ t1.counter = t2.counter
+                            
+TransactionPrecede(t1, t2) == \/ t1.epoch < t2.epoch
+                              \/ /\ t1.epoch   = t2.epoch
+                                 /\ t1.counter < t2.counter
+                                 
+-----------------------------------------------------------------------------
+\* Define initial values for all variables 
+InitServerVarsZ == /\ InitServerVars
+                   /\ zabState      = [s \in Server |-> ELECTION]
+                   /\ acceptedEpoch = [s \in Server |-> 0]
+                   /\ history       = [s \in Server |-> << >>]
+                   /\ commitIndex   = [s \in Server |-> 0]
+
+InitLeaderVarsZ == /\ InitLeaderVars
+                   /\ learners         = [s \in Server |-> {}]
+                   /\ cepochRecv       = [s \in Server |-> {}]
+                   /\ ackeRecv         = [s \in Server |-> {}]
+                   /\ ackldRecv        = [s \in Server |-> {}]
+                   /\ ackIndex         = [s \in Server |-> [v \in Server |-> 
0]]
+                   /\ currentCounter   = [s \in Server |-> 0]
+                   /\ sendCounter      = [s \in Server |-> 0]
+                   /\ committedIndex   = [s \in Server |-> 0]
+                   /\ committedCounter = [s \in Server |-> [v \in Server |-> 
0]]
+                   /\ forwarding       = [s \in Server |-> {}]
+
+InitElectionVarsZ == InitElectionVars
+
+InitTempVarsZ == /\ initialHistory = [s \in Server |-> << >>]
+                 /\ tempMaxEpoch   = [s \in Server |-> 0]
+
+InitFollowerVarsZ == /\ cepochSent = [s \in Server |-> FALSE]
+                     /\ leaderAddr = [s \in Server |-> NullPoint]
+                     /\ synced     = [s \in Server |-> FALSE]
+
+InitVerifyVarsZ == /\ proposalMsgsLog  = {}
+                   /\ epochLeader      = [i \in 1..MAXEPOCH |-> {}]
+                   /\ inherentViolated = FALSE
+                   
+InitMsgVarsZ == /\ msgs         = [s \in Server |-> [v \in Server |-> << >>]]
+                /\ electionMsgs = [s \in Server |-> [v \in Server |-> << >>]]
+
+InitZ == /\ InitServerVarsZ
+         /\ InitLeaderVarsZ
+         /\ InitElectionVarsZ
+         /\ InitTempVarsZ
+         /\ InitFollowerVarsZ
+         /\ InitVerifyVarsZ
+         /\ InitMsgVarsZ
+         /\ idTable = InitializeIdTable(Server)
+         
+-----------------------------------------------------------------------------
+ZabTurnToLeading(i) ==
+        /\ zabState'       = [zabState   EXCEPT ![i] = DISCOVERY]
+        /\ learners'       = [learners   EXCEPT ![i] = {i}]
+        /\ cepochRecv'     = [cepochRecv EXCEPT ![i] = {i}]
+        /\ ackeRecv'       = [ackeRecv   EXCEPT ![i] = {i}]
+        /\ ackldRecv'      = [ackldRecv  EXCEPT ![i] = {i}]
+        /\ forwarding'     = [forwarding EXCEPT ![i] = {}]
+        /\ ackIndex'       = [ackIndex   EXCEPT ![i] = [v \in Server |-> IF v 
= i THEN Len(history[i])
+                                                                               
   ELSE 0]]
+        /\ currentCounter'   = [currentCounter   EXCEPT ![i] = 0]
+        /\ sendCounter'      = [sendCounter      EXCEPT ![i] = 0]
+        /\ commitIndex'      = [commitIndex      EXCEPT ![i] = 0]
+        /\ committedIndex'   = [committedIndex   EXCEPT ![i] = 0]
+        /\ committedCounter' = [committedCounter EXCEPT ![i] = [v \in Server 
|-> IF v = i THEN Len(history[i])
+                                                                               
           ELSE 0]]
+        /\ initialHistory' = [initialHistory EXCEPT ![i] = history[i]]
+        /\ tempMaxEpoch'   = [tempMaxEpoch   EXCEPT ![i] = acceptedEpoch[i]]
+
+ZabTurnToFollowing(i) ==
+        /\ zabState'    = [zabState   EXCEPT ![i] = DISCOVERY]
+        /\ cepochSent'  = [cepochSent EXCEPT ![i] = FALSE]
+        /\ synced'      = [synced     EXCEPT ![i] = FALSE]
+        /\ commitIndex' = [commitIndex      EXCEPT ![i] = 0]
+        
+          
+(* Fast Leader Election *)
+FLEReceiveNotmsg(i, j) ==
+        /\ ReceiveNotmsg(i, j)
+        /\ UNCHANGED <<zabState, acceptedEpoch, history, commitIndex,learners, 
cepochRecv, ackeRecv, ackldRecv, forwarding, ackIndex, currentCounter,
+                       sendCounter, committedIndex, committedCounter, 
tempVarsZ, followerVarsZ, verifyVarsZ, msgs>>
+
+FLENotmsgTimeout(i) ==
+        /\ NotmsgTimeout(i)
+        /\ UNCHANGED <<zabState, acceptedEpoch, history, commitIndex,learners, 
cepochRecv, ackeRecv, ackldRecv, forwarding, ackIndex, currentCounter,
+                       sendCounter, committedIndex, committedCounter, 
tempVarsZ, followerVarsZ, verifyVarsZ, msgs>>
+
+FLEHandleNotmsg(i) ==
+        /\ HandleNotmsg(i)
+        /\ LET newState == state'[i]
+           IN
+           \/ /\ newState = LEADING
+              /\ ZabTurnToLeading(i)
+              /\ UNCHANGED <<cepochSent, synced>>
+           \/ /\ newState = FOLLOWING
+              /\ ZabTurnToFollowing(i)
+              /\ UNCHANGED <<learners, cepochRecv, ackeRecv, ackldRecv, 
forwarding, ackIndex, currentCounter, sendCounter, committedIndex, 
committedCounter, tempVarsZ>>
+           \/ /\ newState = LOOKING
+              /\ UNCHANGED <<zabState, learners, cepochRecv, ackeRecv, 
ackldRecv, forwarding, ackIndex, currentCounter, sendCounter, commitIndex,
+                             committedIndex, committedCounter, tempVarsZ, 
cepochSent, synced>>
+        /\ UNCHANGED <<acceptedEpoch, history, leaderAddr, verifyVarsZ, msgs>>
+
+\* On the premise that ReceiveVotes.HasQuorums = TRUE, corresponding to logic 
in line 1050-1055 in LFE.java.
+FLEWaitNewNotmsg(i) ==
+        /\ WaitNewNotmsg(i)
+        /\ UNCHANGED <<zabState, acceptedEpoch, history, commitIndex,learners, 
cepochRecv, ackeRecv, ackldRecv, forwarding, ackIndex, currentCounter,
+                       sendCounter, committedIndex, committedCounter, 
tempVarsZ, followerVarsZ, verifyVarsZ, msgs>>
+
+\* On the premise that ReceiveVotes.HasQuorums = TRUE, corresponding to logic 
in line 1061-1066 in LFE.java.
+FLEWaitNewNotmsgEnd(i) ==
+        /\ WaitNewNotmsgEnd(i)
+        /\ LET newState == state'[i]
+           IN
+           \/ /\ newState = LEADING
+              /\ ZabTurnToLeading(i)
+              /\ UNCHANGED <<cepochSent, synced>>
+           \/ /\ newState = FOLLOWING
+              /\ ZabTurnToFollowing(i)
+              /\ UNCHANGED <<learners, cepochRecv, ackeRecv, ackldRecv, 
forwarding, ackIndex, currentCounter, sendCounter, committedIndex, 
committedCounter, tempVarsZ>>
+           \/ /\ newState = LOOKING
+              /\ PrintT("New state is LOOKING in FLEWaitNewNotmsgEnd, which 
should not happen.")
+              /\ UNCHANGED <<zabState, learners, cepochRecv, ackeRecv, 
ackldRecv, forwarding, ackIndex, currentCounter, sendCounter, commitIndex,
+                             committedIndex, committedCounter, tempVarsZ, 
cepochSent, synced>>
+        /\ UNCHANGED <<acceptedEpoch, history, leaderAddr, verifyVarsZ, msgs>>
+              
+-----------------------------------------------------------------------------
+(* A sub-action describing how a server transitions from LEADING/FOLLOWING to 
LOOKING.
+   Initially I call it 'ZabTimeoutZ', but it will be called not only when 
timeout, 
+   but also when finding a low epoch from leader.*)
+FollowerShutdown(i) ==
+        /\ ZabTimeout(i)
+        /\ zabState'   = [zabState   EXCEPT ![i] = ELECTION]
+        /\ leaderAddr' = [leaderAddr EXCEPT ![i] = NullPoint]
+
+LeaderShutdown(i) ==
+        /\ ZabTimeout(i)
+        /\ zabState'   = [zabState   EXCEPT ![i] = ELECTION]
+        /\ leaderAddr' = [s \in Server |-> IF s \in learners[i] THEN NullPoint 
ELSE leaderAddr[s]]
+        /\ learners'   = [learners   EXCEPT ![i] = {}]
+        /\ forwarding' = [forwarding EXCEPT ![i] = {}]
+        /\ msgs'       = [s \in Server |-> [v \in Server |-> IF v \in 
learners[i] \/ s \in learners[i] THEN << >> ELSE msgs[s][v]]]
+
+FollowerTimout(i) ==
+        /\ state[i]      = FOLLOWING
+        /\ leaderAddr[i] = NullPoint
+        /\ FollowerShutdown(i)
+        /\ msgs' = [s \in Server |-> [v \in Server |-> IF v = i THEN << >> 
ELSE msgs[s][v]]]
+        /\ UNCHANGED <<acceptedEpoch, history, commitIndex, learners, 
cepochRecv, ackeRecv, ackldRecv, forwarding, ackIndex, 
+                       currentCounter, sendCounter, committedIndex, 
committedCounter, tempVarsZ, cepochSent, synced, verifyVarsZ>>
+
+LeaderTimeout(i) ==
+        /\ state[i] = LEADING
+        /\ learners[i] \notin Quorums
+        /\ LeaderShutdown(i)
+        /\ UNCHANGED <<acceptedEpoch, history, commitIndex, cepochRecv, 
ackeRecv, ackldRecv, ackIndex, currentCounter, sendCounter, committedIndex, 
committedCounter,
+                       tempVarsZ, cepochSent, synced, verifyVarsZ>>
+    
+-----------------------------------------------------------------------------
+(* Establish connection between leader i and follower j. 
+   It means i creates a learnerHandler for communicating with j, and j finds 
i's address.*)
+EstablishConnection(i, j) ==
+        /\ state[i] = LEADING   /\ state[j] = FOLLOWING
+        /\ j \notin learners[i] /\ leaderAddr[j] = NullPoint
+        /\ currentVote[j].proposedLeader = i
+        /\ learners'   = [learners   EXCEPT ![i] = learners[i] \union {j}] \* 
Leader:   'addLearnerHandler(peer)'
+        /\ leaderAddr' = [leaderAddr EXCEPT ![j] = i]                      \* 
Follower: 'connectToLeader(addr, hostname)'
+        /\ UNCHANGED <<serverVarsZ, electionVarsZ, leadingVoteSet, cepochRecv, 
ackeRecv, ackldRecv, forwarding, ackIndex, 
+                       currentCounter, sendCounter, committedIndex, 
committedCounter, tempVarsZ, cepochSent, synced, verifyVarsZ, msgVarsZ, 
idTable>>
+        
+(* The leader i finds timeout and TCP connection between i and j closes.*)     
  
+Timeout(i, j) ==
+        /\ state[i] = LEADING /\ state[j] = FOLLOWING
+        /\ j \in learners[i]  /\ leaderAddr[j] = i
+        (* The action of leader i.(corresponding to function 
'removeLearnerHandler(peer)'.) *)
+        /\ learners'   = [learners   EXCEPT ![i] = learners[i] \ {j}] 
+        /\ forwarding' = [forwarding EXCEPT ![i] = IF j \in forwarding[i] THEN 
forwarding[i] \ {j} ELSE forwarding[i]]
+        /\ cepochRecv' = [cepochRecv EXCEPT ![i] = IF j \in cepochRecv[i] THEN 
cepochRecv[i] \ {j} ELSE cepochRecv[i]]
+        /\ ackeRecv'   = [ackeRecv   EXCEPT ![i] = IF j \in ackeRecv[i] THEN 
ackeRecv[i] \ {j} ELSE ackeRecv[i]]
+        /\ ackldRecv'  = [ackldRecv  EXCEPT ![i] = IF j \in ackldRecv[i] THEN 
ackldRecv[i] \ {j} ELSE ackldRecv[i]]
+        /\ ackIndex'   = [ackIndex   EXCEPT ![i][j] = 0]
+        /\ committedCounter' = [committedCounter EXCEPT ![i][j] = 0]
+        (* The action of follower j. *)
+        /\ FollowerShutdown(j)
+        (* Clean input buffer.*)
+        /\ Clean(i, j)
+        /\ UNCHANGED <<acceptedEpoch, history, commitIndex, currentCounter, 
sendCounter, committedIndex,
+                       tempVarsZ, cepochSent, synced, verifyVarsZ>>
+-----------------------------------------------------------------------------
+\* In phase f11, follower sends f.p to leader via FOLLOWERINFO(CEPOCH).
+FollowerSendFOLLOWERINFO(i) ==
+        /\ state[i]    = FOLLOWING
+        /\ zabState[i] = DISCOVERY
+        /\ leaderAddr[i] /= NullPoint
+        /\ \lnot cepochSent[i]
+        /\ Send(i, leaderAddr[i], [mtype  |-> FOLLOWERINFO,
+                                   mepoch |-> acceptedEpoch[i]])
+        /\ cepochSent' = [cepochSent EXCEPT ![i] = TRUE]
+        /\ UNCHANGED <<serverVarsZ, leaderVarsZ, electionVarsZ, tempVarsZ, 
leaderAddr, synced, verifyVarsZ, electionMsgs, idTable>>
+        
+(* In phase l11, leader waits for receiving FOLLOWERINFO from a quorum,
+   and then chooses a new epoch e' as its own epoch and broadcasts LEADERINFO. 
*)
+LeaderHandleFOLLOWERINFO(i, j) ==
+        /\ state[i] = LEADING
+        /\ msgs[j][i] /= << >>
+        /\ msgs[j][i][1].mtype = FOLLOWERINFO
+        /\ LET msg == msgs[j][i][1]
+           IN \/ /\ NullPoint \notin cepochRecv[i]   \* 1. has not broadcast 
LEADERINFO - modify tempMaxEpoch
+                 /\ LET newEpoch == Maximum({tempMaxEpoch[i], msg.mepoch})
+                    IN tempMaxEpoch' = [tempMaxEpoch EXCEPT ![i] = newEpoch]
+                 /\ Discard(j, i)
+              \/ /\ NullPoint \in cepochRecv[i]      \* 2. has broadcast 
LEADERINFO - no need to handle the msg, just send LEADERINFO to corresponding 
server
+                 /\ Reply(i, j, [mtype  |-> LEADERINFO,
+                                 mepoch |-> acceptedEpoch[i]])
+                 /\ UNCHANGED tempMaxEpoch
+        /\ cepochRecv' = [cepochRecv EXCEPT ![i] = IF j \in cepochRecv[i] THEN 
cepochRecv[i]
+                                                                          ELSE 
cepochRecv[i] \union {j}]
+        /\ UNCHANGED <<serverVarsZ, followerVarsZ, electionVarsZ, 
initialHistory, leadingVoteSet, learners, ackeRecv, ackldRecv, 
+                       forwarding, ackIndex, currentCounter, sendCounter, 
committedIndex, committedCounter, verifyVarsZ, electionMsgs, idTable>>
+
+LeaderDiscovery1(i) ==
+        /\ state[i]    = LEADING
+        /\ zabState[i] = DISCOVERY
+        /\ cepochRecv[i] \in Quorums
+        /\ acceptedEpoch' = [acceptedEpoch EXCEPT ![i] = tempMaxEpoch[i] + 1]
+        /\ cepochRecv'    = [cepochRecv    EXCEPT ![i] = cepochRecv[i] \union 
{NullPoint}]
+        /\ BroadcastLEADERINFO(i, [mtype  |-> LEADERINFO,
+                                   mepoch |-> acceptedEpoch'[i]])
+        /\ UNCHANGED <<state, currentEpoch, lastZxid, zabState, history, 
commitIndex, electionVarsZ, leadingVoteSet, learners, ackeRecv, ackldRecv, 
+                       forwarding, ackIndex, currentCounter, sendCounter, 
committedIndex, committedCounter, 
+                       tempVarsZ, followerVarsZ, verifyVarsZ, electionMsgs, 
idTable>>
+
+(* In phase f12, follower receives NEWEPOCH. If e' > f.p, then follower sends 
ACK-E back,
+   and ACK-E contains f.a and lastZxid to let leader judge whether it is the 
latest.
+   After handling NEWEPOCH, follower's zabState turns to SYNCHRONIZATION. *)
+FollowerHandleLEADERINFO(i, j) ==
+        /\ state[i] = FOLLOWING
+        /\ msgs[j][i] /= << >>
+        /\ msgs[j][i][1].mtype = LEADERINFO
+        /\ LET msg     == msgs[j][i][1]
+               infoOk  == j = leaderAddr[i]
+               epochOk == /\ infoOk
+                          /\ msg.mepoch >= acceptedEpoch[i]
+               correct == /\ epochOk
+                          /\ zabState[i] = DISCOVERY
+           IN /\ infoOk
+              /\ \/ /\ epochOk                      \* 1. Normal case
+                    /\ \/ /\ correct
+                          /\ acceptedEpoch' = [acceptedEpoch EXCEPT ![i] = 
msg.mepoch]
+                          /\ Reply(i, j, [mtype      |-> ACKEPOCH,
+                                          mepoch     |-> msg.mepoch,
+                                          mlastEpoch |-> currentEpoch[i],
+                                          mlastZxid  |-> lastZxid[i]])
+                          /\ cepochSent' = [cepochSent EXCEPT ![i] = TRUE]
+                          /\ UNCHANGED inherentViolated
+                       \/ /\ ~correct
+                          /\ PrintT("Exception: Condition correct is false in 
FollowerHandleLEADERINFO(" \o ToString(i) \o ", " \o ToString(j) \o ").")
+                          /\ inherentViolated' = TRUE
+                          /\ Discard(j, i)
+                          /\ UNCHANGED <<acceptedEpoch, cepochSent>>
+                    /\ zabState' = [zabState EXCEPT ![i] = IF zabState[i] = 
DISCOVERY THEN SYNCHRONIZATION
+                                                                               
       ELSE zabState[i]]
+                    /\ UNCHANGED <<varsL, leaderAddr>>
+                 \/ /\ ~epochOk                    \* 2. Abnormal case - go 
back to election
+                    /\ FollowerShutdown(i)
+                    /\ Clean(i, j)
+                    /\ UNCHANGED <<acceptedEpoch, cepochSent, 
inherentViolated>>
+        /\ UNCHANGED <<history, commitIndex, learners, cepochRecv, ackeRecv, 
ackldRecv, forwarding, ackIndex, currentCounter, sendCounter, committedIndex,
+                       committedCounter, tempVarsZ, synced, proposalMsgsLog, 
epochLeader>>
+
+\* Abstraction of actions making follower synced with leader before leader 
sending NEWLEADER.
+subRECOVERYSYNC(i, j) ==
+        LET canSync == /\ state[i] = LEADING   /\ zabState[i] /= DISCOVERY     
 /\ j \in learners[i]  /\ j \in ackeRecv[i]
+                       /\ state[j] = FOLLOWING /\ zabState[j] = 
SYNCHRONIZATION /\ leaderAddr[j] = i  /\ synced[j] = FALSE
+        IN
+        \/ /\ canSync
+           /\ history'     = [history     EXCEPT ![j] = history[i]]
+           /\ lastZxid'    = [lastZxid    EXCEPT ![j] = lastZxid[i]]
+           /\ UpdateProposal(j, leaderAddr[j], lastZxid'[j], currentEpoch[j])
+           /\ commitIndex' = [commitIndex EXCEPT ![j] = commitIndex[i]]
+           /\ synced'      = [synced      EXCEPT ![j] = TRUE]
+           /\ forwarding'  = [forwarding  EXCEPT ![i] = forwarding[i] \union 
{j}]    \* j will receive PROPOSAL and COMMIT
+           /\ ackIndex'    = [ackIndex    EXCEPT ![i][j] = Len(history[i])]
+           /\ committedCounter' = [committedCounter EXCEPT ![i][j] = 
Maximum({commitIndex[i] - Len(initialHistory[i]), 0})]
+           /\ LET ms == [msource|->i, mtype|->"RECOVERYSYNC", 
mepoch|->acceptedEpoch[i], mproposals|->history[i]]
+              IN proposalMsgsLog' = IF ms \in proposalMsgsLog THEN 
proposalMsgsLog
+                                                              ELSE 
proposalMsgsLog \union {ms}
+           /\ Reply(i, j, [mtype     |-> NEWLEADER,
+                           mepoch    |-> acceptedEpoch[i],
+                           mlastZxid |-> lastZxid[i]])
+        \/ /\ ~canSync
+           /\ Discard(j, i)
+           /\ UNCHANGED <<history, lastZxid, currentVote, commitIndex, synced, 
forwarding, ackIndex, committedCounter, proposalMsgsLog>>
+        
+(* In phase l12, leader waits for receiving ACKEPOPCH from a quorum,
+   and check whether it has the latest history and epoch from them.
+   If so, leader's zabState turns to SYNCHRONIZATION. *)
+LeaderHandleACKEPOCH(i, j) ==
+        /\ state[i] = LEADING
+        /\ msgs[j][i] /= << >>
+        /\ msgs[j][i][1].mtype = ACKEPOCH
+        /\ LET msg == msgs[j][i][1]
+               infoOk == /\ j \in learners[i]            
+                         /\ acceptedEpoch[i] = msg.mepoch
+               logOk  == /\ infoOk                          \* logOk = TRUE 
means leader is more up-to-date than follower j
+                         /\ \/ currentEpoch[i] > msg.mlastEpoch
+                            \/ /\ currentEpoch[i] = msg.mlastEpoch
+                               /\ \/ lastZxid[i][1] > msg.mlastZxid[1]
+                                  \/ /\ lastZxid[i][1] = msg.mlastZxid[1]
+                                     /\ lastZxid[i][2] >= msg.mlastZxid[2]
+               replyOk == /\ infoOk
+                          /\ NullPoint \in ackeRecv[i]
+           IN /\ infoOk
+              /\ \/ /\ replyOk
+                    /\ subRECOVERYSYNC(i, j)
+                    /\ ackeRecv' = [ackeRecv EXCEPT ![i] = IF j \notin 
ackeRecv[i] THEN ackeRecv[i] \union {j}
+                                                                               
    ELSE ackeRecv[i]]
+                    /\ UNCHANGED <<state, currentEpoch,logicalClock, 
receiveVotes, outOfElection, recvQueue, waitNotmsg, leadingVoteSet, 
electionMsgs, idTable,
+                                   zabState, leaderAddr, learners>>
+                 \/ /\ ~replyOk
+                    /\ \/ /\ logOk
+                          /\ ackeRecv' = [ackeRecv EXCEPT ![i] = IF j \notin 
ackeRecv[i] THEN ackeRecv[i] \union {j}
+                                                                               
          ELSE ackeRecv[i]]
+                          /\ Discard(j, i)
+                          /\ UNCHANGED <<varsL, zabState, leaderAddr, 
learners, forwarding>>
+                       \/ /\ ~logOk            \* go back to election
+                          /\ LeaderShutdown(i)
+                          /\ UNCHANGED ackeRecv
+                    /\ UNCHANGED <<history, commitIndex, synced, forwarding, 
ackIndex, committedCounter, proposalMsgsLog>>
+        /\ UNCHANGED <<acceptedEpoch, cepochRecv, ackldRecv,currentCounter, 
sendCounter, committedIndex, tempVarsZ, cepochSent, epochLeader, 
inherentViolated>>
+                          
+LeaderDiscovery2(i) ==
+        /\ state[i] = LEADING
+        /\ zabState[i] = DISCOVERY
+        /\ ackeRecv[i] \in Quorums
+        /\ zabState'       = [zabState       EXCEPT ![i] = SYNCHRONIZATION]
+        /\ currentEpoch'   = [currentEpoch   EXCEPT ![i] = acceptedEpoch[i]]
+        /\ initialHistory' = [initialHistory EXCEPT ![i] = history[i]]
+        /\ ackeRecv'       = [ackeRecv       EXCEPT ![i] = ackeRecv[i] \union 
{NullPoint}]
+        /\ ackIndex'       = [ackIndex       EXCEPT ![i][i] = Len(history[i])]
+        /\ UpdateProposal(i, i, lastZxid[i], currentEpoch'[i])
+        /\ LET epoch == acceptedEpoch[i]
+           IN epochLeader' = [epochLeader EXCEPT ![epoch] = epochLeader[epoch] 
\union {i}]
+        /\ UNCHANGED <<state, lastZxid, acceptedEpoch, history, commitIndex, 
logicalClock, receiveVotes, outOfElection, recvQueue, waitNotmsg,
+                       leadingVoteSet, learners, cepochRecv, ackldRecv, 
forwarding, currentCounter, sendCounter, committedIndex, committedCounter,
+                       tempMaxEpoch, followerVarsZ, proposalMsgsLog, 
inherentViolated, msgVarsZ, idTable>>
+
+(* Note:  Set cepochRecv, ackeRecv, ackldRecv to {NullPoint} in corresponding 
three actions to
+          make sure that the prospective leader will not broadcast 
NEWEPOCH/NEWLEADER/COMMITLD twice.*)
+-----------------------------------------------------------------------------
+RECOVERYSYNC(i, j) ==
+        /\ state[i] = LEADING   /\ zabState[i] /= DISCOVERY      /\ j \in 
learners[i] /\ j \in ackeRecv[i]
+        /\ state[j] = FOLLOWING /\ zabState[j] = SYNCHRONIZATION /\ 
leaderAddr[j] = i /\ synced[j] = FALSE
+        \* /\ acceptedEpoch[i] = acceptedEpoch[j] \* This condition is 
unnecessary.
+        /\ history'     = [history     EXCEPT ![j] = history[i]]
+        /\ lastZxid'    = [lastZxid    EXCEPT ![j] = lastZxid[i]]
+        /\ UpdateProposal(j, leaderAddr[j], lastZxid'[j], currentEpoch[j])
+        /\ commitIndex' = [commitIndex EXCEPT ![j] = commitIndex[i]]
+        /\ synced'      = [synced      EXCEPT ![j] = TRUE]
+        /\ forwarding'  = [forwarding  EXCEPT ![i] = forwarding[i] \union {j}]
+        /\ ackIndex'    = [ackIndex    EXCEPT ![i][j] = Len(history[i])]
+        /\ committedCounter' = [committedCounter EXCEPT ![i][j] = 
Maximum({commitIndex[i] - Len(initialHistory[i]), 0})]
+        /\ LET ms == [msource|->i, mtype|->"RECOVERYSYNC", 
mepoch|->acceptedEpoch[i], mproposals|->history[i]]
+           IN proposalMsgsLog' = IF ms \in proposalMsgsLog THEN proposalMsgsLog
+                                                           ELSE 
proposalMsgsLog \union {ms}
+        /\ Send(i, j, [mtype     |-> NEWLEADER,
+                       mepoch    |-> acceptedEpoch[i],
+                       mlastZxid |-> lastZxid[i]])
+        /\ UNCHANGED <<state, zabState, acceptedEpoch, currentEpoch, 
logicalClock, receiveVotes, outOfElection, recvQueue, waitNotmsg, 
+                       leadingVoteSet, learners, cepochRecv, ackeRecv, 
ackldRecv, currentCounter, sendCounter, committedIndex, 
+                       tempVarsZ, cepochSent, leaderAddr, epochLeader, 
inherentViolated, electionMsgs, idTable>>
+
+(* In phase f21, follower receives NEWLEADER. The follower updates its epoch 
and history,
+   and sends back ACK-LD to leader. *)
+FollowerHandleNEWLEADER(i, j) ==
+        /\ state[i] = FOLLOWING
+        /\ msgs[j][i] /= << >>
+        /\ msgs[j][i][1].mtype = NEWLEADER
+        /\ LET msg     == msgs[j][i][1]
+               infoOk  == /\ leaderAddr[i] = j
+                          /\ acceptedEpoch[i] = msg.mepoch
+               correct == /\ infoOk
+                          /\ zabState[i] = SYNCHRONIZATION
+                          /\ synced[i]
+                          /\ ZxidEqual(lastZxid[i], msg.mlastZxid)
+           IN /\ infoOk
+              /\ currentEpoch' = [currentEpoch EXCEPT ![i] = msg.mepoch]
+              /\ UpdateProposal(i, j, lastZxid[i], currentEpoch'[i])
+              /\ \/ /\ correct
+                    /\ Reply(i, j, [mtype  |-> ACKLD,
+                                    mepoch |-> msg.mepoch])
+                    /\ UNCHANGED inherentViolated
+                 \/ /\ ~correct
+                    /\ PrintT("Exception: Condition correct is false in 
FollowerHandleNEWLEADER(" \o ToString(i) \o ", " \o ToString(j) \o ").")
+                    /\ inherentViolated' = TRUE
+                    /\ Discard(j, i)
+        /\ UNCHANGED <<state, lastZxid, zabState, acceptedEpoch, history, 
commitIndex, logicalClock, receiveVotes, outOfElection, recvQueue, waitNotmsg,
+                       leaderVarsZ, tempVarsZ, followerVarsZ, proposalMsgsLog, 
epochLeader, electionMsgs, idTable>>
+
+\* In phase l22, leader receives ACK-LD from a quorum of followers, and sends 
COMMIT-LD(UPTODATE) to followers.
+LeaderHandleACKLD(i, j) ==
+        /\ state[i] = LEADING
+        /\ msgs[j][i] /= << >>
+        /\ msgs[j][i][1].mtype = ACKLD
+        /\ LET msg     == msgs[j][i][1]
+               infoOk  == /\ acceptedEpoch[i] = msg.mepoch
+                          /\ j \in learners[i]
+               replyOk == /\ infoOk
+                          /\ NullPoint \in ackldRecv[i]
+           IN /\ infoOk
+              /\ \/ /\ replyOk
+                    /\ Reply(i, j, [mtype   |-> UPTODATE,
+                                    mepoch  |-> acceptedEpoch[i],
+                                    mcommit |-> commitIndex[i]])
+                    /\ committedCounter' = [committedCounter EXCEPT ![i][j] = 
Maximum({commitIndex[i] - Len(initialHistory[i]), committedCounter[i][j]})]
+                 \/ /\ ~replyOk
+                    /\ Discard(j, i)
+                    /\ UNCHANGED committedCounter
+              /\ ackldRecv' = [ackldRecv EXCEPT ![i] = IF j \notin 
ackldRecv[i] THEN ackldRecv[i] \union {j}
+                                                                               
 ELSE ackldRecv[i]]
+        /\ UNCHANGED <<serverVarsZ, electionVarsZ, leadingVoteSet, learners, 
cepochRecv, ackeRecv, forwarding, 
+                       ackIndex, currentCounter, sendCounter, committedIndex, 
tempVarsZ, followerVarsZ, verifyVarsZ, electionMsgs, idTable>>
+
+LeaderSync2(i) ==

Review comment:
       Action name is not descriptive, perhaps rename to something like 
LeaderTransitionsToBroadcast




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to