Wednesday, April 21, 2021

System design: Book reading | Chapter 9 ZooKeeper Internals | Distributed protocols - Durability etc.

 April 21, 2021


What is most important content in the book called ZooKeeper - distributed process coordination for me as a learner? It is chapter 9 ZooKeeper internals. I think that I should have learned and reviewed Chapter 9 ZooKeeper internals early in 2016. I just could not believe that I can quickly learn and I like to take some notes in my blog here as well. 

ZooKeeper Internals

Page  172 (188/238)


Snapshots are copies of the ZooKeeper data tree. Each server frequently takes a snapshot of the data tree by serializing the whole data tree and writing it to a file. The servers do not need to coordinate to take snapshots, nor do they have to stop processing requests. 

Because servers keep executing requests while taking a snapshot, the data tree changes as the snapshot is taken. We call such snapshots fuzzy, because they do not necessarily reflect the exact state of the data tree at any particular point in time.

Let’s walk through an example to illustrate this. Say that a data tree has only two znodes: /z and /z'. Initially, the data of both /z and /z' is the integer 1. Now consider the following sequence of steps:

  1. Start a snapshot.
  2. Serialize and write /z = 1 to the snapshot.
  3. Set the data of /z to 2 (transaction T).
  4. Set the data of /z' to 2 (transaction Tʹ ).
  5. Serialize and write /z' = 2 to the snapshot.

This snapshot contains /z = 1 and /z' = 2. However, there has never been a point in time in which the values of both znodes were like that. This is not a problem, though, because the server replays transactions. It tags each snapshot with the last transaction that has been committed when the snapshot starts—call it TS. If the server eventually loads the snapshot, it replays all transactions in the transaction log that come after TS.

In this case, they are T and Tʹ . After replaying T and Tʹ on top of the snapshot, the server obtains /z = 2 and /z' = 2, which is a valid state.

An important follow-up question to ask is whether there is any problem with applying Tʹ again because it had already been applied by the time the snapshot was taken. As we noted earlier, transactions are idempotent, so as long as we apply the same transactions in the same order, we will get the same result even if some of them have already been applied to the snapshot.

To understand this process, assume that applying a transaction consists of reexecuting the corresponding operation. In the case just described, the operation sets the data of the znode to a specific value, and the value is not dependent on anything else. Say that we are setting the data of /z' unconditionally (the version number is -1 in the setData request). Reapplying the operation succeeds, but we end up with the wrong version number because we increment it twice. This can cause problems in the following way.

Suppose that these three operations are submitted and executed successfully:

  • setData /z', 2, -1
  • setData /z', 3, 2
  • setData /a, 0, -1

The first setData operation is the same one we described earlier, but we’ve added two more setData operations to show that we can end up in a situation in which the second operation is not executed during a replay because of an incorrect version number. By assumption, all three requests were executed correctly when they were submitted. Suppose that a server loads the latest snapshot, which already contains the first setData. 

The server still replays the first setData operation because the snapshot is tagged with an earlier zxid. Because it reexecutes the first setData, the version does not match the one the second setData operation expects, so this operation does not go through. The third setData executes regularly because it is also unconditional.

After loading the snapshot and replaying the log, the state of the server is incorrect because it does not include the second setData request. This execution violates durability and the property that there are no gaps in the sequence of requests executed.

Such problems with reapplying requests are taken care of by turning transactions into state deltas generated by the leader. When the leader generates a transaction for a given request, as part of generating the transaction, it includes the changes in the request to the znode or its data and specifies a fixed version number. Reapplying a transaction consequently does not induce inconsistent version numbers.

Servers and Sessions

Sessions constitute an important abstraction in ZooKeeper. Ordering guarantees, ephemeral znodes, and watches are tightly coupled to sessions. The session tracking mechanism is consequently very important to ZooKeeper.

One important task of ZooKeeper servers is to keep track of sessions. The single server tracks all sessions when running in standalone mode, whereas the leader tracks them in quorum mode. The leader server and the standalone server in fact run the same session tracker (see SessionTracker and SessionTrackerImpl). A follower server simply forwards session information for all the clients that connect to it to the leader (see LearnerSessionTracker).

