Skip to content

Exercise: A buggy replicating key-value store¤

In this version of the replicating key-value store, we've added a bug as an exercise. Try to find the bug using compositional testing (don't use diff, it's too easy). Fix the bug and test again.

Bonus exercise¤

Change the interpretation of type key to give it a larger range (say, strbv[4]). Try testing isolates iso_prim and iso_sec. Do you still find the bug? If not, can you explain why not?

UNIX ONLY: At the end of the file, there is an isolate iso_sys that includes the complete implmentation (including both servers and the network). Try testing this isolate to see if you can find error. If not, can you explain why not?

Basic data types¤

As with he simple one-process server (see repstore1.ivy), we begin by declaring the data types needed to represent transactions and the messages that carry them.

The key and value types are uninterpreted for the moment.

type key
type value
ghost type txid

type request_kind = {write,read}

object request_body = {
    type t = struct {
        knd : request_kind,
        ky : key,
        vl : value
    }
}

object request = {
    type t = struct {
        tx : txid,
        bd : request_body.t
    }
}

object response_body = {
    type t = struct {
        vl : value
    }
}

object response = {
    type t = struct {
        tx : txid,
        bd : response_body.t
    }
}
The definition of a replica is also the same as before.

module replica = {
    function store(K:key) : value
    after init {
        store(K) := 0
    }
    action exec(inp : request_body.t) returns (out:response_body.t) = {
        if inp.knd = write {
            store(inp.ky) := inp.vl;
        }
        else if inp.knd = read {
            out.vl := store(inp.ky);
        }
    }
}
The first difference is that we now have two servers, so the enumerated type dest.t has two values, one for the primary and one for the secondary.

object dest = {
    ghost type t = {prim,sec}
}
Reference object


Our reference model is the same as before.

object ref = {
    action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
    action commit(tx : txid,dst:dest.t)
    action eval(tx : txid) returns (res : response_body.t)

    instance rep : replica

    var next : txid
    function txs(X:txid) : request_body.t
    function txres(X:txid) : response_body.t
    relation committed(X:txid)

    after init {
        next := 0;
        committed(X) := false;
    }

    implement create {
        tx := next;
        txs(tx) := inp;
        next := next + 1;
    }

    implement commit {
        assert 0 <= tx & tx < next;
        assert ~committed(tx);
        txres(tx) := rep.exec(txs(tx));
        committed(tx) := true;
    }
    delegate commit

    implement eval {
        assert committed(tx);
        res := txres(tx);
    }
    delegate eval

    interpret txid -> int
}
This time, however, we'll add another reference object to keep track of the serialization order, so we can specify the interface between primary and secondary. It has one action serialize that indicates when a write transaction is serialized.

object ser = {

    action serialize(tx : txid)
The serializer object keeps track of which transactions have been serialized.

    relation serialized(X:txid)

    after init {
        serialized(X) := false;
    }
To serialzie a write, we must guarantee that transaction exists, and that it has not already been serialized. Further, we can only serialize write transactions.

    implement serialize {
        assert 0 <= tx & tx < ref.next;
        assert ~serialized(tx);
        assert ref.txs(tx).knd = write;
        serialized(tx) := true;
    }
    delegate serialize
Further,we specify that a write transaction cannot be committed until it is serialized.

    before ref.commit(tx : txid,dst:dest.t) {
        assert ref.txs(tx).knd = write -> serialized(tx);
    }
}
The implementation


Now we are ready to define our system implemention, consisting of client endpoints, the primary server and the secondary server.

Notice we include the tcp module here, since we will need it for the ordered channels between the servers.

include tcp
include udp
Again, we have an uninterpreted type of client ids, and a request message structure that encapsulates the request with its client id for routing the response.

type client_id

type req_msg = struct {
    cid : client_id,
    req : request.t
}
The client endpoint is the same as before, except that we now have two servers, so the client must decide which of the servers to send a request to.

module client(cid,prim_chan,sec_chan,cl_chans) = {

    action client_request(req : request_body.t, the_dst: dest.t)
    action client_response(req : response_body.t, tx : txid)
To generate a request, we build a request message and send it to the server. The server endpoint we send to is determined by the parameter the_dst.

