Skip to content

Solution: Replicating key-value store with fail-over¤

Here is one possible fix for the interface specification problem in the replicating key-value store with fail-over.

The problem is the when the secondary fails, the replica on the primary reflects writes that have been serialized but not committed by the secondary. That means that any reads on the primary to these keys must wait for commit events that will never occur, since the secondary has failed. Our primary doesn't wait, and and thus the reads may return "future" values (i.e., values that reflect writes that have not committed).

One possible solution top this is to commit all of the pending writes at the moment when the the fail message is sent. This is done below in the definiton of mid_spec (see object solution).

Basic data types¤

These are the same as beore.

type key
type value
ghost type txid

type request_kind = {write,read}

object request_body = {
    type t = struct {
        knd : request_kind,
        ky : key,
        vl : value
    }
}

object request = {
    type t = struct {
        tx : txid,
        bd : request_body.t
    }
}

object response_body = {
    type t = struct {
        vl : value
    }
}

object response = {
    type t = struct {
        tx : txid,
        bd : response_body.t
    }
}
The definition of a replica is also the same as before.

module replica = {
    function store(K:key) : value
    after init {
        store(K) := 0
    }
    action exec(inp : request_body.t) returns (out:response_body.t) = {
        if inp.knd = write {
            store(inp.ky) := inp.vl;
        }
        else if inp.knd = read {
            out.vl := store(inp.ky);
        }
    }
}


object dest = {
    ghost type t = {prim,sec}
}
Reference object


Our reference model is the same as before.

object ref = {
    action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
    action commit(tx : txid,dst:dest.t)
    action eval(tx : txid) returns (res : response_body.t)

    instance rep : replica

    var next : txid
    function txs(X:txid) : request_body.t
    function txres(X:txid) : response_body.t
    relation committed(X:txid)

    after init {
        next := 0;
        committed(X) := false;
    }

    implement create {
        tx := next;
        txs(tx) := inp;
        next := next + 1;
    }

    implement commit {
        assert 0 <= tx & tx < next;
        assert ~committed(tx);
        txres(tx) := rep.exec(txs(tx));
        committed(tx) := true;
    }
    delegate commit

    implement eval {
        assert committed(tx);
        res := txres(tx);
    }
    delegate eval

    interpret txid -> int
}
The serialization object is the same as before

object ser = {

    action serialize(tx : txid)
The serializer object keeps track of which transactions have been serialized.

    relation serialized(X:txid)

    after init {
        serialized(X) := false;
    }
To serialzie a write, we must guarantee that transaction exists, and that it has not already been serialized. Further, we can only serialize write transactions.

    implement serialize {
        assert 0 <= tx & tx < ref.next;
        assert ~serialized(tx);
        assert ref.txs(tx).knd = write;
        serialized(tx) := true;
    }
    delegate serialize
Further,we specify that a write transaction cannot be committed until it is serialized.

    before ref.commit(tx : txid,dst:dest.t) {
        assert ref.txs(tx).knd = write -> serialized(tx);
    }
}
The implementation


include tcp
include udp
Our client endpoints are unchanged

type client_id

object req_msg = {
    type t = struct {
        cid : client_id,
        req : request.t
    }
}

module client(cid,prim_chan,sec_chan,cl_chans) = {

    action client_request(the_req : request_body.t, the_dst: dest.t)
    action client_response(resp : response_body.t, tx : txid)

    implement client_request {
        var m : req_msg.t;
        m.cid := cid;
        m.req.tx := ref.create(the_req,the_dst);
        m.req.bd := the_req;
        if the_dst = dest.prim {
            call prim_chan.send(m);
        } else {
            call sec_chan.send(m);
        }
    }

    implement cl_chans.recv(resp : response.t) {
        call client_response(resp.bd,resp.tx)
    }
}
Our primary server has a new endpoint fail_chan that listens for a failure message from the secondary. The message is an empty struct. We keep a boolean sec_failed to indicate the secondary has failed.

type fail_msg = struct {}

module primary_node(port, fwd_chan, rev_chan, cl_chans) = {
    instance rep : replica