To keep a session alive, a server needs to receive heartbeats for the session. Heartbeats come in the form of new requests or explicit ping messages (see LearnerHan In both cases, the server touches sessions by updating the session expiration time (see SessionTrackerImpl.touchSession()). In quorum mode, a leader sends a PING message to learners and the learners send back the list of sessions that have been touched since the last PING. The leader sends a ping to learners every half a tick. A tick (described in “Basic Configuration” on page 179) is the minimum unit of time that Zoo‐Keeper uses, expressed in milliseconds. So, if the tick is set to be 2 seconds, then the leader sends a ping every second.

Two important points govern session expiration. A data structure called the expiry queue (see ExpiryQueue) keeps session information for the purposes of expiration. The data structure keeps sessions in buckets, each bucket corresponding to a range of time during which the sessions are supposed to expire, and the leader expires the sessions in one bucket at a time. To determine which bucket to expire, if any, a thread checks the expiry queue to find out when the next deadline is. The thread sleeps until this deadline, and when it wakes up it polls the expiry queue for a new batch of sessions to expire.

This batch can, of course, be empty. To maintain the buckets, the leader splits time into expirationInterval units and assigns each session to the next bucket that expires after the session expiration time. 

The function doing the assignment essentially rounds the expiration time of a session up to the next higher interval. More concretely, the function evaluates this expression to determine which bucket a session belongs in when its session expiration time is updated:

(expirationTime / expirationInterval + 1) * expirationInterval

To provide an example, say that expirationInterval is 2 and the expirationTime for a given session occurs at time 10. We assign this session to bucket 12 (the result of (10/2 + 1) * 2). Note that expirationTime keeps increasing as we touch the session, so we move the session to buckets that expire later accordingly.

One major reason for using a scheme of buckets is to reduce the overhead of checking for session expiration. A ZooKeeper deployment might have thousands of clients and consequently thousands of sessions. Checking for session expiration in a fine-grained manner is not suitable in such situations. Related to this comment, note that if the expirationInterval is short, ZooKeeper ends up performing session expiration checks in a fine-grained manner. The expirationInterval is currently one tick, which is typically on the order of seconds.

Servers and Watches

Watches (see “Watches and Notifications” on page 20) are one-time triggers set by read operations, and each watch is triggered by a specific operation. To manage watches on the server side, a ZooKeeper server implements watch managers. An instance of the WatchManager class is responsible for keeping a list of current watches that are registered and for triggering them. All types of servers (standalone, leader, follower, and observer) process watches in the same way.

The DataTree class keeps a watch manager for child watches and another for data watches, the two types of watches discussed in “Getting More Concrete: How to Set Watches” on page 71. When processing a read operation that sets a watch, the class adds the watch to the manager’s list of watches. Similarly, when processing a transaction, the class finds out whether any watches are to be triggered for the corresponding modification. If there are watches to be triggered, the class calls the trigger method of the manager. Both adding a watch and triggering a watch start with the execution of a read request or a transaction in FinalRequestProcessor.

A watch triggered on the server side is propagated to the client. The class responsible for this is the server cnxn object (see the ServerCnxn class), which represents the connection between the client and the server and implements the Watcher interface. The Watcher.process method serializes the watch event to a format that can be used to transfer it over the wire. The ZooKeeper client receives the serialized version of the watch event, transforms it back to a watch event, and propagates it to the application.

Watches are tracked only in memory. They are never persisted to the disk. When a client disconnects from a server, all its watches are removed from memory. Because client libraries also keep track of their outstanding watches, they will reestablish any outstanding watches on the new server that they connect with.


There are two main classes in the client library: ZooKeeper and ClientCnxn. The Zoo Keeper class implements most of the API, and this is the class a client application must instantiate to create a session. Upon creating a session, ZooKeeper associates a session identifier to it. The identifier is actually generated on the server side of the service (see SessionTrackerImpl).

The ClientCnx class manages the client socket connection with a server. It maintains a list of ZooKeeper servers it can connect to and transparently switches to a different server when a disconnection takes place. Upon reconnecting a session to a different server, the client also resets pending watches (see ClientCnxn.SendThread.primeConnection()). This reset is enabled by default, but can be disabled by setting disableAutoWatchReset.


For the serialization of messages and transactions to send over the network and to store on disk, ZooKeeper uses Jute, which grew out of Hadoop. Now the two code bases have evolved separately. Check the org.apache.jute package in the ZooKeeper code base for the Jute compiler code. (For a long time the ZooKeeper developer team has been discussing options for replacing Jute, but we haven’t found a suitable replacement so far.

It has served us well, though, and it hasn’t been critical to replace it.)

The main definition file for Jute is zookeeper.jute. It contains all definitions of messages and file records. Here is an example of a Jute definition we have in this file:

module org.apache.zookeeper.txn {



class CreateTxn {

ustring path;

buffer data;

vector<> acl;

boolean ephemeral;

int parentCVersion;




This example defines a module containing the definition of a create transaction. The module maps to a ZooKeeper package.

Takeaway Messages

This chapter has discussed core ZooKeeper mechanisms. Leader election is critical for availability. Without it, a ZooKeeper ensemble cannot stay up reliably. Having a leader is necessary but not sufficient. ZooKeeper also needs the Zab protocol to propagate state updates, which guarantees a consistent state despite possible crashes of the ZooKeeper servers.

We have reviewed the types of servers: standalone, leader, follower, and observer. They differ in important ways with respect to the mechanisms they implement and the protocols they execute. Their use also has implications for a given deployment. For example, adding observers enables higher read throughput without affecting write throughput

Adding observers, however, does not increase the overall availability of the system. Internally, ZooKeeper servers implement a number of mechanisms and data structures. Here we have focused on the implementation of sessions and watchers, important concepts to understand when implementing ZooKeeper applications.

Although we have provided pointers to the code in this chapter, the goal was not to provide an exhaustive view of the source code. We strongly encourage the reader to fetch a copy of the code and go over it, using the pointers here as starting points.

No comments:

Post a Comment