    implement client_request {
        var m : req_msg;
        m.cid := cid;
        m.req.tx := ref.create(req,the_dst);
        m.req.bd := req;
        if the_dst = dest.prim {
            call prim_chan.send(m);
        } else {
            call sec_chan.send(m);
        }
    }
To handle a response from the server, we simply pass it to the client_response callback. The transaction id parameter is "ghost" and is only used for specification.

    implement cl_chans.recv(resp : response.t) {
        call client_response(resp.bd,resp.tx)
    }
}
The primary server module now has two additional parameters, for the forward channel (forwarding writes to the secondary) and the reverse channel (returning acks).

module primary_node(port, fwd_chan, rev_chan, cl_chans) = {
    instance rep : replica
Again, we have an endpoint for receiving requests from clients.

    instance req_chan : nondup_endpoint(port,req_msg)
We have to remember how many pending writes each key has. We use a map from keys to counters for this.

    instance counter : unbounded_sequence
    function pending(K:key) : counter.t
Initially, all the counters are zero.

    after init {
        pending(K) := 0;
    }
When receiving a request message from a client, the primary node must first check whether it is a read or a write. In the case of a read, we check if the key has a pending write. If not, we commit the read, execute it, and return the response to the client. If there is a pending write, we postpone the read by forwarding it to ourself. In case of a write, we serialize the write, forward it to the secondary, increment the pending count of the key, and finally execute it.

    implement req_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = read {
            if pending(rr.ky) = 0 {
                call ref.commit(inp.req.tx,dest.prim);
                var res : response.t;
                res.tx := inp.req.tx;
                res.bd := rep.exec(rr);
                call cl_chans(inp.cid).send(res)
            } else {
                call req_chan.send(inp);  # if cannot execute, recirculate
            }
        } else if rr.knd = write {
            call ser.serialize(inp.req.tx);           # this is ghost!
            call fwd_chan.send(inp);
            var res := rep.exec(inp.req.bd);
        }
    }
When we receive a write request on the acknowledgement channel, we decrement the pending count of the key.

    implement rev_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = write {
            pending(rr.ky) := pending(rr.ky).prev;
        }
    }
}
The secondary server handles only read request from clients. Since the secondary's replica refelects only committed writes, we cann immediately response to a read. We commit the read, build a response message by executing the read on the replica, and send the response back to the client.

module secondary_node(port, fwd_chan, rev_chan, cl_chans) = {
    instance rep : replica

    instance req_chan : nondup_endpoint(port,req_msg)

    implement req_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = read {
            var res : response.t;
            res.tx := inp.req.tx;
            res.bd := rep.exec(rr);
            call cl_chans(inp.cid).send(res);
        }
ignore writes!
    }
When the secondary receives a forwarded write from the primary, it commits it, executes it, and sends the response to the client.

    implement fwd_chan.recv(inp : req_msg) {
        var res : response.t;
        res.tx := inp.req.tx;
        res.bd := rep.exec(inp.req.bd);
        call cl_chans(inp.cid).send(res);
        call rev_chan.send(inp);
    }

}
The plumbing


Now we two servers, a primary and a secondary, and an array of client endpoints, connecting them up with network channels.

We have two TCP (ordered) channels beteween the primary and secondary, one forward (listening on port 44090) and one reverse (listening on port 44091). Both carry request messages.

instance fwd_chan : tcp_channel("localhost:44090",req_msg)
instance rev_chan : tcp_channel("localhost:44091",req_msg)
As before, we use a parameterized array of (unordered) endpoints for the clients called cl_chans. The endpoints will use a range of port numbers beginning with 441000.

instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
We create a corresponding array of clients.
instance cl(X:client_id) : client(X,prim.req_chan,sec.req_chan,cl_chans)
We create the primary and secondary servers, connecting them with channels. The port numbers are for the client request channels of the servers.

instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,cl_chans)
The interface specifications


The service specification is unchanged from the simple sequential server.

object service_spec = {

    relation responded(X:txid)

    after init {
        responded(X) := false;
    }

    before cl_chans.send(p : client_id, m : response.t) {
        assert ~responded(m.tx);
        assert m.bd = ref.eval(m.tx);
        responded(m.tx) := true;
    }
}
Now, however, we need a specification for the interface between the primary and the secondary. This will allow us to test the two servers in isolation. We call this specification mid_spec. The state of this interface keeps a FIFO queue of messages that have been serialized but not yet transmitted over the interface. When a message is serialize, we insert it in this queue

object mid_spec = {
    instance queue : unbounded_queue(txid)

