Replicating key-value store

Now we will consider a distributed implementation of the key/value store. This version is replicating. We will have two server processes, a primary and a secondary. Each keeps a complete replica of the store. All write requests go first to the primary and are then passed along to the secondary. Read requests, on the other hand, may be sent to either server. We will assume that the channel between the primary and secondary is ordered.

Now we face a question: since transactions happen in parallel, and no one server sees all the transactions, how do we define commit points? At first we might imagine committing the write transactions when they execute on the primary, and the read transactions on whichever server they arrive at. Clearly, however, this won't work. Suppose a write has been committed on the primary and is on its way to the secondary. If the secondary now commits a read for the same key, it will return a stale value for that read. The situation is reversed if we commit writes when they execute at the secondary, since a read on the primary could return a "future" value.
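To make the stale-read scenario concrete, here is a minimal Python sketch (it is not IVy and not part of the example source). The names `primary`, `secondary` and `in_flight` are hypothetical stand-ins for the two replicas and the forward channel, and the sketch assumes the naive scheme in which a write is committed as soon as it executes on the primary.

```python
from collections import deque

# Hypothetical stand-ins for the two replicas and the ordered forward channel.
primary = {"k": 0}
secondary = {"k": 0}
in_flight = deque()  # writes sent by the primary, not yet applied at the secondary

def write_at_primary(key, value):
    """Execute (and, in this naive scheme, commit) a write on the primary."""
    primary[key] = value
    in_flight.append((key, value))

def deliver_one():
    """Apply the oldest forwarded write at the secondary."""
    key, value = in_flight.popleft()
    secondary[key] = value

write_at_primary("k", 1)   # the write is committed on the primary
stale = secondary["k"]     # a read committed now on the secondary sees the old value
deliver_one()
fresh = secondary["k"]     # only after delivery does the secondary catch up
print(stale, fresh)        # prints: 0 1
```

The read that returns `0` is committed after the write of `1`, so no single sequential order of commits explains both results.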

In fact, there is no easy way out of this dilemma. If we don't add some additional concurrency control, we will necessarily violate linearizability. To fix the problem, we will add a reverse channel from the secondary to the primary to acknowledge when writes have been committed on the secondary. If there is a pending, unacknowledged write for a key on the primary, the primary must wait to serve any reads for that key until the write is acknowledged.

The key property we must then maintain on the channel is that writes occur in the same order in which they are executed on the primary. This does not mean that forwarding of a write cannot occur before the actual execution on the primary. It just means the order must be preserved. We will call this the serialization order. Thus, writes are serialized on the primary (meaning their order is determined) but committed on the secondary.

Using this specification, we can test the primary and secondary servers in isolation, and if they are correct according to their assume-guarantee specifications, we can conclude that they together provide a correct service according to our reference object.

We include the tcp module here, since we will need it for the ordered channels between the servers.

include tcp
include udp
Basic data types


As with the simple one-process server (see repstore1.ivy), we begin by declaring the data types needed to represent transactions and the messages that carry them.

The key and value types are uninterpreted for the moment.

type key
type value
ghost type txid

type request_kind = {write,read}

object request_body = {
    type t = struct {
    knd : request_kind,
    ky : key,
    vl : value
    }
}

object request = {
    type t = struct {
    tx : txid,
    bd : request_body.t
    }
}

object response_body = {
    type t = struct {
    vl : value
    }
}

object response = {
    type t = struct {
    tx : txid,
    bd : response_body.t
    }
}
The definition of a replica is also the same as before.

module replica = {
    function store(K:key) : value
    after init {
    store(K) := 0
    }
    action exec(inp : request_body.t) returns (out:response_body.t) = {
    if inp.knd = write {
        store(inp.ky) := inp.vl;
    }
    else if inp.knd = read {
        out.vl := store(inp.ky);
    }
    }
}
The first difference is that we now have two servers, so the enumerated type dest.t has two values, one for the primary and one for the secondary.

object dest = {
    ghost type t = {prim,sec}
}
Reference object


Our reference model is the same as before.

object ref = {
    action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
    action commit(tx : txid,dst:dest.t)
    action eval(tx : txid) returns (res : response_body.t)

    instance rep : replica

    individual next : txid
    function txs(X:txid) : request_body.t
    function txres(X:txid) : response_body.t
    relation committed(X:txid)

    after init {
    next := 0;
    committed(X) := false;
    }

    implement create {
    tx := next;
    txs(tx) := inp;
    next := next + 1;
    }

    implement commit {
    assert 0 <= tx & tx < next;
    assert ~committed(tx);
    txres(tx) := rep.exec(txs(tx));
    committed(tx) := true;
    }
    delegate commit

    implement eval {
    assert committed(tx);
    res := txres(tx);
    }
    delegate eval

    interpret txid -> int
}
This time, however, we'll add another reference object to keep track of the serialization order, so we can specify the interface between primary and secondary. It has one action serialize that indicates when a write transaction is serialized.

object ser = {

    action serialize(tx : txid)
The serializer object keeps track of which transactions have been serialized.

    relation serialized(X:txid)

    after init {
    serialized(X) := false;
    }
The serializer also keeps track of the serialization order in a queue.

    instance queue : unbounded_queue(txid)
To serialize a write, we must guarantee that the transaction exists, and that it has not already been serialized. Further, we can only serialize write transactions.

    implement serialize {
    assert 0 <= tx & tx < ref.next;
    assert ~serialized(tx);
    assert ref.txs(tx).knd = write;
    serialized(tx) := true;
    call queue.push(tx);
    }
    delegate serialize
Further, we specify that write transactions must be committed in the order in which they are serialized.

    before ref.commit(tx : txid,dst:dest.t) {
    if ref.txs(tx).knd = write {
        assert tx = queue.pop
    }
    }
}
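As an informal illustration of what the serializer checks, the following Python sketch models the same two rules as a runtime monitor. It is not IVy and not part of the example source: the class name `SerializationMonitor` and its method names are hypothetical, and the check that the transaction id exists (the bound against `ref.next`) is omitted for brevity.

```python
from collections import deque

class SerializationMonitor:
    """Hypothetical runtime monitor mirroring the checks made by `ser`."""

    def __init__(self):
        self.serialized = set()  # transaction ids that have been serialized
        self.queue = deque()     # serialization order, oldest first

    def serialize(self, tx, kind):
        # A transaction is serialized at most once, and only writes are serialized.
        assert tx not in self.serialized, "already serialized"
        assert kind == "write", "only writes are serialized"
        self.serialized.add(tx)
        self.queue.append(tx)

    def before_commit(self, tx, kind):
        # Writes must commit in serialization order; reads are not constrained here.
        if kind == "write":
            assert self.queue, "write committed before being serialized"
            assert self.queue.popleft() == tx, "write committed out of order"

mon = SerializationMonitor()
mon.serialize(0, "write")
mon.serialize(1, "write")
mon.before_commit(0, "write")  # allowed: 0 was serialized first
mon.before_commit(1, "write")  # allowed: 1 is now at the head of the queue
```

Committing transaction 1 before transaction 0 in this sketch would trip the second assertion in `before_commit`, which corresponds to the `tx = queue.pop` check in the monitor above.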
The implementation


Now we are ready to define our system implementation, consisting of client endpoints, the primary server and the secondary server.

Again, we have an uninterpreted type of client ids, and a request message structure that encapsulates the request with its client id for routing the response.

type client_id

type req_msg = struct {
    cid : client_id,
    req : request.t
}
The client endpoint is the same as before, except that we now have two servers, so the client must decide which of the servers to send a request to.

module client(cid,prim_chan,sec_chan,cl_chans) = {

    action client_request(req : request_body.t, the_dst: dest.t)
    action client_response(req : response_body.t, tx : txid)
To generate a request, we build a request message and send it to the server. The server endpoint we send to is determined by the parameter the_dst.

    implement client_request {
    var m : req_msg;
    m.cid := cid;
    m.req.tx := ref.create(req,the_dst);
    m.req.bd := req;
    if the_dst = dest.prim {
        call prim_chan.send(m);
    } else {
        call sec_chan.send(m);
    }
    }
To handle a response from the server, we simply pass it to the client_response callback. The transaction id parameter is "ghost" and is only used for specification.

    implement cl_chans.recv(resp : response.t) {
    call client_response(resp.bd,resp.tx)
    }
}
The primary server module now has two additional parameters, for the forward channel (forwarding writes to the secondary) and the reverse channel (returning acks).

module primary_node(port, fwd_chan, rev_chan, cl_chans) = {
    instance rep : replica
Again, we have an endpoint for receiving requests from clients.

    instance req_chan : nondup_endpoint(port,req_msg)
We have to remember how many pending writes each key has. We use a map from keys to counters for this.

    instance counter : unbounded_sequence
    function pending(K:key) : counter.t
Initially, all the counters are zero.

    after init {
    pending(K) := 0;
    }
When receiving a request message from a client, the primary node must first check whether it is a read or a write. In the case of a read, we check whether the key has a pending write. If not, we commit the read, execute it, and return the response to the client. If there is a pending write, we postpone the read by sending it back to our own request channel. In the case of a write, we serialize the write, forward it to the secondary, increment the pending count of the key, and finally execute it.

    implement req_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = read {
            if pending(rr.ky) = 0 {
                call ref.commit(inp.req.tx,dest.prim);
                var res : response.t;
                res.tx := inp.req.tx;
                res.bd := rep.exec(rr);
                call cl_chans(inp.cid).send(res)
            } else {
                call req_chan.send(inp);  # if cannot execute, recirculate
            }
        } else if rr.knd = write {
            call ser.serialize(inp.req.tx);           # this is ghost!
            call fwd_chan.send(inp);
            pending(rr.ky) := pending(rr.ky).next;
            var res := rep.exec(inp.req.bd);
        }
    }
When we receive a write request on the acknowledgement channel, we decrement the pending count of the key.

    implement rev_chan.recv(inp : req_msg) {
    var rr := inp.req.bd;
    if rr.knd = write {
        pending(rr.ky) := pending(rr.ky).prev;
    }
    }
}
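The pending-write bookkeeping can be tried out in isolation with a small Python model. This is only a sketch under simplifying assumptions, not a translation of the IVy module: the class `PrimarySketch` and its fields are hypothetical, channels are modelled as in-memory lists, and the ghost calls to `ref.commit` and `ser.serialize` are left out.

```python
from collections import defaultdict, deque

class PrimarySketch:
    """Hypothetical model of the primary's pending-write bookkeeping."""

    def __init__(self):
        self.store = defaultdict(int)    # replica contents, every key starts at 0
        self.pending = defaultdict(int)  # number of unacknowledged writes per key
        self.inbox = deque()             # stands in for req_chan
        self.forwarded = []              # stands in for fwd_chan
        self.responses = []              # stands in for cl_chans

    def step(self):
        """Handle one request from the inbox."""
        kind, key, value = self.inbox.popleft()
        if kind == "read":
            if self.pending[key] == 0:
                self.responses.append((key, self.store[key]))
            else:
                self.inbox.append((kind, key, value))  # recirculate the read
        else:
            self.forwarded.append((key, value))
            self.pending[key] += 1
            self.store[key] = value

    def ack(self, key):
        """Handle an acknowledgement from the secondary."""
        self.pending[key] -= 1

p = PrimarySketch()
p.inbox.extend([("write", "k", 7), ("read", "k", None)])
p.step()            # write: forwarded, pending["k"] becomes 1
p.step()            # read: postponed because the write is unacknowledged
p.ack("k")          # the secondary acknowledges the write
p.step()            # read: now served from the primary's replica
print(p.responses)  # prints: [('k', 7)]
```

Note that a postponed read goes to the back of the request queue, so it may be overtaken by later requests; this is acceptable because the read is only committed when it is finally served.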
The secondary server handles only read requests from clients. Since the secondary's replica reflects only committed writes, we can immediately respond to a read. We commit the read, build a response message by executing the read on the replica, and send the response back to the client.

module secondary_node(port, fwd_chan, rev_chan, cl_chans) = {
    instance rep : replica

    instance req_chan : nondup_endpoint(port,req_msg)

    implement req_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = read {
            var res : response.t;
            res.tx := inp.req.tx;
            res.bd := rep.exec(rr);
            call ref.commit(inp.req.tx,dest.sec);
            call cl_chans(inp.cid).send(res);
        }
        # ignore writes!
    }
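When the secondary receives a forwarded write from the primary, it executes it, commits it, and sends the response to the client. It then acknowledges the write to the primary by sending the same message back on the reverse channel.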

    implement fwd_chan.recv(inp : req_msg) {
        var res : response.t;
        res.tx := inp.req.tx;
        res.bd := rep.exec(inp.req.bd);
        call ref.commit(inp.req.tx,dest.sec);
        call cl_chans(inp.cid).send(res);
        call rev_chan.send(inp);
    }

}
The plumbing


Now we instantiate two servers, a primary and a secondary, and an array of client endpoints, connecting them up with network channels.

We have two TCP (ordered) channels between the primary and secondary, one forward (listening on port 44090) and one reverse (listening on port 44091). Both carry request messages.

instance fwd_chan : tcp_channel("localhost:44090",req_msg)
instance rev_chan : tcp_channel("localhost:44091",req_msg)
As before, we use a parameterized array of (unordered) endpoints for the clients called cl_chans. The endpoints will use a range of port numbers beginning with 44100.

instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
We create a corresponding array of clients.
instance cl(X:client_id) : client(X,prim.req_chan,sec.req_chan,cl_chans)
We create the primary and secondary servers, connecting them with channels. The port numbers are for the client request channels of the servers.

instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,cl_chans)
The interface specifications


The service specification is unchanged from the simple sequential server.

object service_spec = {

    relation responded(X:txid)

    after init {
    responded(X) := false;
    }

    before cl_chans.send(p : client_id, m : response.t) {
    assert ~responded(m.tx);
    assert m.bd = ref.eval(m.tx);
    responded(m.tx) := true;
    }
}
Now, however, we need a specification for the interface between the primary and the secondary. This will allow us to test the two servers in isolation. We call this specification mid_spec. The state of this interface keeps a FIFO queue of messages that have been serialized but not yet transmitted over the interface. When a message is serialized, we insert it in this queue.

object mid_spec = {
    instance queue : unbounded_queue(txid)

    after ser.serialize(tx:txid) {
    call queue.push(tx);
    }
When a message is received at the secondary on the forward channel, its contents must be correct for the given transaction id. Moreover, the transaction id must be the next one in the FIFO queue (in other words, the writes must arrive in serialization order).

    before fwd_chan.rcvr.recv(inp : req_msg) {
    assert inp.req.bd = ref.txs(inp.req.tx);
    assert inp.req.tx = queue.pop;
    }
The following declaration tells IVy that the above specification is a guarantee for the primary server, not for the network, which is the actual caller of fwd_chan.rcvr.recv. This means that the assertions will be verified when we test the primary. Usually, IVy's default assignment of guarantees to objects is what we want, but in this case the caller is not the guarantor of the assertions.

    delegate fwd_chan_rcvr_recv[before] -> prim
The interface also requires that a given write message be acked at most once. Otherwise, the pending counts in the primary will be wrong. To ensure this, we maintain a set acked of acknowledged transactions.

    relation acked(X:txid)

    after init {
    acked(X) := false;
    }
When an ack message is sent on the reverse channel, it must have the correct body, must have been committed (this is the meaning of the message) and it must not already be acked. We record the fact that it is now acked.

    before rev_chan.sndr.send(inp : req_msg) {
    assert ref.committed(inp.req.tx);
    assert ref.txs(inp.req.tx) = inp.req.bd;
    assert ~acked(inp.req.tx);
    acked(inp.req.tx) := true;
    }
When a request is created, we record the destination server. In this way we can determine which server is allowed to commit the request. This is a very common situation in specifications of distributed services. Much of the interface state tends to be related to the question of who is allowed to commit which transactions.

    function req_dest(X:txid) : dest.t

    after ref.create(inp : request_body.t, dst:dest.t) returns (tx : txid) {
    req_dest(tx) := dst
    }
We now specify that all reads must be committed by the server they are addressed to, while all writes are committed by the secondary.

    before ref.commit(tx : txid,dst:dest.t) {
    assert ref.txs(tx).knd = read -> req_dest(tx) = dst;
    assert ref.txs(tx).knd = write -> dst = dest.sec
    }

}
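The commit-permission and ack-once rules of this interface can also be read as a small state machine. The Python sketch below is a hypothetical monitor (the class `MidSpecSketch` and its method names are not part of the example), and it leaves out the FIFO check on the forward channel and the check on message bodies.

```python
class MidSpecSketch:
    """Hypothetical monitor for who may commit a transaction and when it may be acked."""

    def __init__(self):
        self.kind = {}          # txid -> "read" or "write"
        self.req_dest = {}      # txid -> "prim" or "sec", recorded at creation
        self.committed = set()
        self.acked = set()

    def create(self, tx, kind, dest):
        self.kind[tx] = kind
        self.req_dest[tx] = dest

    def commit(self, tx, committer):
        if self.kind[tx] == "read":
            # Reads are committed by the server they were addressed to.
            assert self.req_dest[tx] == committer, "read committed by wrong server"
        else:
            # Writes are always committed by the secondary.
            assert committer == "sec", "write committed by the primary"
        self.committed.add(tx)

    def ack(self, tx):
        # An ack means "committed", and each write is acked at most once.
        assert tx in self.committed, "ack for an uncommitted write"
        assert tx not in self.acked, "write acked twice"
        self.acked.add(tx)

spec = MidSpecSketch()
spec.create(0, "write", "prim")  # the client sent the write to the primary
spec.commit(0, "sec")            # but the secondary is the one that commits it
spec.ack(0)                      # first ack is fine; a second would fail
```

A duplicated ack is exactly the kind of error that would leave the primary's pending counters too low, which is why the specification forbids it.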
We export/import our API.

export cl.client_request
import cl.client_response
Finally, we create two isolates that allow us to verify our two servers in isolation. Each isolate verifies just the guarantees of a given server, either the primary or the secondary. In both cases we use our interface specifications and our reference object. The other server and the network are abstracted away. Their role is played by the test generator. Because we placed our interface specification mid_spec at the secondary end of the channels fwd_chan and rev_chan, those channels are used in the primary's isolate, but not in the secondary's. Notice also that the client endpoints cl are present in both isolates. Since they are trivial "stubs", there is no reason to separate them into their own isolate.

trusted isolate iso_prim = prim with cl,cl_chans,ref,ser,service_spec,mid_spec,fwd_chan,rev_chan
trusted isolate iso_sec = sec with cl,cl_chans,ref,ser,service_spec,mid_spec
As before, we have to give concrete interpretations for the abstract types in order to test.

interpret txid -> int
interpret key -> strbv[1]
interpret value -> bv[16]
interpret client_id -> bv[1]
The source file for this example is here. Try running this example using commands like this:

$ ivy_to_cpp target=test isolate=iso_prim build=true repstore2.ivy
$ ./repstore2 iters=100 runs=10 out=file.iev

This tests the primary server. Look at the trace file file.iev and notice which actions are generated by the tester and which by the code.

To test the secondary, use these commands:

$ ivy_to_cpp target=test isolate=iso_sec build=true repstore2.ivy
$ ./repstore2 iters=100 runs=10 out=file.iev

Again, see what actions the tester is generating. Try putting some errors in this example and see if they produce assertion failures. If not, why not?