# Trans -- transport layer module (sht_transport)
include queue
include timeout
module sht_transport(lower,req,shard,seq_num,id) = {
# Abstract state of the transport service, one relation per message kind.
# The spec object in this module sets an entry when the corresponding send
# succeeds and clears it when the corresponding recv action is called.
relation requested(D:id,R:req)
relation replied(D:id,R:req)
relation delegated(D:id,S:shard)
# Send actions (called by the user of the transport). `src` is the sending
# endpoint, `dst` the destination; `ok` reports whether the message was accepted.
action send_request(src:id,dst:id,rq:req) returns (ok:bool)
action send_delegate(src:id,dst:id,s:shard) returns (ok:bool)
action send_reply(src:id, dst:id, rq:req) returns (ok:bool)
# Receive actions (called by the transport implementation when a message is
# delivered to endpoint `dst`).
action recv_request(dst:id,rq:req)
action recv_reply(dst:id,rq:req)
action recv_delegate(dst:id,s:shard)
# Reference specification of the transport service, expressed as monitors on
# the send/recv actions. For each message kind, the relation is false
# initially, becomes true after a successful send (ok = true) and is reset to
# false when the matching recv action is invoked.
object spec = {
# --- requests ---
# Initially no request is pending.
init ~requested(D,R)
# Precondition: the same (dst,rq) pair must not already be pending.
before send_request {
assert ~requested(dst,rq)
}
# Only a successful send makes the request pending.
after send_request {
if ok {
requested(dst,rq) := true
}
}
# A request may only be delivered if it is pending; delivery consumes it,
# so each successful send is delivered at most once.
before recv_request {
assert requested(dst,rq);
requested(dst,rq) := false
}
# --- replies (same pattern as requests) ---
init ~replied(D,R)
before send_reply {
assert ~replied(dst,rq)
}
after send_reply {
if ok {
replied(dst,rq) := true
}
}
before recv_reply {
assert replied(dst,rq);
replied(dst,rq) := false
}
# --- shard delegations (same pattern as requests) ---
init ~delegated(D,S)
before send_delegate {
assert ~delegated(dst,s)
}
after send_delegate {
if ok {
delegated(dst,s) := true
}
}
before recv_delegate {
assert delegated(dst,s);
delegated(dst,s) := false
}
}
# Kind tag of a message exchanged over the lower layer. ack_t messages are
# generated by the implementation's lower.recv handler; the other three kinds
# correspond to the send_request / send_reply / send_delegate actions.
type mtype = {request_t, reply_t, delegate_t, ack_t}
# Wire format of the messages handed to the lower layer.
object net_msg = {
type t = struct {
# message kind
mty : mtype,
# endpoint that built the message (the implementation sets it to `me`)
src : id,
# payload, assigned by send_request and send_reply
rq : req,
# sequence number; for an ack, the sequence number being acknowledged
num : seq_num.t,
# payload, assigned by send_delegate
sh : shard
}
}
object impl(me:id) = {
# Per-destination queue of messages sent by `me`; entries are removed by
# delete_all when an ack arrives and are re-sent by the timeout handler.
instance mq(D:id) : message_queue(net_msg,seq_num)
# Per-destination timer whose timeout action triggers a retransmission.
instance timer(D:id) : timeout_sec
# Sequence number that will be stamped on the next message sent to S.
individual send_seq(S:id) : seq_num.t
# Sequence number of the next message from S that will be delivered.
individual recv_seq(S:id) : seq_num.t
# Both counters start at 0 for every peer.
init recv_seq(S) = 0 & send_seq(S) = 0
# Implementation of send_request for endpoint `me`.
#
# Builds a request_t message carrying `rq`, stamped with the current send
# sequence number for `dst`, and tries to put it in the queue mq(dst).
# Returns ok = true iff the queue accepted the message, in which case the
# message is also transmitted once over the lower layer (it stays queued for
# retransmission until acknowledged).
implement send_request(dst:id,rq:req) {
    local msg : net_msg.t {
        net_msg.mty(msg) := request_t;
        net_msg.src(msg) := me;
        net_msg.rq(msg) := rq;
        net_msg.num(msg) := send_seq(dst);
        ok := mq(dst).enqueue(msg);
        if ok {
            # Consume the sequence number only when the message was accepted.
            # The receiver delivers exactly seq = recv_seq, so skipping a
            # number on a refused enqueue would leave a gap that is never
            # filled and block every later message to dst.
            send_seq(dst) := seq_num.next(send_seq(dst));
            call lower.send(me,dst,msg)
        }
    }
}
# Implementation of send_delegate for endpoint `me`.
#
# Builds a delegate_t message carrying shard `sh`, stamped with the current
# send sequence number for `dst`, and tries to put it in the queue mq(dst).
# Returns ok = true iff the queue accepted the message, in which case the
# message is also transmitted once over the lower layer (it stays queued for
# retransmission until acknowledged).
implement send_delegate(dst:id,sh:shard) {
    local msg : net_msg.t {
        net_msg.mty(msg) := delegate_t;
        net_msg.src(msg) := me;
        net_msg.sh(msg) := sh;
        net_msg.num(msg) := send_seq(dst);
        ok := mq(dst).enqueue(msg);
        if ok {
            # Consume the sequence number only when the message was accepted.
            # The receiver delivers exactly seq = recv_seq, so skipping a
            # number on a refused enqueue would leave a gap that is never
            # filled and block every later message to dst.
            send_seq(dst) := seq_num.next(send_seq(dst));
            call lower.send(me,dst,msg)
        }
    }
}
# Implementation of send_reply for endpoint `me`.
#
# Builds a reply_t message carrying `rq`, stamped with the current send
# sequence number for `dst`, and tries to put it in the queue mq(dst).
# Returns ok = true iff the queue accepted the message, in which case the
# message is also transmitted once over the lower layer (it stays queued for
# retransmission until acknowledged).
implement send_reply(dst:id,rq:req) {
    local msg : net_msg.t {
        net_msg.mty(msg) := reply_t;
        net_msg.src(msg) := me;
        net_msg.rq(msg) := rq;
        net_msg.num(msg) := send_seq(dst);
        ok := mq(dst).enqueue(msg);
        if ok {
            # Consume the sequence number only when the message was accepted.
            # The receiver delivers exactly seq = recv_seq, so skipping a
            # number on a refused enqueue would leave a gap that is never
            # filled and block every later message to dst.
            send_seq(dst) := seq_num.next(send_seq(dst));
            call lower.send(me,dst,msg)
        }
    }
}
# Handler for a message delivered to endpoint `me` by the lower layer.
#
# * An ack removes acknowledged messages from the queue for its sender
#   (mq.delete_all with the acked sequence number).
# * Any other message whose sequence number is <= the receive sequence number
#   for its sender is acknowledged (this covers retransmitted duplicates).
# * A non-ack message whose sequence number equals the receive sequence number
#   advances that counter and is handed to the matching recv_* action; any
#   other sequence number is not delivered.
implement lower.recv(msg:net_msg.t) {
    local sender : id, sn : seq_num.t, mkind : mtype {
        mkind := net_msg.mty(msg);
        sn := net_msg.num(msg);
        sender := net_msg.src(msg);
        if mkind = ack_t {
            call mq(sender).delete_all(sn)
        }
        else {
            # The ack is sent before recv_seq is advanced, so the message that
            # is about to be delivered (sn = recv_seq) is acknowledged too.
            if sn <= recv_seq(sender) {
                local ack : net_msg.t {
                    net_msg.mty(ack) := ack_t;
                    net_msg.src(ack) := me;
                    net_msg.num(ack) := sn;
                    call lower.send(me,sender,ack)
                }
            };
            if sn = recv_seq(sender) {
                recv_seq(sender) := seq_num.next(recv_seq(sender));
                if mkind = request_t {
                    call recv_request(me,net_msg.rq(msg))
                }
                else if mkind = reply_t {
                    call recv_reply(me,net_msg.rq(msg))
                }
                else if mkind = delegate_t {
                    call recv_delegate(me,net_msg.sh(msg))
                }
            }
        }
    }
}
# Timeout handler for destination `dst`: if the queue for dst still holds
# messages, re-send one of them (as chosen by mq.pick_one) over the lower layer.
implement timer.timeout(dst:id) {
    if ~mq(dst).empty {
        local pending : net_msg.t {
            pending := mq(dst).pick_one;
            call lower.send(me,dst,pending)
        }
    }
}
# If I have a request message for D enqueued and its sequence number is
# >= D's receive sequence number for me, then the request is pending.
conjecture mq(D).contents(M) & impl(D).recv_seq(me) <= net_msg.num(M)
    & net_msg.mty(M) = request_t -> requested(D,net_msg.rq(M))

# If I have a reply message for D enqueued and its sequence number is
# >= D's receive sequence number for me, then the reply is pending.
conjecture mq(D).contents(M) & impl(D).recv_seq(me) <= net_msg.num(M)
    & net_msg.mty(M) = reply_t -> replied(D,net_msg.rq(M))

# If I have a delegate message for D enqueued and its sequence number is
# >= D's receive sequence number for me, then the delegation is pending.
conjecture mq(D).contents(M) & impl(D).recv_seq(me) <= net_msg.num(M)
    & net_msg.mty(M) = delegate_t -> delegated(D,net_msg.sh(M))
# Two undelivered request messages queued for the same destination D (each has
# a sequence number >= D's receive sequence number for its sender) that differ
# in sender or in sequence number carry different requests.
conjecture impl(S1).mq(D).contents(M1) & impl(D).recv_seq(S1) <= net_msg.num(M1)
& impl(S2).mq(D).contents(M2) & impl(D).recv_seq(S2) <= net_msg.num(M2)
& net_msg.mty(M1) = request_t & net_msg.mty(M2) = request_t
& (S1 ~= S2 | net_msg.num(M1) ~= net_msg.num(M2))
-> net_msg.rq(M1) ~= net_msg.rq(M2)
# Same property for reply messages.
conjecture impl(S1).mq(D).contents(M1) & impl(D).recv_seq(S1) <= net_msg.num(M1)
& impl(S2).mq(D).contents(M2) & impl(D).recv_seq(S2) <= net_msg.num(M2)
& net_msg.mty(M1) = reply_t & net_msg.mty(M2) = reply_t
& (S1 ~= S2 | net_msg.num(M1) ~= net_msg.num(M2))
-> net_msg.rq(M1) ~= net_msg.rq(M2)
# Same property for delegate messages, comparing the shard payloads.
conjecture impl(S1).mq(D).contents(M1) & impl(D).recv_seq(S1) <= net_msg.num(M1)
& impl(S2).mq(D).contents(M2) & impl(D).recv_seq(S2) <= net_msg.num(M2)
& net_msg.mty(M1) = delegate_t & net_msg.mty(M2) = delegate_t
& (S1 ~= S2 | net_msg.num(M1) ~= net_msg.num(M2))
-> net_msg.sh(M1) ~= net_msg.sh(M2)
# Every message in my queue for D has a sequence number that is not >= my
# current send sequence number for D (i.e. it was already allocated).
conjecture mq(D).contents(M) -> ~(send_seq(D) <= net_msg.num(M))
# Distinct messages in the same queue have distinct sequence numbers.
conjecture mq(D).contents(M1) & mq(D).contents(M2) & M1 ~= M2
-> net_msg.num(M1) ~= net_msg.num(M2)
# A non-ack message of mine recorded as sent to D by the lower layer's spec,
# with the same sequence number as a message in my queue for D, is that
# queued message.
conjecture lower.spec.sent(M,D) & net_msg.src(M) = me
& mq(D).contents(M2) & net_msg.num(M2) = net_msg.num(M)
& net_msg.mty(M) ~= ack_t -> M = M2
# A sent non-ack message with seq num >= receiver must be in the
# corresponding queue.
conjecture lower.spec.sent(M,D) & net_msg.src(M) = S
    & impl(D).recv_seq(S) <= net_msg.num(M) & net_msg.mty(M) ~= ack_t
    -> impl(S).mq(D).contents(M)

# An ack sent by S to D carries a sequence number that is not >= S's receive
# sequence number for D (only already-delivered messages are acknowledged).
conjecture lower.spec.sent(M,D) & net_msg.src(M) = S
    & net_msg.mty(M) = ack_t -> ~(impl(S).recv_seq(D) <= net_msg.num(M))

# A non-ack message I sent to D has a sequence number that is not >= my
# current send sequence number for D.
conjecture lower.spec.sent(M,D) & net_msg.src(M) = me & net_msg.mty(M) ~= ack_t
    -> ~(send_seq(D) <= net_msg.num(M))

# My queues contain only non-ack messages whose source is me.
conjecture mq(D).contents(M) -> net_msg.src(M) = me & net_msg.mty(M) ~= ack_t
# Isolate for a single message queue instance mq(mq_me), declared together
# with seq_num (the sequence-number object the queue is parameterized on).
isolate iso_mq(mq_me:id) = mq(mq_me) with seq_num
}
# Isolate for the transport implementation `impl`, declared together with the
# reference specification `spec`.
# NOTE(review): `udp` and `num` are not parameters of this module (those are
# `lower` and `seq_num`); presumably they are the objects the enclosing file
# passes for those parameters -- confirm, and consider naming the parameters
# here so the module does not depend on its instantiation context.
isolate iso = impl with spec,udp,num
}