include tcp
include udp
# Sort of the keys under which values are stored.  No interpretation for
# `key` is given in the visible source.
type key
# Sort of the stored values (interpreted as bv[16] at the end of the file).
type value
# Transaction identifiers.  Declared `ghost`; interpreted as int inside
# module `reference`.
ghost type txid
# Base type of request payloads; write_req.t and read_req.t are declared
# as its variants.
type request_body
# Write request payload (a variant of request_body).
#   ky : key to update
#   vl : value to store under ky (replica.exec does store(ky) := vl)
object write_req = {
variant t of request_body = struct {
ky : key,
vl : value
}
}
# Read request payload (a variant of request_body).
#   ky : key whose current value is requested
object read_req = {
variant t of request_body = struct {
ky : key
}
}
# A request as carried in messages: a payload tagged with its transaction.
#   tx : transaction id (clients obtain it from ref.create)
#   bd : the request payload (a write_req.t or read_req.t variant)
object request = {
type t = struct {
tx : txid,
bd : request_body
}
}
# Base type of response payloads; write_resp.t and read_resp.t are declared
# as its variants.
type response_body
# Response to a write request (a variant of response_body).  It has no
# fields.
object write_resp = {
variant t of response_body = struct {
}
}
# Response to a read request (a variant of response_body).
#   vl : the value read (replica.exec fills it from store(ky))
object read_resp = {
variant t of response_body = struct {
vl : value
}
}
# A response as sent to clients: a payload tagged with its transaction.
#   tx : id of the transaction being answered
#   bd : the response payload (a write_resp.t or read_resp.t variant)
object response = {
type t = struct {
tx : txid,
bd : response_body
}
}
# One copy of the key/value store, together with the action that applies a
# request to it.
module replica = {

    # Current contents of the store: the value held for each key.
    function store(K:key) : value

    # Initially every key maps to 0.
    after init {
        store(K) := 0
    }

    # Apply request `inp` to the store and return the matching response.
    #   - write_req.t : store(ky) becomes vl; result is a write_resp.t
    #   - read_req.t  : store is unchanged; result is a read_resp.t whose
    #                   vl field is store(ky)
    # For a payload that is neither variant, `out` is not assigned.
    action exec(inp : request_body) returns (out:response_body) = {
        if some (wreq:write_req.t) inp *> wreq {
            var wres : write_resp.t;
            store(wreq.ky) := wreq.vl;
            out := wres
        }
        else if some (rreq:read_req.t) inp *> rreq {
            var rres : read_resp.t;
            rres.vl := store(rreq.ky);
            out := rres
        }
    }
}
# Reference model of the service.  It keeps its own replica and records,
# for every transaction id, the request, whether it has been serialized,
# whether it has been committed, and the response computed at commit.
module reference = {

    # The store against which committed transactions are executed.
    instance rep : replica

    # create    : register a new request, returning a fresh transaction id
    # serialize : mark transaction tx as serialized
    # commit    : execute transaction tx on `rep` and record its response
    # eval      : return the recorded response of a committed transaction
    action create(inp : request_body) returns (tx : txid)
    action serialize(tx : txid)
    action commit(tx : txid)
    action eval(tx : txid) returns (res : response_body)

    # Next unused transaction id; ids in [0, next) have been created.
    var next : txid
    # Request payload of each transaction.
    function txs(X:txid) : request_body
    # Response recorded for each transaction by `commit`.
    function txres(X:txid) : response_body
    relation serialized(X:txid)
    relation committed(X:txid)

    # Start with no transactions created, serialized or committed.
    after init {
        serialized(X) := false;
        committed(X) := false;
        next := 0
    }

    # Hand out the current value of `next` as the id and remember the
    # request under that id.
    implement create {
        tx := next;
        next := next + 1;
        txs(tx) := inp
    }

    # Requires: tx has been created and is not yet serialized.
    implement serialize {
        assert 0 <= tx & tx < next;
        assert ~serialized(tx);
        serialized(tx) := true
    }
    delegate serialize

    # Requires: tx has been created, is serialized and not yet committed.
    # Runs the stored request on `rep` and saves the response in txres.
    implement commit {
        assert 0 <= tx & tx < next;
        assert serialized(tx) & ~committed(tx);
        committed(tx) := true;
        txres(tx) := rep.exec(txs(tx))
    }
    delegate commit