    instance req_chan : nondup_endpoint(port,req_msg.t)

    instance fail_chan : nondup_endpoint(port,fail_msg)

    instance counter : unbounded_sequence
    function pending(K:key) : counter.t

    var sec_failed : bool

    after init {
        pending(K) := 0;
        sec_failed := false;
    }
If we receive a failure message, set sec_failed.

    implement fail_chan.recv(inp : fail_msg) {
        sec_failed := true;
    }
We no longer foward write requests if the secondary has failed, and we no longer wait for acks.

    implement req_chan.recv(inp : req_msg.t) {
        var rr := inp.req.bd;
        if rr.knd = read {
            if pending(rr.ky) = 0 | sec_failed {
                call ref.commit(inp.req.tx,dest.prim);
                var res : response.t;
                res.tx := inp.req.tx;
                res.bd := rep.exec(rr);
                call cl_chans(inp.cid).send(res)
            } else {
                call req_chan.send(inp);  # if cannot execute, recirculate
            }
        } else if rr.knd = write {
            call ser.serialize(inp.req.tx);           # this is ghost!
            if ~sec_failed {
                call fwd_chan.send(inp);
                pending(rr.ky) := pending(rr.ky).next;
            };
            var res := rep.exec(inp.req.bd);
        }
    }
When we receive a write request on the acknowledgement channel, we decrement the pending count of the key.

    implement rev_chan.recv(inp : req_msg.t) {
        var rr := inp.req.bd;
        if rr.knd = write {
            pending(rr.ky) := pending(rr.ky).prev;
        }
    }
}
The secondary receives a signal fail from the environment indicating it should fail. When this happens it sends a fail message to the primary and shuts down.

module secondary_node(port, fwd_chan, rev_chan, fail_chan, cl_chans) = {
    instance rep : replica

    instance req_chan : nondup_endpoint(port,req_msg.t)

    var failed : bool

    after init {
        failed := false;
    }

    action fail = {
        if ~failed {
            failed := true;
            var m : fail_msg;
            call fail_chan.send(m);
        }
    }

    implement req_chan.recv(inp : req_msg.t) {
        if ~failed {
            var rr := inp.req.bd;
            if rr.knd = read {
                call ref.commit(inp.req.tx,dest.sec);
                var res : response.t;
                res.tx := inp.req.tx;
                res.bd := rep.exec(rr);
                call cl_chans(inp.cid).send(res);
            }
ignore writes!
        }
    }
When the secondary receives a forwarded write from the primary, it commits it, executes it, and sends the response to the client.

    implement fwd_chan.recv(inp : req_msg.t) {
        if ~failed {
            var res : response.t;
            res.tx := inp.req.tx;
            res.bd := rep.exec(inp.req.bd);
            call cl_chans(inp.cid).send(res);
            call rev_chan.send(inp);
        }
    }

}
The plumbing


instance fwd_chan : tcp_channel("localhost:44090",req_msg.t)
instance rev_chan : tcp_channel("localhost:44091",req_msg.t)
The client channels

instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
The clients

instance cl(X:client_id) : client(X,prim.req_chan,sec.req_chan,cl_chans)
The primary and secondary servers. Notice we pass the failure channel as a parameter to the secondary.

instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,prim.fail_chan,cl_chans)
The interface specifications


The service specification is the same.

object service_spec = {

    relation responded(X:txid)

    after init {
        responded(X) := false;
    }

    before cl_chans.send(p : client_id, m : response.t) {
        assert ~responded(m.tx);
        assert m.bd = ref.eval(m.tx);
        responded(m.tx) := true;
    }
}
The interface between primary and secondary is changed a bit to reflect the failure channel.

object mid_spec = {
    instance queue : unbounded_queue(txid)

    after ser.serialize(tx:txid) {
        call queue.push(tx);
    }

    before fwd_chan.rcvr.recv(inp : req_msg.t) {
        assert inp.req.bd = ref.txs(inp.req.tx);
        assert inp.req.tx = queue.pop;
        call ref.commit(inp.req.tx,dest.sec);
    }

