Replicating key-value store exercise
The original version of the replicating key-value store has the primary server delay any read of a key with a pending write until the ack for that write returns from the secondary. In this exercise, we will modify the server so that, instead of delaying such a read, the primary forwards it to the secondary over the forward channel.
This means we have to make several changes:
- Modify the primary to do the forwarding.
- Modify the secondary to handle read requests on the forward channel.
- Modify mid_spec, the specification of the interface between primary and secondary, to reflect this change.
What do we need to guarantee about reads in the forward channel?
Are any other changes needed to mid_spec?
There are some hints below (search for HINT) on where the code might need to be modified.
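To make the intended behavior concrete, here is a small Python simulation of the forwarding idea. It is a sketch under simplifying assumptions, not IVy code and not an answer to the mid_spec questions: the classes Primary and Secondary, the tuple message format, and the use of a deque as the FIFO forward channel are all hypothetical, and acks are delivered immediately instead of over a reverse channel.

```python
from collections import deque


class Secondary:
    """Hypothetical model: applies forwarded messages in FIFO order."""

    def __init__(self):
        self.store = {}  # keys never written read as 0

    def handle(self, msg):
        kind, key, value = msg
        if kind == "write":
            self.store[key] = value
            return None  # writes are acknowledged, not answered
        return self.store.get(key, 0)  # forwarded read: answer from local state


class Primary:
    """Hypothetical model: forwards reads of keys that have pending writes."""

    def __init__(self, fwd):
        self.store = {}
        self.pending = {}  # key -> number of unacknowledged writes
        self.fwd = fwd     # FIFO forward channel

    def write(self, key, value):
        self.fwd.append(("write", key, value))
        self.pending[key] = self.pending.get(key, 0) + 1
        self.store[key] = value

    def ack(self, key):
        self.pending[key] -= 1

    def read(self, key):
        if self.pending.get(key, 0) == 0:
            return self.store.get(key, 0)  # no pending write: answer locally
        self.fwd.append(("read", key, None))  # queued behind the pending write
        return None  # the secondary will answer


def run(prim, sec):
    """Drain the forward channel and return the secondary's read answers."""
    answers = []
    while prim.fwd:
        msg = prim.fwd.popleft()
        result = sec.handle(msg)
        if msg[0] == "write":
            prim.ack(msg[1])  # simplification: ack delivered immediately
        else:
            answers.append(result)
    return answers


prim, sec = Primary(deque()), Secondary()
prim.write("k", 7)
print(prim.read("k"))  # None: the read was forwarded, not delayed
print(run(prim, sec))  # [7]
print(prim.read("k"))  # 7: no write pending, so the primary answers
```

The sketch relies on the forward channel being FIFO: a forwarded read reaches the secondary only after the writes that were pending when it was sent.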
Basic data types
As with the simple one-process server (see repstore1.ivy), we begin by declaring the data types needed to represent transactions and the messages that carry them.
The key and value types are uninterpreted for the moment.
type key
type value
ghost type txid
type request_kind = {write,read}
object request_body = {
type t = struct {
knd : request_kind,
ky : key,
vl : value
}
}
object request = {
type t = struct {
tx : txid,
bd : request_body.t
}
}
object response_body = {
type t = struct {
vl : value
}
}
object response = {
type t = struct {
tx : txid,
bd : response_body.t
}
}
module replica = {
function store(K:key) : value
after init {
store(K) := 0
}
action exec(inp : request_body.t) returns (out:response_body.t) = {
if inp.knd = write {
store(inp.ky) := inp.vl;
}
else if inp.knd = read {
out.vl := store(inp.ky);
}
}
}
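The replica module is a deterministic state machine: a write updates store and a read returns the stored value. The Python sketch below mirrors that logic. The dataclass names echo the IVy types but are our own, and because the IVy action leaves out unassigned for writes, the sketch returns a zero response there as a simplification.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RequestBody:
    knd: str     # "write" or "read" (request_kind)
    ky: str      # key
    vl: int = 0  # value, used only by writes


@dataclass(frozen=True)
class ResponseBody:
    vl: int = 0


class Replica:
    def __init__(self):
        self.store = {}  # unwritten keys read as 0, like `store(K) := 0`

    def exec(self, inp: RequestBody) -> ResponseBody:
        if inp.knd == "write":
            self.store[inp.ky] = inp.vl
            return ResponseBody()  # simplification: IVy leaves `out` unassigned
        if inp.knd == "read":
            return ResponseBody(self.store.get(inp.ky, 0))
        raise ValueError(f"unknown request kind {inp.knd!r}")
```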
The type dest.t has two values, one for the primary and one for the secondary.
object dest = {
ghost type t = {prim,sec}
}
Our reference model is the same as before.
object ref = {
action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
action commit(tx : txid,dst:dest.t)
action eval(tx : txid) returns (res : response_body.t)
instance rep : replica
var next : txid
function txs(X:txid) : request_body.t
function txres(X:txid) : response_body.t
relation committed(X:txid)
after init {
next := 0;
committed(X) := false;
}
implement create {
tx := next;
txs(tx) := inp;
next := next + 1;
}
implement commit {
assert 0 <= tx & tx < next;
assert ~committed(tx);
txres(tx) := rep.exec(txs(tx));
committed(tx) := true;
}
delegate commit
implement eval {
assert committed(tx);
res := txres(tx);
}
delegate eval
interpret txid -> int
}
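The essential property of the reference model is that a transaction takes effect when it is committed, not when it is created, and eval only returns results of committed transactions. The following Python sketch mirrors create, commit and eval, assuming requests are (kind, key, value) tuples. The class name RefModel is hypothetical, writes are given a result of 0 as a simplification, and dst is accepted but unused, as in the IVy model.

```python
class RefModel:
    """Hypothetical Python model of `ref`: results are fixed at commit time."""

    def __init__(self):
        self.next = 0           # next fresh transaction id
        self.txs = {}           # tx -> (kind, key, value)
        self.txres = {}         # tx -> result recorded at commit
        self.committed = set()
        self.store = {}         # state of the reference replica

    def create(self, inp, dst):
        tx = self.next
        self.txs[tx] = inp
        self.next += 1
        return tx

    def commit(self, tx, dst):
        assert 0 <= tx < self.next, "commit of an unknown transaction"
        assert tx not in self.committed, "transaction committed twice"
        kind, key, value = self.txs[tx]
        if kind == "write":
            self.store[key] = value
            self.txres[tx] = 0  # simplification: writes return 0
        else:
            self.txres[tx] = self.store.get(key, 0)
        self.committed.add(tx)

    def eval(self, tx):
        assert tx in self.committed, "eval of an uncommitted transaction"
        return self.txres[tx]


ref = RefModel()
w = ref.create(("write", "k", 3), "prim")
r = ref.create(("read", "k", 0), "sec")
ref.commit(r, "sec")  # the read commits first...
ref.commit(w, "sec")
print(ref.eval(r))    # 0: ...so it does not see the write
```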
We also define an object ser with a ghost action serialize that indicates when a write transaction is serialized.
object ser = {
action serialize(tx : txid)
relation serialized(X:txid)
after init {
serialized(X) := false;
}
implement serialize {
assert 0 <= tx & tx < ref.next;
assert ~serialized(tx);
assert ref.txs(tx).knd = write;
serialized(tx) := true;
}
delegate serialize
before ref.commit(tx : txid,dst:dest.t) {
assert ref.txs(tx).knd = write -> serialized(tx);
}
}
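The ser object adds two checks: each write transaction is serialized at most once, and a write may be committed only after it has been serialized. A Python sketch of these checks follows, with Python's assert standing in for IVy's. The class and method names are hypothetical, and the transaction table is passed in rather than shared with ref.

```python
class Serializer:
    """Hypothetical Python model of the `ser` object."""

    def __init__(self, txs):
        self.txs = txs  # tx -> (kind, key, value), as in ref.txs
        self.serialized = set()

    def serialize(self, tx):
        assert tx in self.txs, "unknown transaction"  # 0 <= tx < ref.next
        assert tx not in self.serialized, "serialized twice"
        assert self.txs[tx][0] == "write", "only writes are serialized"
        self.serialized.add(tx)

    def before_commit(self, tx):
        # Mirrors `before ref.commit`: a write must be serialized first.
        assert self.txs[tx][0] != "write" or tx in self.serialized, \
            "write committed before being serialized"
```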
Now we are ready to define our system implementation, consisting of client endpoints, the primary server, and the secondary server.
Notice that we include the tcp module here, since we will need it for the ordered channels between the servers.
include tcp
include udp
type client_id
type req_msg = struct {
cid : client_id,
req : request.t
}
module client(cid,prim_chan,sec_chan,cl_chans) = {
action client_request(req : request_body.t, the_dst: dest.t)
action client_response(req : response_body.t, tx : txid)
The client sends each request to the server selected by the_dst.
implement client_request {
var m : req_msg;
m.cid := cid;
m.req.tx := ref.create(req,the_dst);
m.req.bd := req;
if the_dst = dest.prim {
call prim_chan.send(m);
} else {
call sec_chan.send(m);
}
}
When a response arrives on the client's channel, the client passes it to the client_response callback. The transaction id parameter is "ghost" and is only used for specification.
implement cl_chans.recv(resp : response.t) {
call client_response(resp.bd,resp.tx)
}
}
module primary_node(port, fwd_chan, rev_chan, cl_chans) = {
instance rep : replica
instance req_chan : nondup_endpoint(port,req_msg)
instance counter : unbounded_sequence
function pending(K:key) : counter.t
after init {
pending(K) := 0;
}
implement req_chan.recv(inp : req_msg) {
var rr := inp.req.bd;
if rr.knd = read {
if pending(rr.ky) = 0 {
call ref.commit(inp.req.tx,dest.prim);
var res : response.t;
res.tx := inp.req.tx;
res.bd := rep.exec(rr);
call cl_chans(inp.cid).send(res)
} else {
call req_chan.send(inp); # if cannot execute, recirculate (HINT)
}
} else if rr.knd = write {
call ser.serialize(inp.req.tx); # this is ghost!
call fwd_chan.send(inp);
pending(rr.ky) := pending(rr.ky).next;
var res := rep.exec(inp.req.bd);
}
}
implement rev_chan.recv(inp : req_msg) {
var rr := inp.req.bd;
if rr.knd = write {
pending(rr.ky) := pending(rr.ky).prev;
}
}
}
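The primary tracks pending writes with a per-key counter rather than a boolean, because several writes to the same key can be in flight at once. Each ack on the reverse channel decrements the count, and only when it returns to zero may the primary answer reads of that key locally. A Python sketch of this bookkeeping follows; the class name is hypothetical, and the check against acking a key with no pending write is our own addition (the IVy code uses an unbounded_sequence with next and prev).

```python
from collections import Counter


class PendingWrites:
    """Hypothetical model of the primary's `pending` function."""

    def __init__(self):
        self.count = Counter()  # key -> writes forwarded but not yet acked

    def forwarded(self, key):
        self.count[key] += 1  # like pending(K) := pending(K).next

    def acked(self, key):
        assert self.count[key] > 0, "ack without a pending write"  # our addition
        self.count[key] -= 1  # like pending(K) := pending(K).prev

    def is_pending(self, key):
        return self.count[key] > 0
```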
module secondary_node(port, fwd_chan, rev_chan, cl_chans) = {
instance rep : replica
instance req_chan : nondup_endpoint(port,req_msg)
implement req_chan.recv(inp : req_msg) {
var rr := inp.req.bd;
if rr.knd = read {
var res : response.t;
res.tx := inp.req.tx;
res.bd := rep.exec(rr);
call cl_chans(inp.cid).send(res);
}
}
implement fwd_chan.recv(inp : req_msg) {
var res : response.t;
res.tx := inp.req.tx;
res.bd := rep.exec(inp.req.bd);
call cl_chans(inp.cid).send(res);
call rev_chan.send(inp);
}
}
Now we create two servers, a primary and a secondary, and an array of client endpoints, and connect them with network channels.
We have two TCP (ordered) channels between the primary and secondary, one forward (listening on port 44090) and one reverse (listening on port 44091). Both carry request messages.
instance fwd_chan : tcp_channel("localhost:44090",req_msg)
instance rev_chan : tcp_channel("localhost:44091",req_msg)
The client endpoints form an endpoint set cl_chans indexed by client_id. The endpoints use a range of port numbers beginning with 44100.
instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
instance cl(X:client_id) : client(X,prim.req_chan,sec.req_chan,cl_chans)
instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,cl_chans)
The service specification is unchanged from the simple sequential server.
object service_spec = {
relation responded(X:txid)
after init {
responded(X) := false;
}
before cl_chans.send(p : client_id, m : response.t) {
assert ~responded(m.tx);
assert m.bd = ref.eval(m.tx);
responded(m.tx) := true;
}
}
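service_spec is a monitor on responses: every transaction gets at most one response, and its body must equal what the reference model computed at commit time. A Python sketch of the monitor follows; the class name is hypothetical, and ref.eval is replaced by a plain callable passed in at construction.

```python
class ServiceSpec:
    """Hypothetical Python model of the `service_spec` monitor."""

    def __init__(self, ref_eval):
        self.ref_eval = ref_eval  # tx -> expected response value
        self.responded = set()

    def before_send(self, tx, value):
        assert tx not in self.responded, "duplicate response"
        assert value == self.ref_eval(tx), "response disagrees with reference"
        self.responded.add(tx)
```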
The interface between the primary and the secondary is specified by the object mid_spec. The state of this specification is a FIFO queue of transactions that have been serialized but not yet transmitted over the interface. When a transaction is serialized, we insert it in this queue.
object mid_spec = {
instance queue : unbounded_queue(txid)
after ser.serialize(tx:txid) {
call queue.push(tx);
}
before fwd_chan.rcvr.recv(inp : req_msg) {
assert inp.req.bd = ref.txs(inp.req.tx);
assert inp.req.tx = queue.pop;
call ref.commit(inp.req.tx,dest.sec);
}
We delegate the assertions in the before specification of fwd_chan.rcvr.recv to the primary. This means that the assertions will be verified when we test the primary. Usually, IVy's default assignment of guarantees to objects is what we want, but in this case the caller is not the guarantor of the assertions.
delegate fwd_chan_rcvr_recv[before] -> prim
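The queue in mid_spec requires transactions to arrive at the secondary in the order in which the primary serialized them. A Python sketch of this check follows, assuming a deque in place of unbounded_queue; the class and method names are hypothetical, and, as in IVy, the check runs before the secondary's recv handler.

```python
from collections import deque


class ForwardOrderSpec:
    """Hypothetical model of the FIFO check in mid_spec."""

    def __init__(self):
        self.queue = deque()  # serialized, not yet received by the secondary

    def after_serialize(self, tx):
        self.queue.append(tx)

    def before_fwd_recv(self, tx):
        # Like `assert inp.req.tx = queue.pop`: tx must be the oldest entry.
        assert self.queue and self.queue[0] == tx, "out of serialization order"
        self.queue.popleft()
```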
We also keep track of the set acked of acknowledged transactions.
relation acked(X:txid)
after init {
acked(X) := false;
}
before rev_chan.sndr.send(inp : req_msg) {
assert ref.committed(inp.req.tx);
assert ref.txs(inp.req.tx) = inp.req.bd;
assert ~acked(inp.req.tx);
acked(inp.req.tx) := true;
}
HINT: what happens to the request destination? When does it need to change?
function req_dest(X:txid) : dest.t
after ref.create(inp : request_body.t, dst:dest.t) returns (tx : txid) {
req_dest(tx) := dst
}
before ref.commit(tx : txid,dst:dest.t) {
assert ref.txs(tx).knd = read -> req_dest(tx) = dst;
assert ref.txs(tx).knd = write -> dst = dest.sec
}
before sec.req_chan.recv(inp : req_msg) {
if inp.req.bd.knd = read {
call ref.commit(inp.req.tx,dest.sec)
}
}
}
export cl.client_request
import cl.client_response
We verify the two servers in separate isolates. Since mid_spec is stated at the secondary end of the channels fwd_chan and rev_chan, those channels are used in the primary's isolate, but not in the secondary's. Notice also that the client endpoints cl are present in both isolates. Since they are trivial "stubs", there is no reason to separate them into their own isolate.
trusted isolate iso_prim = prim with cl,cl_chans,ref,ser,service_spec,mid_spec,fwd_chan,rev_chan
trusted isolate iso_sec = sec with cl,cl_chans,ref,ser,service_spec,mid_spec
interpret txid -> int
interpret key -> strbv[1]
interpret value -> bv[16]
interpret client_id -> bv[1]
To test the primary, use these commands:
$ ivy_to_cpp target=test isolate=iso_prim build=true repstore2.ivy
$ ./repstore2 iters=100 runs=10 out=file.iev
This tests the primary server. Look at the trace file file.iev
and notice which actions are generated by the tester and which by
the code.
To test the secondary, use these commands:
$ ivy_to_cpp target=test isolate=iso_sec build=true repstore2.ivy
$ ./repstore2 iters=100 runs=10 out=file.iev
Again, see what actions the tester is generating. Try putting some errors in this example and see whether they produce assertion failures. If not, why not?