Bonus exercise: Replicating key-value store with fail-over¤
To make things more interesting, we will try to make our replicating key-value store handle fail-over. To make things easier, however, we'll consider only the case of failure of the secondary, and we'll assume that when the secondary fails, it sends a message to the primary and stops. In reality, we would need a master server to monitor secondary and manage the fail-over. Also, should inform the client endpoints that the secondary has failed, but we won't bother.
Have a look at the code below to see what changes were made. Sadly,
our attempt has gone wrong and our test of the primary server is
giving an assertion failure. Try testing the primary and see if you
can figure out what is going wrong. How could we change the mid_spec
interface specification to fix this problem?
Bonus bonus exercise: look at the end of this page for some hints you can use to fix the spec. Can you fix it?
Basic data types¤
These are the same as beore.
type key
type value
ghost type txid
type request_kind = {write,read}
object request_body = {
type t = struct {
knd : request_kind,
ky : key,
vl : value
}
}
object request = {
type t = struct {
tx : txid,
bd : request_body.t
}
}
object response_body = {
type t = struct {
vl : value
}
}
object response = {
type t = struct {
tx : txid,
bd : response_body.t
}
}
module replica = {
function store(K:key) : value
after init {
store(K) := 0
}
action exec(inp : request_body.t) returns (out:response_body.t) = {
if inp.knd = write {
store(inp.ky) := inp.vl;
}
else if inp.knd = read {
out.vl := store(inp.ky);
}
}
}
object dest = {
ghost type t = {prim,sec}
}
Our reference model is the same as before.
object ref = {
action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
action commit(tx : txid,dst:dest.t)
action eval(tx : txid) returns (res : response_body.t)
instance rep : replica
individual next : txid
function txs(X:txid) : request_body.t
function txres(X:txid) : response_body.t
relation committed(X:txid)
after init {
next := 0;
committed(X) := false;
}
implement create {
tx := next;
txs(tx) := inp;
next := next + 1;
}
implement commit {
assert 0 <= tx & tx < next;
assert ~committed(tx);
txres(tx) := rep.exec(txs(tx));
committed(tx) := true;
}
delegate commit
implement eval {
assert committed(tx);
res := txres(tx);
}
delegate eval
interpret txid -> int
}
object ser = {
action serialize(tx : txid)
relation serialized(X:txid)
after init {
serialized(X) := false;
}
implement serialize {
assert 0 <= tx & tx < ref.next;
assert ~serialized(tx);
assert ref.txs(tx).knd = write;
serialized(tx) := true;
}
delegate serialize
before ref.commit(tx : txid,dst:dest.t) {
assert ref.txs(tx).knd = write -> serialized(tx);
}
}
include tcp
include udp
type client_id
type req_msg = struct {
cid : client_id,
req : request.t
}
module client(cid,prim_chan,sec_chan,cl_chans) = {
action client_request(req : request_body.t, the_dst: dest.t)
action client_response(req : response_body.t, tx : txid)
implement client_request {
local m : req_msg {
m.cid := cid;
m.req.tx := ref.create(req,the_dst);
m.req.bd := req;
if the_dst = dest.prim {
call prim_chan.send(m);
} else {
call sec_chan.send(m);
}
}
}
implement cl_chans.recv(resp : response.t) {
call client_response(resp.bd,resp.tx)
}
}
fail_chan that listens for a failure
message from the secondary. The message is an empty struct. We keep a boolean
sec_failed to indicate the secondary has failed.
type fail_msg = struct {}
module primary_node(port, fwd_chan, rev_chan, cl_chans) = {
instance rep : replica
instance req_chan : nondup_endpoint(port,req_msg)
instance fail_chan : nondup_endpoint(port,fail_msg)
instance counter : unbounded_sequence
function pending(K:key) : counter.t
individual sec_failed : bool
after init {
pending(K) := 0;
sec_failed := false;
}
sec_failed.
implement fail_chan.recv(inp : fail_msg) {
sec_failed := true;
}
implement req_chan.recv(inp : req_msg) {
var rr := inp.req.bd;
if rr.knd = read {
if pending(rr.ky) = 0 | sec_failed {
call ref.commit(inp.req.tx,dest.prim);
var res : response.t;
res.tx := inp.req.tx;
res.bd := rep.exec(rr);
call cl_chans(inp.cid).send(res)
} else {
call req_chan.send(inp); # if cannot execute, recirculate
}
} else if rr.knd = write {
call ser.serialize(inp.req.tx); # this is ghost!
call fwd_chan.send(inp);
if ~sec_failed {
pending(rr.ky) := pending(rr.ky).next;
var res := rep.exec(inp.req.bd);
}
}
}
implement rev_chan.recv(inp : req_msg) {
var rr := inp.req.bd;
if rr.knd = write {
pending(rr.ky) := pending(rr.ky).prev;
}
}
}
fail from the environment indicating it should
fail. When this happens it sends a fail message to the primary and shuts down.
module secondary_node(port, fwd_chan, rev_chan, fail_chan, cl_chans) = {
instance rep : replica
instance req_chan : nondup_endpoint(port,req_msg)
individual failed : bool
after init {
failed := false;
}
action fail = {
failed := true;
var m : fail_msg;
call fail_chan.send(m);
}
implement req_chan.recv(inp : req_msg) {
if ~failed {
var rr := inp.req.bd;
if rr.knd = read {
call ref.commit(inp.req.tx,dest.sec);
var res : response.t;
res.tx := inp.req.tx;
res.bd := rep.exec(rr);
call cl_chans(inp.cid).send(res);
}
}
}
implement fwd_chan.recv(inp : req_msg) {
if ~failed {
var res : response.t;
res.tx := inp.req.tx;
res.bd := rep.exec(inp.req.bd);
call cl_chans(inp.cid).send(res);
call rev_chan.send(inp);
}
}
}
instance fwd_chan : tcp_channel("localhost:44090",req_msg)
instance rev_chan : tcp_channel("localhost:44091",req_msg)
instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
instance cl(X:client_id) : client(X,prim.req_chan,sec.req_chan,cl_chans)
instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,prim.fail_chan,cl_chans)
The service specification is the same.
object service_spec = {
relation responded(X:txid)
after init {
responded(X) := false;
}
before cl_chans.send(p : client_id, m : response.t) {
assert ~responded(m.tx);
assert m.bd = ref.eval(m.tx);
responded(m.tx) := true;
}
}
object mid_spec = {
instance queue : unbounded_queue(txid)
before fwd_chan.rcvr.recv(inp : req_msg) {
assert inp.req.bd = ref.txs(inp.req.tx);
assert inp.req.tx = queue.pop;
call ref.commit(inp.req.tx,dest.sec);
}
delegate fwd_chan_rcvr_recv[before] -> prim
relation acked(X:txid)
individual sec_failed : bool
after init {
acked(X) := false;
sec_failed := false;
}
before rev_chan.sndr.send(inp : req_msg) {
assert ref.committed(inp.req.tx);
assert ref.txs(inp.req.tx) = inp.req.bd;
assert ~acked(inp.req.tx);
acked(inp.req.tx) := true;
}
after ser.serialize(tx:txid) {
call queue.push(tx);
if sec_failed {
call ref.commit(tx)
}
}
function req_dest(X:txid) : dest.t
after ref.create(inp : request_body.t, dst:dest.t) returns (tx : txid) {
req_dest(tx) := dst
}
sec_failed flag.
after prim.fail_chan.send(m : fail_msg) {
sec_failed := true;
}
before ref.commit(tx : txid,dst:dest.t) {
assert ref.txs(tx).knd = read -> req_dest(tx) = dst;
if sec_failed {
assert dst = dst.prim
} else {
assert ref.txs(tx).knd = write -> dst = dest.sec
}
}
}
export cl.client_request
import cl.client_response
trusted isolate iso_prim = prim with cl,cl_chans,ref,ser,service_spec,mid_spec,fwd_chan,rev_chan
trusted isolate iso_sec = sec with cl,cl_chans,ref,ser,service_spec,mid_spec
interpret txid -> int
interpret key -> strbv[1]
interpret value -> bv[16]
interpret client_id -> bv[1]
Exercise hints¤
You'll need to know some things about the Ivy language to fix this problem.
First, you can write a while loop in IVy like this:
while <condition> { <action> }
Also, you can test whether a queue q is empty with q.empty. Try
using a queue to keep track of pending transaction commits and
perform them at the right moment.