Replicating key-value store
Now we will consider a distributed implementation of the key/value store. This version is replicating: we will have two server processes, a primary and a secondary, each keeping a complete replica of the store. All write requests go first to the primary and are then passed along to the secondary. Read requests, on the other hand, may be sent to either server. We will assume that the channel between the primary and secondary is ordered.
Now we face a question: since transactions happen in parallel, and no single server sees all the transactions, how do we define commit points? At first we might imagine committing the write transactions when they execute on the primary, and the read transactions on whichever server they arrive at. Clearly, however, this won't work. Suppose a write has been committed on the primary and is on its way to the secondary. If the secondary now commits a read for the same key, it will return a stale value for that read. The situation is reversed if we commit writes when they execute at the secondary, since a read on the primary could return a "future" value.
In fact, there is no easy way out of this dilemma. If we don't add some additional concurrency control, we will necessarily violate linearizability. To fix the problem, we will add a reverse channel from the secondary to the primary to acknowledge when writes have been committed on the secondary. If there is a pending, unacknowledged write on the primary, the primary must wait to serve any reads for that key until the write is acknowledged.
The key property we must then maintain on the channel is that writes arrive in the same order in which they are executed on the primary. This does not mean that forwarding of a write cannot occur before its actual execution on the primary; it just means the order must be preserved. We will call this the serialization order. Thus, writes are serialized on the primary (meaning their order is determined) but committed on the secondary.
Using this specification, we can test the primary and secondary servers in isolation, and if they are correct according to their assume/guarantee specifications, we can conclude that together they provide a correct service according to our reference object.
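For intuition, the protocol just described can be modeled as a small Python sketch. This is purely illustrative and not part of the IVy development: the class and method names (`Primary`, `Secondary`, `write`, `read`, `ack`, `step`) are hypothetical, and the sketch abstracts away clients, transaction ids, and message formats. It shows the pending-write counter blocking reads on the primary until the secondary acknowledges.

```python
from collections import deque

# Hypothetical model of the replicated store (illustrative only).
class Primary:
    def __init__(self):
        self.store = {}            # this replica of the key/value store
        self.pending = {}          # key -> count of unacknowledged writes
        self.fwd = deque()         # ordered (FIFO) channel to the secondary

    def write(self, k, v):
        self.store[k] = v                         # execute and serialize here
        self.pending[k] = self.pending.get(k, 0) + 1
        self.fwd.append((k, v))                   # forward in serialization order

    def read(self, k):
        # Reads must wait while a write to k is pending: the write's commit
        # point is on the secondary, so answering now could leak a "future"
        # value and violate linearizability.
        if self.pending.get(k, 0) > 0:
            return None                           # caller should retry
        return self.store.get(k, 0)

    def ack(self, k):
        # reverse channel: the secondary acknowledges a committed write
        self.pending[k] -= 1


class Secondary:
    def __init__(self, primary):
        self.store = {}
        self.primary = primary

    def step(self):
        # commit the oldest forwarded write, then acknowledge it
        k, v = self.primary.fwd.popleft()
        self.store[k] = v
        self.primary.ack(k)

    def read(self, k):
        return self.store.get(k, 0)
```

In this sketch, a read of a key with a pending write simply returns `None` (retry); the IVy primary below instead recirculates the request message to itself.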
Basic data types
As with the simple one-process server (see repstore1.ivy), we begin by declaring the data types needed to represent transactions and the messages that carry them.
The key and value types are uninterpreted for the moment.
type key
type value
ghost type txid
type request_kind = {write,read}
object request_body = {
    type t = struct {
        knd : request_kind,
        ky : key,
        vl : value
    }
}

object request = {
    type t = struct {
        tx : txid,
        bd : request_body.t
    }
}
object response_body = {
    type t = struct {
        vl : value
    }
}

object response = {
    type t = struct {
        tx : txid,
        bd : response_body.t
    }
}
module replica = {
    function store(K:key) : value

    after init {
        store(K) := 0
    }

    action exec(inp : request_body.t) returns (out:response_body.t) = {
        if inp.knd = write {
            store(inp.ky) := inp.vl;
        }
        else if inp.knd = read {
            out.vl := store(inp.ky);
        }
    }
}
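For intuition, a replica behaves like a total map from keys to values in which every key initially maps to 0. A hypothetical Python analogue (illustrative only; the function names are not from the IVy source) might look like this:

```python
# Hypothetical Python analogue of the replica module (illustrative only).
def make_replica():
    store = {}                         # missing keys read as 0, as after init

    def exec_(kind, key, value=None):
        if kind == "write":
            store[key] = value         # a write updates the store
        elif kind == "read":
            return store.get(key, 0)   # a read returns the current value

    return exec_
```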
The destination type dest.t has two values, one for the primary and one for the secondary.
object dest = {
    ghost type t = {prim,sec}
}
Our reference model is the same as before.
object ref = {
    action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
    action commit(tx : txid,dst:dest.t)
    action eval(tx : txid) returns (res : response_body.t)

    instance rep : replica

    var next : txid
    function txs(X:txid) : request_body.t
    function txres(X:txid) : response_body.t
    relation committed(X:txid)

    after init {
        next := 0;
        committed(X) := false;
    }

    implement create {
        tx := next;
        txs(tx) := inp;
        next := next + 1;
    }

    implement commit {
        assert 0 <= tx & tx < next;
        assert ~committed(tx);
        txres(tx) := rep.exec(txs(tx));
        committed(tx) := true;
    }
    delegate commit

    implement eval {
        assert committed(tx);
        res := txres(tx);
    }
    delegate eval

    interpret txid -> int
}
We also add a ghost object ser with an action serialize that indicates when a write transaction is serialized.
object ser = {
    action serialize(tx : txid)

    relation serialized(X:txid)

    after init {
        serialized(X) := false;
    }

    implement serialize {
        assert 0 <= tx & tx < ref.next;
        assert ~serialized(tx);
        assert ref.txs(tx).knd = write;
        serialized(tx) := true;
    }
    delegate serialize

    before ref.commit(tx : txid,dst:dest.t) {
        assert ref.txs(tx).knd = write -> serialized(tx);
    }
}
Now we are ready to define our system implementation, consisting of client endpoints, the primary server and the secondary server.
Notice we include the tcp module here, since we will need it for
the ordered channels between the servers.
include tcp
include udp
type client_id
type req_msg = struct {
    cid : client_id,
    req : request.t
}
module client(cid,prim_chan,sec_chan,cl_chans) = {

    action client_request(req : request_body.t, the_dst: dest.t)
    action client_response(req : response_body.t, tx : txid)
A request is sent to the server indicated by the parameter the_dst.
    implement client_request {
        var m : req_msg;
        m.cid := cid;
        m.req.tx := ref.create(req,the_dst);
        m.req.bd := req;
        if the_dst = dest.prim {
            call prim_chan.send(m);
        } else {
            call sec_chan.send(m);
        }
    }
Responses arriving on the client's channel are passed to the client_response callback. The transaction id parameter is "ghost" and is only used for specification.
    implement cl_chans.recv(resp : response.t) {
        call client_response(resp.bd,resp.tx)
    }
}
module primary_node(port, fwd_chan, rev_chan, cl_chans) = {

    instance rep : replica
    instance req_chan : nondup_endpoint(port,req_msg)
    instance counter : unbounded_sequence

    function pending(K:key) : counter.t

    after init {
        pending(K) := 0;
    }

    implement req_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = read {
            if pending(rr.ky) = 0 {
                call ref.commit(inp.req.tx,dest.prim);
                var res : response.t;
                res.tx := inp.req.tx;
                res.bd := rep.exec(rr);
                call cl_chans(inp.cid).send(res)
            } else {
                call req_chan.send(inp);  # if cannot execute, recirculate
            }
        } else if rr.knd = write {
            call ser.serialize(inp.req.tx);  # this is ghost!
            call fwd_chan.send(inp);
            pending(rr.ky) := pending(rr.ky).next;
            var res := rep.exec(inp.req.bd);
        }
    }

    implement rev_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = write {
            pending(rr.ky) := pending(rr.ky).prev;
        }
    }
}
module secondary_node(port, fwd_chan, rev_chan, cl_chans) = {

    instance rep : replica
    instance req_chan : nondup_endpoint(port,req_msg)

    implement req_chan.recv(inp : req_msg) {
        var rr := inp.req.bd;
        if rr.knd = read {
            var res : response.t;
            res.tx := inp.req.tx;
            res.bd := rep.exec(rr);
            call cl_chans(inp.cid).send(res);
        }
    }

    implement fwd_chan.recv(inp : req_msg) {
        var res : response.t;
        res.tx := inp.req.tx;
        res.bd := rep.exec(inp.req.bd);
        call cl_chans(inp.cid).send(res);
        call rev_chan.send(inp);
    }
}
Now we instantiate two servers, a primary and a secondary, and an array of client endpoints, connecting them up with network channels.
We have two TCP (ordered) channels between the primary and secondary, one forward (listening on port 44090) and one reverse (listening on port 44091). Both carry request messages.
instance fwd_chan : tcp_channel("localhost:44090",req_msg)
instance rev_chan : tcp_channel("localhost:44091",req_msg)
The clients receive responses on the endpoint set cl_chans. The endpoints will use a range of port numbers beginning with 44100.
instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
instance cl(X:client_id) : client(X,prim.req_chan,sec.req_chan,cl_chans)
instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,cl_chans)
The service specification is unchanged from the simple sequential server.
object service_spec = {
    relation responded(X:txid)

    after init {
        responded(X) := false;
    }

    before cl_chans.send(p : client_id, m : response.t) {
        assert ~responded(m.tx);
        assert m.bd = ref.eval(m.tx);
        responded(m.tx) := true;
    }
}
The interface between the primary and the secondary is specified by the object mid_spec. The state of this interface keeps a FIFO queue of messages that have been serialized but not yet transmitted over the interface. When a message is serialized, we insert it in this queue.
object mid_spec = {
    instance queue : unbounded_queue(txid)

    after ser.serialize(tx:txid) {
        call queue.push(tx);
    }

    before fwd_chan.rcvr.recv(inp : req_msg) {
        assert inp.req.bd = ref.txs(inp.req.tx);
        assert inp.req.tx = queue.pop;
        call ref.commit(inp.req.tx,dest.sec);
    }
We delegate the guarantee of the assertions on fwd_chan.rcvr.recv to the primary, the sender on this channel. This means that the assertions will be verified when we test the primary. Usually, IVy's default assignment of guarantees to objects is what we want, but in this case the caller is not the guarantor of the assertions.
    delegate fwd_chan_rcvr_recv[before] -> prim
We keep track of the set acked of acknowledged transactions.
    relation acked(X:txid)

    after init {
        acked(X) := false;
    }

    before rev_chan.sndr.send(inp : req_msg) {
        assert ref.committed(inp.req.tx);
        assert ref.txs(inp.req.tx) = inp.req.bd;
        assert ~acked(inp.req.tx);
        acked(inp.req.tx) := true;
    }

    function req_dest(X:txid) : dest.t

    after ref.create(inp : request_body.t, dst:dest.t) returns (tx : txid) {
        req_dest(tx) := dst
    }

    before ref.commit(tx : txid,dst:dest.t) {
        assert ref.txs(tx).knd = read -> req_dest(tx) = dst;
        assert ref.txs(tx).knd = write -> dst = dest.sec
    }

    before sec.req_chan.recv(inp : req_msg) {
        if inp.req.bd.knd = read {
            call ref.commit(inp.req.tx,dest.sec)
        }
    }
}
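The FIFO discipline that mid_spec imposes on the forward channel can be pictured with a small Python sketch (hypothetical, for intuition only; the class name `MidSpec` is illustrative): serialization pushes a transaction id onto a queue, and the secondary must receive ids in exactly that order.

```python
from collections import deque

# Hypothetical sketch of the FIFO discipline in mid_spec (illustrative only).
class MidSpec:
    def __init__(self):
        self.queue = deque()       # serialized but not yet delivered

    def serialize(self, tx):
        self.queue.append(tx)      # corresponds to "after ser.serialize"

    def recv(self, tx):
        # Corresponds to "before fwd_chan.rcvr.recv": the received
        # transaction must be the oldest undelivered one.
        assert self.queue and self.queue[0] == tx, "out-of-order delivery"
        self.queue.popleft()
```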
export cl.client_request
import cl.client_response
Because we cut the interface specification mid_spec at the secondary end of the channels fwd_chan and rev_chan, those channels are used in the primary's isolate, but not in the secondary's. Notice also that the client endpoints cl are present in both isolates. Since they are trivial "stubs", there is no reason to separate them into their own isolate.
trusted isolate iso_prim = prim with cl,cl_chans,ref,ser,service_spec,mid_spec,fwd_chan,rev_chan
trusted isolate iso_sec = sec with cl,cl_chans,ref,ser,service_spec,mid_spec
interpret txid -> int
interpret key -> strbv[1]
interpret value -> bv[16]
interpret client_id -> bv[1]
$ ivy_to_cpp target=test isolate=iso_prim build=true repstore2.ivy
$ ./repstore2 iters=100 runs=10 out=file.iev
This tests the primary server. Look at the trace file file.iev
and notice which actions are generated by the tester and which by
the code.
To test the secondary, use these commands:
$ ivy_to_cpp target=test isolate=iso_sec build=true repstore2.ivy
$ ./repstore2 iters=100 runs=10 out=file.iev
Again, see what actions the tester is generating. Try putting some errors in this example and see if they produce assertion failures. If not, why not?