    # Requires: tx is committed.  Returns the response saved by `commit`.
    implement eval {
        assert committed(tx);
        res := txres(tx)
    }
    delegate eval

    interpret txid -> int
}
# The single reference model instance used by clients, nodes and specs.
instance ref : reference
# Sort of client identifiers.
type client_id
# Message carrying a request through the system.
#   cid : id of the issuing client; the nodes reply on cl_chans(cid)
#   req : the request (transaction id plus payload)
type req_msg = struct {
cid : client_id,
req : request.t
}
# A client endpoint.
#   cid       : this client's id, stamped on every message it sends
#   srvr_chan : channel on which requests are sent
#   cl_chans  : client reply channels (not referenced in this module body)
module client(cid,srvr_chan,cl_chans) = {

    # Issue request `req` to the server.
    action client_request(req : request_body)

    # Register the request with the reference model to get a transaction
    # id, wrap id and payload together with our client id, and send the
    # message on srvr_chan.
    implement client_request {
        var msg : req_msg;
        msg.cid := cid;
        msg.req.bd := req;
        msg.req.tx := ref.create(req);
        call srvr_chan.send(msg)
    }
}
# The primary node.
#   port     : port of this node's request endpoint
#   fwd_chan : channel on which write requests are forwarded
#   rev_chan : channel on which forwarded writes come back
#   cl_chans : per-client reply endpoints, indexed by client id
module primary_node(port, fwd_chan, rev_chan, cl_chans) = {

    # Local copy of the store.
    instance rep : replica

    # Endpoint on which request messages arrive.
    instance req_chan : nondup_endpoint(port,req_msg)

    # Per-key count of writes that have been forwarded on fwd_chan and have
    # not yet come back on rev_chan.
    instance counter : unbounded_sequence
    function pending(K:key) : counter.t

    after init {
        pending(K) := 0
    }

    # Handle a request message.
    #   read  : if writes are still pending for the key, send the message
    #           back to our own request endpoint to be retried later;
    #           otherwise commit, execute locally and reply to the client.
    #   write : serialize in the reference model, forward on fwd_chan,
    #           execute locally and count the write as pending.  No reply
    #           is sent from here.
    implement req_chan.recv(inp : req_msg) {
        if some (rreq:read_req.t) inp.req.bd *> rreq {
            if pending(rreq.ky) ~= 0 {
                # Cannot serve the read yet: recirculate the message.
                call req_chan.send(inp)
            } else {
                # NOTE(review): ref.commit asserts serialized(tx), but no
                # ref.serialize call is visible on this read path --
                # confirm this is intended.
                call ref.commit(inp.req.tx);
                var resp : response.t;
                resp.bd := rep.exec(rreq);
                resp.tx := inp.req.tx;
                call cl_chans(inp.cid).send(resp)
            }
        } else if some (wreq:write_req.t) inp.req.bd *> wreq {
            # Call into the reference model only (ghost).
            call ref.serialize(inp.req.tx);
            call fwd_chan.send(inp);
            # The response of the local execution is not used.
            var res := rep.exec(inp.req.bd);
            pending(wreq.ky) := pending(wreq.ky).next
        }
    }

    # A write coming back on rev_chan is no longer pending for its key.
    implement rev_chan.recv(inp : req_msg) {
        if some (wreq:write_req.t) inp.req.bd *> wreq {
            pending(wreq.ky) := pending(wreq.ky).prev
        }
    }
}
# The secondary node.
#   port     : port of this node's request endpoint
#   fwd_chan : channel on which forwarded write requests arrive
#   rev_chan : channel on which handled writes are sent back
#   cl_chans : per-client reply endpoints, indexed by client id
module secondary_node(port, fwd_chan, rev_chan, cl_chans) = {

    # Local copy of the store.
    instance rep : replica

    # Endpoint on which request messages arrive directly.
    instance req_chan : nondup_endpoint(port,req_msg)

