# Repstore variant

include tcp
include udp

# Sorts of the keys and values held by the store. `value` is given a
# concrete interpretation (bv[16]) at the end of this file.
type key
type value
# Transaction identifiers; declared ghost. Interpreted as int inside
# module `reference`.
ghost type txid

# Base type of request payloads. The concrete payload shapes are declared
# as variants of it: write_req.t and read_req.t.
type request_body

# Write request payload (a variant of request_body): the key to update
# and the value to store under it.
object write_req = {
    variant t of request_body = struct {
        ky : key,
        vl : value
    }
}

# Read request payload (a variant of request_body): the key to look up.
object read_req = {
    variant t of request_body = struct {
        ky : key
    }
}

# A complete request: the transaction id assigned to it together with
# its payload.
object request = {
    type t = struct {
        tx : txid,
        bd : request_body
    }
}

# Base type of response payloads. The concrete payload shapes are declared
# as variants of it: write_resp.t and read_resp.t.
type response_body

# Response payload for a write (a variant of response_body). It carries
# no fields.
object write_resp = {
    variant t of response_body = struct {
    }
}

# Response payload for a read (a variant of response_body): the value
# found in the store for the requested key.
object read_resp = {
    variant t of response_body = struct {
        vl : value
    }
}

# A complete response: the transaction id of the request being answered
# together with the response payload.
object response = {
    type t = struct {
        tx : txid,
        bd : response_body
    }
}

# One copy of the key/value store, plus the action that applies a single
# request to it.
module replica = {
    # Store contents: the value currently held under each key.
    function store(K:key) : value

    # Every key initially maps to 0.
    after init {
        store(K) := 0
    }

    # Apply request `inp` to the store and return the matching response.
    #   - write_req.t: updates the store, returns an (empty) write_resp.t
    #   - read_req.t:  returns a read_resp.t holding the stored value
    # `out` is not assigned when `inp` matches neither variant.
    action exec(inp : request_body) returns (out:response_body) = {
        if some (wreq:write_req.t) inp *> wreq {
            var ack : write_resp.t;
            store(wreq.ky) := wreq.vl;
            out := ack
        }
        else if some (rreq:read_req.t) inp *> rreq {
            var reply : read_resp.t;
            reply.vl := store(rreq.ky);
            out := reply
        }
    }
}


# Reference model of the service. It records every created transaction,
# whether it has been serialized and committed, and the result obtained
# by executing it on a single replica at commit time.
module reference = {
    # The replica on which committed transactions are executed.
    instance rep : replica


    # create:    register a new request and return its transaction id
    # serialize: mark a transaction as serialized
    # commit:    execute a serialized transaction and record its result
    # eval:      return the recorded result of a committed transaction
    action create(inp : request_body) returns (tx : txid)
    action serialize(tx : txid)
    action commit(tx : txid)
    action eval(tx : txid) returns (res : response_body)

    # Next transaction id to hand out.
    individual next : txid
    # Request payload and recorded result of each transaction.
    function txs(X:txid) : request_body
    function txres(X:txid) : response_body
    # Per-transaction status flags.
    relation serialized(X:txid)
    relation committed(X:txid)

    # Initially no transaction is serialized or committed and ids start
    # at 0.
    after init {
        serialized(X) := false;
        committed(X) := false;
        next := 0;
    }

    # Record the payload under the current value of `next`, return that
    # id, then advance `next`.
    implement create {
        txs(next) := inp;
        tx := next;
        next := next + 1;
    }

    # Requires an already-created id that has not been serialized yet.
    implement serialize {
        assert 0 <= tx & tx < next;
        assert ~serialized(tx);
        serialized(tx) := true;
    }
    delegate serialize

    # Requires an already-created id that is serialized and not yet
    # committed. Executes the stored request on `rep` and saves the
    # result in `txres`.
    implement commit {
        assert 0 <= tx & tx < next;
        assert serialized(tx) & ~committed(tx);
        txres(tx) := rep.exec(txs(tx));
        committed(tx) := true;
    }
    delegate commit

    # Requires a committed transaction; returns its saved result.
    implement eval {
        assert committed(tx);
        res := txres(tx);
    }
    delegate eval

    # Transaction ids are integers, which gives meaning to `0`, `+` and
    # `<` above.
    interpret txid -> int
}

# The single reference-model instance. The client and node modules and the
# specification objects below call its actions as `ref.*`.
instance ref : reference

# Identifies a client; used to index client instances and to select the
# response channel (`cl_chans(cid)`).
type client_id

# Message carrying a request between processes: the originating client's
# id and the request itself.
type req_msg = struct {
    cid : client_id,
    req : request.t
}

# A client process.
#   cid       - this client's id, stamped on every outgoing message
#   srvr_chan - channel on which requests are sent to the server
#   cl_chans  - client response channels (not referenced in this module)
module client(cid,srvr_chan,cl_chans) = {

    # Submit request payload `req` to the service.
    action client_request(req : request_body)

    # Build a req_msg for `req` and send it on `srvr_chan`. The
    # transaction id is obtained by registering the request with the
    # reference model via ref.create.
    implement client_request {
        var msg : req_msg;
        msg.cid := cid;
        msg.req.bd := req;
        msg.req.tx := ref.create(req);
        call srvr_chan.send(msg);
    }
}

# The primary server node.
#   port     - port of this node's request endpoint
#   fwd_chan - channel on which writes are forwarded
#   rev_chan - channel on which forwarded writes come back
#   cl_chans - response channels, indexed by client id
module primary_node(port, fwd_chan, rev_chan, cl_chans) = {
    # Local copy of the store.
    instance rep : replica

    # Endpoint on which client requests arrive.
    instance req_chan : nondup_endpoint(port,req_msg)