    after ser.serialize(tx:txid) {
        call queue.push(tx);
    }
When a message is received at the secondary on the forward channel, its contents must be correct for the given transaction id. Moreover, the transaction id must be the next on in the FIFO queue (in other words, the writes must arrive in serialization order). To make things a little easier, we commit the message when it is received. This gurantees that commits from the secondary always occur in order, without our having to explicitly specify that. Because the commit operation is "ghost", it doesn't matter whether we execute in implementation or specification code. We can put it wherever it is most convenient.

    before fwd_chan.rcvr.recv(inp : req_msg) {
        assert inp.req.bd = ref.txs(inp.req.tx);
        assert inp.req.tx = queue.pop;
        call ref.commit(inp.req.tx,dest.sec);
    }
This tells IVy that the above specification is a guarantee for the primary server, not for the network, which is the actual caller of fwd_chan.rcvr.recv. This means that the assertions will be verified when we test the primary. Usually, IVy's default assignment of guarantees to object is what we want, but in this case the caller is not the guarantor of the assertions.

    delegate fwd_chan_rcvr_recv[before] -> prim
The interface also requires that a given write message be acked at most once. Otherwise, the pending counts in the primary will be wrong. To ensure this, we maintain a set acked of acknowledgef transactions.

    relation acked(X:txid)

    after init {
        acked(X) := false;
    }
When an ack message is sent on the reverse channel, must have the correct body, it must have been committed (this is the meaning of the message) and it must not already be acked. We record the fact that it is now acked.

    before rev_chan.sndr.send(inp : req_msg) {
        assert ref.txs(inp.req.tx) = inp.req.bd;
        assert ref.committed(inp.req.tx);
        assert ~acked(inp.req.tx);
        acked(inp.req.tx) := true;
    }
When a request is created, we record the destination server. In this way we can determine which server is allowed to commit the request. This is a very common situation in specifications of distributed services. Much of the interface state tends to be related to the question of who is allowed to commit shich transations.

    function req_dest(X:txid) : dest.t

    after ref.create(inp : request_body.t, dst:dest.t) returns (tx : txid) {
        req_dest(tx) := dst
    }
We now specify that all reads must be commited by the server they are addressed to, while all writes are committed by the secondary

    before ref.commit(tx : txid,dst:dest.t) {
        assert ref.txs(tx).knd = read -> req_dest(tx) = dst;
        assert ref.txs(tx).knd = write -> dst = dest.sec
    }
Commit immediately all read requests from client to secondary

    before sec.req_chan.recv(inp : req_msg) {
        if inp.req.bd.knd = read {
                call ref.commit(inp.req.tx,dest.sec)
        }
    }

}
We export/import our API.

export cl.client_request
import cl.client_response
Finally, we create two isolates the allow us to verify our two servers in isolation. Each isolate verifies just the the guarentees of given server, either the primary or secondary. In both cases we use our interface specifications and our reference object. The other server and the network are abstracted away. Their role is played by the test generator. Because we placed our interface specification mid_spec at the secondary end of the channels fwd_chan and rev_chan, those channels are used in the primary's isolate, but not in the secondary's. Notice also that the client endpoints cl are present in both isolates. Since they are trivial "stubs", there is no reason to separate them into their own isolate.

trusted isolate iso_prim = prim with cl,cl_chans,ref,ser,service_spec,mid_spec,fwd_chan,rev_chan
trusted isolate iso_sec = sec with cl,cl_chans,ref,ser,service_spec,mid_spec
Here is an extra isolate you can use for integration testing:

trusted isolate iso_sys = this
As before, we have to give concrete interpretations for the abstract types in order to test.

interpret txid -> int
interpret key -> strbv[1]
interpret value -> bv[16]
interpret client_id -> bv[1]
The source file for this example is here. Try running this example using commands like this:

$ ivy_to_cpp target=test isolate=iso_prim build=true repstore2.ivy
$ ./repstore1 iters=100 runs=10 out=file.iev

This tests the primary server. Look at the trace file file.iev and notice which actions are generated by the tester and which by the code.

To test the secondary, use these commands:

$ ivy_to_cpp target=test isolate=iso_sec build=true repstore2.ivy
$ ./repstore1 iters=100 runs=10 out=file.iev

Again, see what actions the tester is generating. Try putting some errors in this example and see if they produce assertions failures. If not, why not?