Wednesday, June 1, 2011

A Tour of the Multi-update For Zookeeper

I have recently been working on some new features for Apache Zookeeper that represent probably the largest change to the Zookeeper client API in years.  Other recent changes such as observers and read-only clusters have changed the server-side semantics, but the client API has been nearly unchanged since the original public release of Zookeeper several years ago.  The JIRA covering the overall issue is ZOOKEEPER-965, and you can look at the code as it is being refined into committable state in my fork of Zookeeper that I keep on github.

(related announcement)

The Problem

Almost from the beginning of the Zookeeper project, there have been repeated questions on the mailing list about how to update several nodes at once.  The answer has always been to consolidate the structures that you need to update atomically into the contents of a single znode and then to update those contents atomically using standard methods.  This update problem is often exacerbated by a desire to use ephemeral nodes so that state disappears correctly when a service crashes.

This is a pretty reasonable answer and it handles many important cases fairly well.  For instance, in the common case of servers that are assigned certain roles and then in turn advertise which roles they have successfully taken on, this pattern can be used by giving each server a single file of assignments and a single file of advertised capabilities.  Each of these files can be created ephemerally by the server so that both vanish when the server disappears.  Assignments can be added atomically, and only the server ever modifies the advertised roles.  Zookeeper works quite well in these cases.
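
For concreteness, a minimal sketch of that pattern might look like the following, where the paths and payloads are hypothetical and zk is an already-connected ZooKeeper handle:

        // The parent znode is assumed to exist and be persistent; the two "files"
        // are ephemeral so they vanish automatically if the server's session dies.
        // Assumes the usual org.apache.zookeeper imports (ZooDefs, CreateMode, data.Stat).
        zk.create("/servers/server-1/assignments", "role-a,role-b".getBytes(),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zk.create("/servers/server-1/advertised", new byte[0],
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Only the server rewrites its advertised roles; passing the version it
        // last read makes the setData a compare-and-set.
        Stat stat = new Stat();
        zk.getData("/servers/server-1/advertised", false, stat);
        zk.setData("/servers/server-1/advertised", "role-a".getBytes(), stat.getVersion());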

Keeping track of a graph full of nodes with directional connections to other nodes is a good example of where Zookeeper's update model is not so nice.   In this case, nodes have a list of connections to other nodes and a list of connections from other nodes.  If any node has a list of outgoing connections that are not matched by the corresponding list of incoming connections on other nodes, then the graph is referentially inconsistent.  Keeping the graph consistent under changes is not possible with normal Zookeeper updates unless you store the entire graph in a single file.

Lazy Operations

The new multi() method allows such a data structure to be updated with strong guarantees of consistency.  This is done by currying the normal database mutation operators and then passing all of the resulting closures to Zookeeper at once for execution.  Obviously this requires a bit of syntactic sugar in languages like Java and C which don't like to express closures directly.

The way that this works is that there is a new class called Op.  There are sub-classes of Op called Op.Create, Op.SetData, Op.Check and Op.Delete that correspond to the operations normally invoked by calling the similarly named methods on a ZooKeeper object.  In essence, these sub-classes of Op represent reified methods or closures that can be frozen at one point in time and then executed later.  These sub-classes are instantiated using factory methods on the Op class.  The names and signatures of these factory methods mimic the corresponding methods on ZooKeeper.

Once you have all of the operations you would like to perform, you can execute them all in a single transaction using ZooKeeper.multi().  This will either execute all of the Ops or none of them.
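
To make that concrete, here is a small sketch of how a batch of Ops might be assembled and executed.  The paths, payloads and versions are made up, zk is a connected ZooKeeper handle, and the exact signatures may differ slightly from the in-progress code on github:

        // Assumes imports of ZooKeeper, Op, OpResult, ZooDefs and CreateMode
        // from org.apache.zookeeper, plus java.util.List and ArrayList.
        byte[] data = "hello".getBytes();
        int expectedVersion = 0;

        List<Op> ops = new ArrayList<Op>();
        ops.add(Op.create("/graph/node-3", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                          CreateMode.PERSISTENT));
        ops.add(Op.setData("/graph/node-7", data, expectedVersion));
        ops.add(Op.check("/graph/node-9", expectedVersion));
        ops.add(Op.delete("/graph/stale", expectedVersion));

        // either every operation in the list is applied, or none of them is
        List<OpResult> results = zk.multi(ops);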


An Example

I have placed an example program for doing a graph-based computation over on github.  This program builds a graph consisting of 32 nodes in the form of a 5-dimensional hyper-cube and then uses a numerical technique called over-relaxation to solve for the voltages that would result if all the links in the hyper-cube were replaced by unit resistors.  A picture of the graph is just to the right. In this graph, the voltage for node 0x1F is labeled as $V_5$ and the voltage for node 0x0 is labeled as $V_0$.  There are lots of connections and solving this problem analytically is difficult unless you twig to the trick of using symmetry.  Real-world networks generally don't exhibit such congenial properties and can be nearly impossible to solve analytically.

Normally, of course, we wouldn't actually use Zookeeper for this computation, but the point here isn't so much a realistic computation as a test-bed for the new multi() method and a demonstration of how the closure based style associated with multi-ops makes code easier to read and understand.

The Code


To read the code, start with the class ZGraphTest.  This is a JUnit test that drives all the rest of the code.  In this test, a graph (named zg in the code) is constructed and its nodes are connected in the pattern of a hyper-cube: nodes are labeled with integers from 0 to 31, and a node $n_1$ is connected to another node $n_2$ if the labels of $n_1$ and $n_2$ differ in exactly one bit.
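
Building that connectivity is just a matter of flipping bits.  A sketch of the wiring loop, assuming the graph's connect() method takes the two node ids, might look like this:

        // Wire up a 5-dimensional hyper-cube: flipping each of the 5 bits of a
        // node's label gives its neighbors.
        for (int node = 0; node < 32; node++) {
          for (int bit = 0; bit < 5; bit++) {
            int neighbor = node ^ (1 << bit);
            if (node < neighbor) {
              zg.connect(node, neighbor);   // connect() assumed to take the two node ids
            }
          }
        }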

In each iteration of the test, the ZGraph.reweight() method is called on a randomly selected node.  This sets the value at that node to the average of the values at its neighbors.  The computation converges geometrically to the correct values, with errors decreasing by a factor of 5-10 every 1000 iterations.  As the computation proceeds, the error relative to the theoretically known correct values is printed every 1000 iterations, and you can see the errors go to zero to 6 significant figures at about 7000 iterations.
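
The driving loop itself is small.  Roughly speaking, it might look like the sketch below, with the error reporting elided and reweight() assumed to take a node id (Random is java.util.Random):

        // Sketch of the relaxation driver; zg is the ZGraph built above.
        Random rand = new Random();
        for (int i = 1; i <= 10000; i++) {
          zg.reweight(rand.nextInt(32));   // replace one node's value with its neighbors' mean
          if (i % 1000 == 0) {
            // every 1000 iterations, report the error against the known solution (elided)
          }
        }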

Internally, a ZGraph keeps a connection to Zookeeper and the name of the root directory for all of the nodes in the graph.  Methods like connect(), reweight() and update() all work by defining lazy update or version-check operations on Node objects and then executing all of those updates and checks at one time in a transaction.  For instance, in the reweight() method, this code gets the weight of all of the neighbors of node g1:

        double mean = 0;
        Set<Integer> neighbors = g1.getIn();
        List<Op> ops = Lists.newArrayList();
        for (Integer neighbor : neighbors) {
          Node n = getNode(neighbor);
          // record a version check for each neighbor we read
          ops.add(n.checkOp(root));
          mean += n.getWeight();
        }

As a side effect, we also collect a version-check operation for each neighbor into the list ops.  Then this code sets the new weight on g1 and adds the corresponding update operation to ops:

        // set up the update to the node itself
        g1.setWeight(mean / neighbors.size());
        ops.add(g1.updateOp(root));

Finally, multi() is called to actually do all of the version checks and updates that we have collected,

        zk.multi(ops);

The essential point here is how the list of operations was collected a bit at a time and then executed all at once.  The versions of the nodes that were inputs to the operation were collected in the form of check operations.  When those check operations are executed along with the update of g1, we guarantee that g1's new weight accurately reflects the average of its neighbors; if somebody changes the value of any of the neighbors between the time we read it and the time we actually do the update, we simply run the update again.
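
What makes that retry safe is that when a check fails, the multi() throws and nothing at all is applied, so the caller can simply loop.  A sketch of that retry, assuming reweight() lets the KeeperException propagate, might look like:

        // Hypothetical retry wrapper around a single reweight operation.
        while (true) {
          try {
            zg.reweight(nodeId);
            break;                          // the transaction committed
          } catch (KeeperException e) {
            // a neighbor changed between our read and our update;
            // loop around, re-read the neighbors and try again
          }
        }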

Similarly, and closer to the idea of maintaining referential integrity, the ZGraph.connect() method collects update operations for the two nodes being connected and executes both updates together to avoid the possibility that anyone would ever see one node connected to a second without the second having the reverse connection.  This code does the job,

        Node g1 = Node.readNode(zk, root, id1);
        Node g2 = Node.readNode(zk, root, id2);
        g1.connectTo(id2);
        g2.connectFrom(id1);
        zk.multi(Arrays.asList(g1.updateOp(root), g2.updateOp(root)), results);

Again, the closure-passing style makes this operation very easy.

One additional point worth noting is that having each Node return closures that the caller can execute in whatever context it likes removes all direct Zookeeper operations from Node, other than those for reading or creating a node.  It also removes all references to the serialization of a Node from any outside code.  This simplifies both the caller and the Node: the Node doesn't have to implement every plausible combination of multiple updates, and the caller doesn't have to worry about any aspect of serialization and can focus on the appropriate task of arranging the transactional semantics for updates.
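
For example, the lazy operations on Node might amount to little more than the following sketch; path(), serialize() and the stored version are assumptions about Node's internals rather than the exact code:

        // Inside Node: hand back reified operations instead of touching Zookeeper directly.
        public Op updateOp(String root) {
          // path(), serialize() and version are internal to Node (assumed here)
          return Op.setData(path(root), serialize(), version);
        }

        public Op checkOp(String root) {
          return Op.check(path(root), version);
        }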


Credit Where Credit is Due

Of course, this new Zookeeper capability wouldn't be possible if the Apache Zookeeper project team hadn't built Zookeeper in a very flexible way in the first place.  Kudos to Ben Reed and Mahadev and the other early committers on the project.

Also, the actual Zookeeper-965 project would have stalled out if Marshall McMullen and Camille Fournier hadn't stepped in with a bit of extra momentum.  I had completed the wire protocols and all of the client side work, but Marshall did all of server side work and Camille provided really excellent advice about how the changes should be done.

The final credit goes to MapR Technologies for supporting open source by allowing capabilities like multi to be contributed back.
