Bonus exercise: Replicating key-value store with fail-over

To make things more interesting, we will try to make our replicating key-value store handle fail-over. To keep things simple, however, we'll consider only the failure of the secondary, and we'll assume that when the secondary fails, it sends a message to the primary and stops. In reality, we would need a master server to monitor the secondary and manage the fail-over. We should also inform the client endpoints that the secondary has failed, but we won't bother.

Have a look at the code below to see what changes were made. Sadly, our attempt has gone wrong and our test of the primary server is giving an assertion failure. Try testing the primary and see if you can figure out what is going wrong. How could we change the mid_spec interface specification to fix this problem?

Bonus bonus exercise: look at the end of this page for some hints you can use to fix the spec. Can you fix it?

Basic data types

These are the same as before.

type key
type value
ghost type txid

type request_kind = {write,read}

object request_body = {
    type t = struct {
        knd : request_kind,
        ky : key,
        vl : value
    }
}

object request = {
    type t = struct {
        tx : txid,
        bd : request_body.t
    }
}

object response_body = {
    type t = struct {
        vl : value
    }
}

object response = {
    type t = struct {
        tx : txid,
        bd : response_body.t
    }
}
The definition of a replica is also the same as before.

module replica = {
    function store(K:key) : value
    after init {
        store(K) := 0
    }
    action exec(inp : request_body.t) returns (out:response_body.t) = {
        if inp.knd = write {
            store(inp.ky) := inp.vl;
        }
        else if inp.knd = read {
            out.vl := store(inp.ky);
        }
    }
}
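
As a rough aid to reading the module above, here is a minimal Python model of the replica (Python, not Ivy). The names `Replica` and `exec_request` are illustrative and do not appear in the Ivy source. The model assumes keys and values are ordinary Python values, and it returns `None` for a write because the Ivy action leaves `out.vl` unassigned in that branch; that choice is an assumption of the sketch.

```python
from collections import defaultdict


class Replica:
    """Illustrative Python model of the Ivy `replica` module (not Ivy code)."""

    def __init__(self):
        # Mirrors `store(K) := 0` in `after init`: every key starts at 0.
        self.store = defaultdict(int)

    def exec_request(self, kind, key, value=0):
        """Apply one request body and return the response value.

        Assumption: a write returns None, since the Ivy code does not
        assign `out.vl` when handling a write.
        """
        if kind == "write":
            self.store[key] = value
            return None
        if kind == "read":
            return self.store[key]
        raise ValueError("unknown request kind: %r" % (kind,))
```

A read of a key that was never written yields 0, and a read after a write yields the written value.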


object dest = {
    ghost type t = {prim,sec}
}
Reference object


Our reference model is the same as before.

object ref = {
    action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
    action commit(tx : txid,dst:dest.t)
    action eval(tx : txid) returns (res : response_body.t)

    instance rep : replica

    var next : txid
    function txs(X:txid) : request_body.t
    function txres(X:txid) : response_body.t
    relation committed(X:txid)

    after init {
        next := 0;
        committed(X) := false;
    }

    implement create {
        tx := next;
        txs(tx) := inp;
        next := next + 1;
    }

    implement commit {
        assert 0 <= tx & tx < next;
        assert ~committed(tx);
        txres(tx) := rep.exec(txs(tx));
        committed(tx) := true;
    }
    delegate commit

    implement eval {
        assert committed(tx);
        res := txres(tx);
    }
    delegate eval

    interpret txid -> int
}
The serialization object is the same as before.

object ser = {

    action serialize(tx : txid)
The serializer object keeps track of which transactions have been serialized.

    relation serialized(X:txid)

    after init {
        serialized(X) := false;
    }
To serialize a write, we must guarantee that the transaction exists and that it has not already been serialized. Further, we can only serialize write transactions.

    implement serialize {
        assert 0 <= tx & tx < ref.next;
        assert ~serialized(tx);
        assert ref.txs(tx).knd = write;
        serialized(tx) := true;
    }
    delegate serialize
Further, we specify that a write transaction cannot be committed until it is serialized.

    before ref.commit(tx : txid,dst:dest.t) {
        assert ref.txs(tx).knd = write -> serialized(tx);
    }
}
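
The interplay between the reference object and the serializer can be sketched in Python (not Ivy). This is a simplified model under stated assumptions: the class name `ReferenceModel` is hypothetical, request bodies are flattened to `(kind, key, value)` tuples, the `dst` parameter is omitted, and the result recorded for a write is `None`.

```python
class ReferenceModel:
    """Illustrative model of `ref` plus the `ser` monitor (not Ivy code)."""

    def __init__(self):
        self.next = 0            # next unused transaction id
        self.txs = {}            # txid -> (kind, key, value)
        self.txres = {}          # txid -> recorded response value
        self.committed = set()
        self.serialized = set()
        self.store = {}          # reference replica; missing keys read as 0

    def create(self, kind, key, value=0):
        tx = self.next
        self.txs[tx] = (kind, key, value)
        self.next += 1
        return tx

    def serialize(self, tx):
        # Same three preconditions as `ser.serialize`.
        assert 0 <= tx < self.next
        assert tx not in self.serialized
        assert self.txs[tx][0] == "write"
        self.serialized.add(tx)

    def commit(self, tx):
        assert 0 <= tx < self.next
        assert tx not in self.committed
        kind, key, value = self.txs[tx]
        # Models `before ref.commit` in `ser`: writes must be serialized first.
        assert kind != "write" or tx in self.serialized
        if kind == "write":
            self.store[key] = value
            self.txres[tx] = None  # assumption: writes carry no result
        else:
            self.txres[tx] = self.store.get(key, 0)
        self.committed.add(tx)

    def eval(self, tx):
        assert tx in self.committed
        return self.txres[tx]
```

In this model, committing a write before serializing it trips an assertion, which is the property the `before ref.commit` monitor enforces.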
The implementation


include tcp
include udp
Our client endpoints are unchanged.

type client_id

type req_msg = struct {
    cid : client_id,
    req : request.t
}

module client(cid,prim_chan,sec_chan,cl_chans) = {

    action client_request(req : request_body.t, the_dst: dest.t)
    action client_response(req : response_body.t, tx : txid)

    implement client_request {
        var m : req_msg;
        m.cid := cid;
        m.req.tx := ref.create(req,the_dst);
        m.req.bd := req;
        if the_dst = dest.prim {
            call prim_chan.send(m);
        } else {
            call sec_chan.send(m);
        }
    }

    implement cl_chans.recv(resp : response.t) {
        call client_response(resp.bd,resp.tx)
    }
}
Our primary server has a new endpoint fail_chan that listens for a failure message from the secondary. The message is an empty struct. We keep a boolean sec_failed to indicate the secondary has failed.

type fail_msg = struct {}

module primary_node(port, fwd_chan, rev_chan, cl_chans) = {
    instance rep : replica

    instance req_chan : nondup_endpoint(port,req_msg)

    instance fail_chan : nondup_endpoint(port,fail_msg)

    instance counter : unbounded_sequence
    function pending(K:key) : counter.t

    var sec_failed : bool

    after init {
        pending(K) := 0;
        sec_failed := false;
    }
If we receive a failure message, set sec_failed.

    implement fail_chan.recv(inp : fail_msg) {
        sec_failed := true;
    }
We no longer forward write requests if the secondary has failed, and we no longer wait for acks.

    implement req_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = read {
            if pending(rr.ky) = 0 | sec_failed {
                call ref.commit(inp.req.tx,dest.prim);
                var res : response.t;
                res.tx := inp.req.tx;
                res.bd := rep.exec(rr);
                call cl_chans(inp.cid).send(res)
            } else {
                call req_chan.send(inp);  # if cannot execute, recirculate
            }
        } else if rr.knd = write {
            call ser.serialize(inp.req.tx);           # this is ghost!
            call fwd_chan.send(inp);
            if ~sec_failed {
                pending(rr.ky) := pending(rr.ky).next;
                var res := rep.exec(inp.req.bd);
            }
        }
    }
When we receive a write request on the acknowledgement channel, we decrement the pending count of the key.

    implement rev_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = write {
            pending(rr.ky) := pending(rr.ky).prev;
        }
    }
}
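
The rule that decides when the primary may serve a read can be modeled in a few lines of Python (not Ivy). This is only a sketch of the bookkeeping: the class `PendingTracker` and its method names are hypothetical, and message passing, replicas, and commits are left out.

```python
from collections import defaultdict


class PendingTracker:
    """Illustrative model of the primary's `pending` counts and `sec_failed`."""

    def __init__(self):
        # Mirrors `pending(K) := 0` and `sec_failed := false` in `after init`.
        self.pending = defaultdict(int)
        self.sec_failed = False

    def on_write_forwarded(self, key):
        # A write on `key` is now awaiting an acknowledgement.
        self.pending[key] += 1

    def on_ack(self, key):
        # An acknowledgement arrived on the reverse channel.
        self.pending[key] -= 1

    def on_secondary_failed(self):
        self.sec_failed = True

    def can_serve_read(self, key):
        # Same condition as `pending(rr.ky) = 0 | sec_failed`.
        return self.pending[key] == 0 or self.sec_failed
```

While a write on a key is unacknowledged, reads of that key are recirculated; once the secondary is known to have failed, reads are served regardless of the count.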
The secondary receives a signal fail from the environment indicating it should fail. When this happens it sends a fail message to the primary and shuts down.

module secondary_node(port, fwd_chan, rev_chan, fail_chan, cl_chans) = {
    instance rep : replica

    instance req_chan : nondup_endpoint(port,req_msg)

    var failed : bool

    after init {
        failed := false;
    }

    action fail = {
        failed := true;
        var m : fail_msg;
        call fail_chan.send(m);
    }

    implement req_chan.recv(inp : req_msg) {
        if ~failed {
            var rr := inp.req.bd;
            if rr.knd = read {
                call ref.commit(inp.req.tx,dest.sec);
                var res : response.t;
                res.tx := inp.req.tx;
                res.bd := rep.exec(rr);
                call cl_chans(inp.cid).send(res);
            }
            # ignore writes!
        }
    }
When the secondary receives a forwarded write from the primary, it commits it (through the interface specification), executes it, sends the response to the client, and sends an acknowledgement back to the primary.

    implement fwd_chan.recv(inp : req_msg) {
        if ~failed {
            var res : response.t;
            res.tx := inp.req.tx;
            res.bd := rep.exec(inp.req.bd);
            call cl_chans(inp.cid).send(res);
            call rev_chan.send(inp);
        }
    }

}
The plumbing


instance fwd_chan : tcp_channel("localhost:44090",req_msg)
instance rev_chan : tcp_channel("localhost:44091",req_msg)
The client channels

instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
The clients

instance cl(X:client_id) : client(X,prim.req_chan,sec.req_chan,cl_chans)
The primary and secondary servers. Notice we pass the failure channel as a parameter to the secondary.

instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,prim.fail_chan,cl_chans)
The interface specifications


The service specification is the same.

object service_spec = {

    relation responded(X:txid)

    after init {
        responded(X) := false;
    }

    before cl_chans.send(p : client_id, m : response.t) {
        assert ~responded(m.tx);
        assert m.bd = ref.eval(m.tx);
        responded(m.tx) := true;
    }
}
The interface between primary and secondary is changed a bit to reflect the failure channel.

object mid_spec = {
    instance queue : unbounded_queue(txid)

    before fwd_chan.rcvr.recv(inp : req_msg) {
        assert inp.req.bd = ref.txs(inp.req.tx);
        assert inp.req.tx = queue.pop;
        call ref.commit(inp.req.tx,dest.sec);
    }

    delegate fwd_chan_rcvr_recv[before] -> prim

    relation acked(X:txid)
We add a flag to record when the secondary has sent a fail message.

    var sec_failed : bool

    after init {
        acked(X) := false;
        sec_failed := false;
    }

    before rev_chan.sndr.send(inp : req_msg) {
        assert ref.committed(inp.req.tx);
        assert ref.txs(inp.req.tx) = inp.req.bd;
        assert ~acked(inp.req.tx);
        acked(inp.req.tx) := true;
    }
If the secondary has failed, then write transactions are committed at the moment they are serialized.

    after ser.serialize(tx:txid) {
        call queue.push(tx);
        if sec_failed {
            call ref.commit(tx,dest.prim)
        }
    }

    function req_dest(X:txid) : dest.t

    after ref.create(inp : request_body.t, dst:dest.t) returns (tx : txid) {
        req_dest(tx) := dst
    }
Here, we set the sec_failed flag.

    after prim.fail_chan.send(m : fail_msg) {
        sec_failed := true;
    }
After a failure of the secondary, the primary is allowed to commit writes and the secondary is not allowed to commit anything.

    before ref.commit(tx : txid,dst:dest.t) {
        assert ref.txs(tx).knd = read -> req_dest(tx) = dst;
        if sec_failed {
            assert dst = dest.prim
        } else {
            assert ref.txs(tx).knd = write -> dst = dest.sec
        }
    }


}
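
The commit rule stated in prose above (after a failure only the primary may commit, and before a failure writes are committed by the secondary) can be written as a small Python predicate (not Ivy). The function name `commit_allowed` and the string encoding of kinds and destinations are assumptions made for illustration.

```python
def commit_allowed(kind, req_dest, dst, sec_failed):
    """Return True if the modeled `before ref.commit` checks would all pass.

    kind       -- "read" or "write"
    req_dest   -- destination recorded when the transaction was created
    dst        -- server attempting the commit, "prim" or "sec"
    sec_failed -- whether the failure message has been sent
    """
    # A read may only be committed by the server it was addressed to.
    if kind == "read" and req_dest != dst:
        return False
    if sec_failed:
        # After a failure, only the primary may commit anything.
        return dst == "prim"
    # Before a failure, writes are committed by the secondary.
    return kind != "write" or dst == "sec"
```

For example, `commit_allowed("write", "prim", "prim", False)` is false while `commit_allowed("write", "prim", "prim", True)` is true, which is the change in behavior that the failure flag is meant to introduce.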
We export/import our API.

export cl.client_request
import cl.client_response
Our isolates are as before.

trusted isolate iso_prim = prim with cl,cl_chans,ref,ser,service_spec,mid_spec,fwd_chan,rev_chan
trusted isolate iso_sec = sec with cl,cl_chans,ref,ser,service_spec,mid_spec
As before, we have to give concrete interpretations for the abstract types in order to test.

interpret txid -> int
interpret key -> strbv[1]
interpret value -> bv[16]
interpret client_id -> bv[1]

Exercise hints

You'll need to know some things about the Ivy language to fix this problem.

First, you can write a while loop in Ivy like this:

while <condition> { <action> }

Also, you can test whether a queue q is empty with q.empty. Try using a queue to keep track of pending transaction commits and perform them at the right moment.
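
The general pattern behind this hint, looping until a queue is empty and handling each item in order, looks like this in Python (not Ivy). It is a generic illustration rather than a solution to the exercise; the function `drain` and its parameters are hypothetical.

```python
from collections import deque


def drain(pending, handle):
    """Pop items from `pending` in FIFO order, passing each to `handle`.

    Returns the number of items handled. The loop condition plays the
    role of an emptiness test on the queue.
    """
    handled = 0
    while pending:                    # loop until the queue is empty
        handle(pending.popleft())     # oldest item first
        handled += 1
    return handled
```

A call such as `drain(deque([3, 4, 5]), print)` handles the items oldest first and leaves the queue empty.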