    # Handle a request that arrives directly at this node: reads are
    # executed on the local replica and answered to the client.
    # NOTE(review): no ref.commit call is made on this path, while
    # service_spec checks every reply against ref.eval, which asserts
    # committed(tx) -- confirm this is intended.
    implement req_chan.recv(inp : req_msg) {
        if some (rr:read_req.t) inp.req.bd *> rr {
            var res : response.t;
            res.tx := inp.req.tx;
            res.bd := rep.exec(rr);
            call cl_chans(inp.cid).send(res);
        }
        # ignore writes!
    }

    # Handle a request arriving on fwd_chan: commit it in the reference
    # model, execute it on the local replica, reply to the client, then
    # send the message back on rev_chan.
    # NOTE(review): mid_spec's `before fwd_chan.rcvr.recv` monitor also
    # calls ref.commit on the same tx, and ref.commit asserts
    # ~committed(tx) -- confirm the two calls do not both execute.
    implement fwd_chan.recv(inp : req_msg) {
        call ref.commit(inp.req.tx);
        var res : response.t;
        res.tx := inp.req.tx;
        res.bd := rep.exec(inp.req.bd);
        call cl_chans(inp.cid).send(res);
        call rev_chan.send(inp);
    }
}
# Channel carrying req_msg on "localhost:44090"; prim is given its sender
# side (fwd_chan.sndr) and sec its receiver side (fwd_chan.rcvr).
instance fwd_chan : tcp_channel("localhost:44090",req_msg)
# Channel carrying req_msg on "localhost:44091"; sec is given its sender
# side (rev_chan.sndr) and prim its receiver side (rev_chan.rcvr).
instance rev_chan : tcp_channel("localhost:44091",req_msg)
# Reply endpoints carrying response.t, indexed by client_id, created with
# base value 44100 (presumably a base port number -- confirm against
# nondup_endpoint_set).
instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
# One client per client_id; each sends its requests to prim.req_chan.
instance cl(X:client_id) : client(X,prim.req_chan,cl_chans)
# The primary node, with request endpoint port 44200.
instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
# The secondary node, with request endpoint port 44201.
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,cl_chans)
# Service-level specification: every response sent to a client must carry
# exactly the payload the reference model recorded for that transaction.
object service_spec = {

    # Checked before each send on a client reply endpoint.  ref.eval itself
    # asserts that the transaction has been committed.
    before cl_chans.send(p : client_id, m : response.t) {
        var expected := ref.eval(m.tx);
        assert m.bd = expected
    }
}
# Intermediate specification relating the order of ref.serialize calls to
# the order in which messages are received on the forward channel.
object mid_spec = {
# Transaction ids, pushed in the order ref.serialize is called.
instance queue : unbounded_queue(txid)
# Record each serialized transaction.
after ref.serialize(tx:txid) {
call queue.push(tx);
}
# Before the receiver side of fwd_chan gets a message:
#   1. its payload must equal the request recorded for its transaction;
#   2. its transaction must be the next one popped from `queue`, i.e.
#      messages arrive in serialization order;
#   3. the transaction is committed in the reference model.
# NOTE(review): secondary_node's fwd_chan.recv handler also calls
# ref.commit on the same tx, and ref.commit asserts ~committed(tx) --
# confirm the two calls do not both execute.
before fwd_chan.rcvr.recv(inp : req_msg) {
assert inp.req.bd = ref.txs(inp.req.tx);
assert inp.req.tx = queue.pop;
call ref.commit(inp.req.tx);
}
# Presumably makes prim responsible for the assertions in the `before`
# monitor above -- confirm against the Ivy documentation for `delegate`.
delegate fwd_chan_rcvr_recv[before] -> prim
}
# Interface with the environment: client_request is exported and
# cl_chans.recv is imported.
export cl.client_request
import cl_chans.recv
# Isolate for the primary node, together with the clients, the client
# reply endpoints, the reference model, both specifications and both
# channels.
trusted isolate iso_prim = prim with cl,cl_chans,ref,service_spec,mid_spec,fwd_chan,rev_chan
# Isolate for the secondary node, together with the reference model and
# both specifications.
trusted isolate iso_sec = sec with ref,service_spec,mid_spec
# Interpret `value` as bv[16] (presumably 16-bit bit vectors -- confirm).
interpret value -> bv[16]