    delegate fwd_chan_rcvr_recv[before] -> prim

    relation acked(X:txid)
We add a flag to record when the secondary has sent a fail message.

    var sec_failed : bool

    after init {
        acked(X) := false;
        sec_failed := false;
    }

    before rev_chan.sndr.send(inp : req_msg.t) {
        assert ref.committed(inp.req.tx);
        assert ref.txs(inp.req.tx) = inp.req.bd;
        assert ~acked(inp.req.tx);
        assert ~sec_failed;
        acked(inp.req.tx) := true;
    }

    function req_dest(X:txid) : dest.t

    after ref.create(inp : request_body.t, dst:dest.t) returns (tx : txid) {
        req_dest(tx) := dst
    }

    before prim.fail_chan.send(m : fail_msg) {
        assert ~sec_failed;
    }
Here, we set the sec_failed flag.

    after prim.fail_chan.send(m : fail_msg) {
        sec_failed := true;
    }
After a failure of the secondary, the primary is allowed to commit writes and the secondary is not allowed to commit anything.

    before ref.commit(tx : txid,dst:dest.t) {
        assert ref.txs(tx).knd = read -> req_dest(tx) = dst;
        if sec_failed {
            assert dst = dst.prim
        } else {
            assert ref.txs(tx).knd = write -> dst = dest.sec
        }
    }

    object solution = {
Exercise solution: We maintain an ordered queue of all the write requests that have been serialized but not commited.

        instance pending : unbounded_queue(txid)

        after ser.serialize(tx:txid) {
            call pending.push(tx);
            if sec_failed {
                call ref.commit(tx,dest.prim)
            }
        }

        after ref.commit(tx : txid,dst:dest.t)  {
            if ref.txs(tx).knd = write {
                var tmp := pending.pop;
            }
        }
When the secondary says it has failed, we commit all of the pending write requests.

        after prim.fail_chan.send(m : fail_msg) {
            while ~pending.empty {
                call ref.commit(pending.first,dest.prim)
            }
        }
    }
}
We export/import our API.

export cl.client_request
import cl.client_response
The environment can cause the secondary to fail

export sec.fail
Our isolates are as before

trusted isolate iso_prim = prim with cl,cl_chans,ref,ser,service_spec,mid_spec,fwd_chan,rev_chan
trusted isolate iso_sec = sec with cl,cl_chans,ref,ser,service_spec,mid_spec
As before, we have to give concrete interpretations for the abstarct types in order to test.

interpret txid -> int
interpret key -> strbv[1]
interpret value -> bv[16]
interpret client_id -> bv[1]

Adjusting the test probability distribution¤

Here's one more detail to help make sure we thoroughly test our servers. Actions like failure of the secondary cause somce code to become unreachable. This means that if failures occur too frequently, we won't test those parts of the code well. We can fix that by reducing the probability that the tester will choose a fail action. The statements below do this. By making the "weight" of an action less than one, we reduce the relative probability of the tester choosing that action. We adjust the wieght so that emprically failures occur in only about half the runs. That way, we know that both the failure code and the non-failure code will be well tested.

Notice another detail: when we're testing the primary, the tester simulates a failure by calling the action that sends a failure message prim.fail_chan as it simulates the environment of the primary. This means we need to adjust the probability of this action in addition to the exported action sec.fail, which the tester will call when testing the secondary.

Other sorts of actions that we typically have to reduce the probabilites of are resets and errors, both of which tend to cause work to be cancelled, so they reduce test coverage if they occur too frequently. In general, it's a good idea to examine the test traces to make sure that all the functionality of the system is being exercized.

attribute prim.fail_chan.send.weight = "0.05"
attribute sec.fail.weight = "0.05"

Exercise hints¤

You'll need to know some things about the Ivy language to fix this problem.

First, you can write a while loop in IVy like this:

while <condition> { <action> }

Also, you can test whether a queue q is empty with q.empty. Try using a queue to keep track of pending transaction commits and perform them at the right moment.