    # pending(K): per-key counter, advanced (.next) when a write to K is
    # forwarded and stepped back (.prev) when that write returns on
    # rev_chan.
    instance counter : unbounded_sequence
    function pending(K:key) : counter.t

    after init {
        pending(K) := 0;
    }

    # Handle a client request.
    #   read:  if the key's pending counter is non-zero, put the message
    #          back on req_chan; otherwise commit, execute locally and
    #          reply to the client.
    #   write: serialize (ghost), forward on fwd_chan, bump the key's
    #          pending counter and apply the write to the local replica.
    # NOTE(review): the read path calls ref.commit without a preceding
    # ref.serialize, while reference.commit asserts serialized(tx) --
    # confirm this is intended.
    implement req_chan.recv(inp : req_msg) {
        if some (rd:read_req.t) inp.req.bd *> rd {
            if pending(rd.ky) ~= 0 {
                call req_chan.send(inp);  # if cannot execute, recirculate
            } else {
                call ref.commit(inp.req.tx);
                var resp : response.t;
                resp.tx := inp.req.tx;
                resp.bd := rep.exec(rd);
                call cl_chans(inp.cid).send(resp)
            }
        } else if some (wreq:write_req.t) inp.req.bd *> wreq {
            call ref.serialize(inp.req.tx);           # this is ghost!
            call fwd_chan.send(inp);
            pending(wreq.ky) := pending(wreq.ky).next;
            # The response is not sent from here; only the store update
            # matters.
            var ack := rep.exec(inp.req.bd);
        }
    }

    # A forwarded write has come back: step the key's pending counter
    # back. Non-write messages are ignored.
    implement rev_chan.recv(inp : req_msg) {
        if some (wreq:write_req.t) inp.req.bd *> wreq {
            pending(wreq.ky) := pending(wreq.ky).prev;
        }
    }
}

# The secondary server node.
#   port     - port of this node's request endpoint
#   fwd_chan - channel on which forwarded requests are received
#   rev_chan - channel on which handled requests are sent back
#   cl_chans - response channels, indexed by client id
module secondary_node(port, fwd_chan, rev_chan, cl_chans) = {
    # Local copy of the store.
    instance rep : replica

    # Endpoint on which client requests arrive.
    instance req_chan : nondup_endpoint(port,req_msg)

    # Handle a request sent directly to this node: a read is executed on
    # the local replica and answered to the client.
    implement req_chan.recv(inp : req_msg) {
        if some (rr:read_req.t) inp.req.bd *> rr {
            var res : response.t;
            res.tx := inp.req.tx;
            res.bd := rep.exec(rr);
            call cl_chans(inp.cid).send(res);
        }
        # ignore writes!
    }

    # Handle a request arriving on fwd_chan: commit it in the reference
    # model, execute it on the local replica, reply to the client, then
    # send the message back on rev_chan.
    implement fwd_chan.recv(inp : req_msg) {
        call ref.commit(inp.req.tx);
        var res : response.t;
        res.tx := inp.req.tx;
        res.bd := rep.exec(inp.req.bd);
        call cl_chans(inp.cid).send(res);
        call rev_chan.send(inp);
    }

}


# TCP channels between the two server nodes, both carrying req_msg:
# fwd_chan on localhost:44090 and rev_chan on localhost:44091.
instance fwd_chan : tcp_channel("localhost:44090",req_msg)
instance rev_chan : tcp_channel("localhost:44091",req_msg)

# Response endpoints indexed by client_id (base port argument 44100),
# carrying response.t.
instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
# One client per client_id; clients send their requests to the primary's
# request endpoint.
instance cl(X:client_id) : client(X,prim.req_chan,cl_chans)
# The primary sends on fwd_chan and receives on rev_chan; the secondary
# receives on fwd_chan and sends on rev_chan.
instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,cl_chans)

# Service-level specification: every response sent to a client must carry
# exactly the result the reference model recorded for that transaction.
object service_spec = {
    # Checked before each send on a client channel. ref.eval itself
    # asserts that the transaction has been committed.
    before cl_chans.send(p : client_id, m : response.t) {
        assert m.bd = ref.eval(m.tx);
    }
}

# Specification of the forward channel between the nodes: requests are
# received on fwd_chan in the order in which they were serialized.
object mid_spec = {
    # Transaction ids in serialization order.
    instance queue : unbounded_queue(txid)

    # Record each serialized transaction at the back of the queue.
    after ref.serialize(tx:txid) {
    call queue.push(tx);
    }

    # Before a message is received from fwd_chan: its payload must match
    # what the reference model stored for its transaction id, and that id
    # must be the one popped from the queue. The transaction is then
    # committed in the reference model.
    # NOTE(review): secondary_node's fwd_chan.recv also calls ref.commit
    # on the same tx, and reference.commit asserts ~committed(tx) --
    # confirm the double call is intended.
    before fwd_chan.rcvr.recv(inp : req_msg) {
    assert inp.req.bd = ref.txs(inp.req.tx);
    assert inp.req.tx = queue.pop;
    call ref.commit(inp.req.tx);
    }

    # Assign the `before` monitor above to `prim`.
    delegate fwd_chan_rcvr_recv[before] -> prim

}

# Interface with the environment: client_request is exported and
# cl_chans.recv is imported.
export cl.client_request
import cl_chans.recv

# Isolates (declared trusted) for the two server nodes, each listed with
# the objects it is considered together with.
trusted isolate iso_prim = prim with cl,cl_chans,ref,service_spec,mid_spec,fwd_chan,rev_chan
trusted isolate iso_sec = sec with ref,service_spec,mid_spec

# Stored values are 16-bit bit-vectors.
interpret value -> bv[16]