Solution: Replicating key-value store with fail-over
Here is one possible fix for the interface specification problem in the replicating key-value store with fail-over.
The problem is that when the secondary fails, the replica on the primary reflects writes that have been serialized but not committed by the secondary. That means that any reads on the primary to these keys must wait for commit events that will never occur, since the secondary has failed. Our primary doesn't wait, and thus the reads may return "future" values (i.e., values that reflect writes that have not committed).
One possible solution to this is to commit all of the pending
writes at the moment when the fail message is sent. This is done
below in the definition of mid_spec (see object solution).
Basic data types
These are the same as before.
# Abstract types for the store's keys and values, and for transaction ids.
type key
type value
ghost type txid  # transaction ids exist only for specification purposes
type request_kind = {write,read}
# The body of a request: the kind of operation, the key it operates
# on, and (for writes) the value to be written.
object request_body = {
type t = struct {
knd : request_kind,
ky : key,
vl : value
}
}
# A request: a (ghost) transaction id paired with the request body.
object request = {
type t = struct {
tx : txid,
bd : request_body.t
}
}
# The body of a response: the value returned (meaningful for reads).
object response_body = {
type t = struct {
vl : value
}
}
# A response: the transaction id it answers paired with the response body.
object response = {
type t = struct {
tx : txid,
bd : response_body.t
}
}
# A replica of the key-value store. The store maps every key to a
# value, initially 0.
module replica = {
function store(K:key) : value
after init {
store(K) := 0
}
# Execute one request against the store: a write updates the store;
# a read returns the current value of the key.
action exec(inp : request_body.t) returns (out:response_body.t) = {
if inp.knd = write {
store(inp.ky) := inp.vl;
}
else if inp.knd = read {
out.vl := store(inp.ky);
}
}
}
# Destinations a request may be sent to: the primary or the secondary.
object dest = {
ghost type t = {prim,sec}
}
Our reference model is the same as before.
# The reference object: records each transaction created, the order in
# which transactions commit, and the result of each transaction as
# computed on an abstract replica.
object ref = {
action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
action commit(tx : txid,dst:dest.t)
action eval(tx : txid) returns (res : response_body.t)
instance rep : replica
individual next : txid                    # next transaction id to allocate
function txs(X:txid) : request_body.t     # request body of each transaction
function txres(X:txid) : response_body.t  # result of each committed transaction
relation committed(X:txid)                # which transactions have committed
after init {
next := 0;
committed(X) := false;
}
# Create a new transaction for a request, giving it a fresh id.
implement create {
tx := next;
txs(tx) := inp;
next := next + 1;
}
# Commit a transaction: it must exist and must not already have
# committed. Its result is computed on the abstract replica.
implement commit {
assert 0 <= tx & tx < next;
assert ~committed(tx);
txres(tx) := rep.exec(txs(tx));
committed(tx) := true;
}
delegate commit
# Return the result of a committed transaction.
implement eval {
assert committed(tx);
res := txres(tx);
}
delegate eval
interpret txid -> int
}
# The serialization object: records which write transactions have been
# serialized, and requires every write to be serialized before it
# commits.
object ser = {
action serialize(tx : txid)
relation serialized(X:txid)
after init {
serialized(X) := false;
}
# A transaction can be serialized only once; it must exist and must
# be a write.
implement serialize {
assert 0 <= tx & tx < ref.next;
assert ~serialized(tx);
assert ref.txs(tx).knd = write;
serialized(tx) := true;
}
delegate serialize
# No write commits before it is serialized.
before ref.commit(tx : txid,dst:dest.t) {
assert ref.txs(tx).knd = write -> serialized(tx);
}
}
include tcp
include udp
# Clients are identified by values of this type.
type client_id
# A request message: the id of the sending client plus the request,
# so the response can be routed back to the client.
object req_msg = {
type t = struct {
cid : client_id,
req : request.t
}
}
# A client: creates a (ghost) transaction for each request, sends the
# request to the chosen destination, and receives responses on its own
# channel.
module client(cid,prim_chan,sec_chan,cl_chans) = {
action client_request(the_req : request_body.t, the_dst: dest.t)
action client_response(resp : response_body.t, tx : txid)
# Build the request message, register the transaction with the
# reference object, and send to the primary or the secondary.
implement client_request {
local m : req_msg.t {
m.cid := cid;
m.req.tx := ref.create(the_req,the_dst);
m.req.bd := the_req;
if the_dst = dest.prim {
call prim_chan.send(m);
} else {
call sec_chan.send(m);
}
}
}
# Deliver a received response to the client's environment.
implement cl_chans.recv(resp : response.t) {
call client_response(resp.bd,resp.tx)
}
}
The primary has an endpoint fail_chan that listens for a failure
message from the secondary. The message is an empty struct. We keep a boolean
sec_failed to indicate the secondary has failed.
# A failure notification carries no data.
type fail_msg = struct {}
# The primary node. It executes reads locally when the key has no
# pending (unacknowledged) writes, forwards writes to the secondary
# over fwd_chan, and counts acknowledgments arriving on rev_chan.
# The fail_chan endpoint listens for the failure message from the
# secondary; its arrival is recorded in the boolean sec_failed.
module primary_node(port, fwd_chan, rev_chan, cl_chans) = {
instance rep : replica
instance req_chan : nondup_endpoint(port,req_msg.t)
instance fail_chan : nondup_endpoint(port,fail_msg)
instance counter : unbounded_sequence
function pending(K:key) : counter.t  # number of in-flight writes per key
individual sec_failed : bool         # true once the secondary has failed
after init {
pending(K) := 0;
sec_failed := false;
}
# A failure message from the secondary sets the sec_failed flag.
implement fail_chan.recv(inp : fail_msg) {
sec_failed := true;
}
# Handle a client request. A read executes immediately if its key has
# no pending writes or the secondary has failed; otherwise it is
# recirculated. A write is serialized (ghost), forwarded to the
# secondary if it is still up, and executed on the local replica.
implement req_chan.recv(inp : req_msg.t) {
var rr := inp.req.bd;
if rr.knd = read {
if pending(rr.ky) = 0 | sec_failed {
call ref.commit(inp.req.tx,dest.prim);
var res : response.t;
res.tx := inp.req.tx;
res.bd := rep.exec(rr);
call cl_chans(inp.cid).send(res)
} else {
call req_chan.send(inp); # if cannot execute, recirculate
}
} else if rr.knd = write {
call ser.serialize(inp.req.tx); # this is ghost!
if ~sec_failed {
call fwd_chan.send(inp);
pending(rr.ky) := pending(rr.ky).next;
};
var res := rep.exec(inp.req.bd); # result of a local write is discarded
}
}
# An acknowledgment from the secondary: one fewer pending write on
# the key.
implement rev_chan.recv(inp : req_msg.t) {
var rr := inp.req.bd;
if rr.knd = write {
pending(rr.ky) := pending(rr.ky).prev;
}
}
}
The secondary has an action fail, called by the environment, indicating it should
fail. When this happens it sends a fail message to the primary and shuts down.
# The secondary node. It executes reads sent directly to it and writes
# forwarded from the primary, acknowledging the latter on rev_chan.
# Once failed, it ignores all further messages.
module secondary_node(port, fwd_chan, rev_chan, fail_chan, cl_chans) = {
instance rep : replica
instance req_chan : nondup_endpoint(port,req_msg.t)
individual failed : bool  # true once the environment has failed this node
after init {
failed := false;
}
# Fail the node (at most once), notifying the primary with the empty
# failure message.
action fail = {
if ~failed {
failed := true;
var m : fail_msg;
call fail_chan.send(m);
}
}
# A read request from a client: commit it at the secondary and
# respond, unless failed.
implement req_chan.recv(inp : req_msg.t) {
if ~failed {
var rr := inp.req.bd;
if rr.knd = read {
call ref.commit(inp.req.tx,dest.sec);
var res : response.t;
res.tx := inp.req.tx;
res.bd := rep.exec(rr);
call cl_chans(inp.cid).send(res);
}
}
}
# A write forwarded from the primary: execute it, respond to the
# client, and acknowledge to the primary, unless failed.
implement fwd_chan.recv(inp : req_msg.t) {
if ~failed {
var res : response.t;
res.tx := inp.req.tx;
res.bd := rep.exec(inp.req.bd);
call cl_chans(inp.cid).send(res);
call rev_chan.send(inp);
}
}
}
# The system assembly: TCP channels between the nodes, response
# endpoints for the clients, and the node instances themselves.
instance fwd_chan : tcp_channel("localhost:44090",req_msg.t)
instance rev_chan : tcp_channel("localhost:44091",req_msg.t)
instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
instance cl(X:client_id) : client(X,prim.req_chan,sec.req_chan,cl_chans)
instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,prim.fail_chan,cl_chans)
The service specification is the same.
# The service specification: each transaction receives at most one
# response, and the response matches the reference object's result.
object service_spec = {
relation responded(X:txid)
after init {
responded(X) := false;
}
before cl_chans.send(p : client_id, m : response.t) {
assert ~responded(m.tx);
assert m.bd = ref.eval(m.tx);
responded(m.tx) := true;
}
}
# The intermediate specification of the protocol between the primary
# and the secondary.
object mid_spec = {
# Writes must reach the secondary in serialization order; the queue
# records serialized transactions not yet received by the secondary.
instance queue : unbounded_queue(txid)
after ser.serialize(tx:txid) {
call queue.push(tx);
}
# A forwarded write matches the transaction at the head of the queue
# and is committed at the secondary on receipt.
before fwd_chan.rcvr.recv(inp : req_msg.t) {
assert inp.req.bd = ref.txs(inp.req.tx);
assert inp.req.tx = queue.pop;
call ref.commit(inp.req.tx,dest.sec);
}
delegate fwd_chan_rcvr_recv[before] -> prim
relation acked(X:txid)       # writes acknowledged to the primary
individual sec_failed : bool # true once the failure message is sent
after init {
acked(X) := false;
sec_failed := false;
}
# An acknowledgment refers to a committed, not-yet-acknowledged
# write, and cannot be sent after the secondary has failed.
before rev_chan.sndr.send(inp : req_msg.t) {
assert ref.committed(inp.req.tx);
assert ref.txs(inp.req.tx) = inp.req.bd;
assert ~acked(inp.req.tx);
assert ~sec_failed;
acked(inp.req.tx) := true;
}
# Record the destination each transaction was created for.
function req_dest(X:txid) : dest.t
after ref.create(inp : request_body.t, dst:dest.t) returns (tx : txid) {
req_dest(tx) := dst
}
# The failure message is sent at most once, and sending it sets the
# sec_failed flag.
before prim.fail_chan.send(m : fail_msg) {
assert ~sec_failed;
}
after prim.fail_chan.send(m : fail_msg) {
sec_failed := true;
}
# Reads commit at their requested destination. After the failure all
# commits occur at the primary; before it, writes commit only at the
# secondary.
before ref.commit(tx : txid,dst:dest.t) {
assert ref.txs(tx).knd = read -> req_dest(tx) = dst;
if sec_failed {
assert dst = dest.prim   # fixed: was `dst.prim`, a typo for the enumerator dest.prim
} else {
assert ref.txs(tx).knd = write -> dst = dest.sec
}
}
# Solution to the exercise: keep the serialized-but-uncommitted
# writes in a queue, and commit them all at the moment the failure
# message is sent. Once the secondary has failed, a newly serialized
# write commits immediately at the primary.
object solution = {
instance pending : unbounded_queue(txid)
after ser.serialize(tx:txid) {
call pending.push(tx);
if sec_failed {
call ref.commit(tx,dest.prim)
}
}
# Committing a write removes it from the pending queue.
after ref.commit(tx : txid,dst:dest.t) {
if ref.txs(tx).knd = write {
var tmp := pending.pop;
}
}
# On failure, commit every pending write at the primary. Each commit
# pops the queue (see above), so the loop terminates.
after prim.fail_chan.send(m : fail_msg) {
while ~pending.empty {
call ref.commit(pending.first,dest.prim)
}
}
}
}
# Actions exported to / imported from the environment.
export cl.client_request
import cl.client_response
export sec.fail
# Test each node in isolation, with the other components acting as
# its environment.
trusted isolate iso_prim = prim with cl,cl_chans,ref,ser,service_spec,mid_spec,fwd_chan,rev_chan
trusted isolate iso_sec = sec with cl,cl_chans,ref,ser,service_spec,mid_spec
# Concrete interpretations of the abstract types, for testing.
interpret txid -> int
interpret key -> strbv[1]
interpret value -> bv[16]
interpret client_id -> bv[1]
Adjusting the test probability distribution
Here's one more detail to help make sure we thoroughly test our servers. Actions like failure of the secondary cause some code to become unreachable. This means that if failures occur too frequently, we won't test those parts of the code well. We can fix that by reducing the probability that the tester will choose a fail action. The statements below do this. By making the "weight" of an action less than one, we reduce the relative probability of the tester choosing that action. We adjust the weight so that empirically failures occur in only about half the runs. That way, we know that both the failure code and the non-failure code will be well tested.
Notice another detail: when we're testing the primary, the tester
simulates a failure by calling the action that sends a failure
message prim.fail_chan as it simulates the environment of the
primary. This means we need to adjust the probability of this action
in addition to the exported action sec.fail, which the tester will
call when testing the secondary.
Other sorts of actions that we typically have to reduce the probabilities of are resets and errors, both of which tend to cause work to be cancelled, so they reduce test coverage if they occur too frequently. In general, it's a good idea to examine the test traces to make sure that all the functionality of the system is being exercised.
# Reduce the relative probability of failure actions so that both the
# failure and non-failure code paths are well exercised by the tester.
attribute prim.fail_chan.send.weight = "0.05"
attribute sec.fail.weight = "0.05"
Exercise hints
You'll need to know some things about the Ivy language to fix this problem.
First, you can write a while loop in Ivy like this:
while <condition> { <action> }
Also, you can test whether a queue q is empty with q.empty. Try
using a queue to keep track of pending transaction commits and
perform them at the right moment.