Skip to content

This file contains a first attempt at refining the network model to¤

a sort of toy TCP. Two conjectures were added about the messages in¤

the lower (UDP) network layer (that is, they must obey the same¤

invariants are the message in the transport layer messages queues.¤

¤

This is the abstract specification of the sharded hash table protocol. It defines the consistency reference model (reference)

¤
¤

Global type declarations

¤
¤

total order theory

module total_order(carrier) = {
    relation (T:carrier < U:carrier)
note: because < is polymorphic, we need to use the type here
    axiom (T:carrier < U & U < V) -> (T < V)
    axiom ~(T:carrier < U & U < T)
    axiom T:carrier < U | T = U | U < T
could add <= as derived relation here
}

module total_non_strict_order(carrier) = {
    relation (T:carrier <= U:carrier)
note: because <= is polymorphic, we need to use the type here
    axiom (T:carrier <= U & U <= V) -> (T <= V)
    axiom T:carrier <= T
    axiom (T:carrier <= U & U <= T) -> T = U
    axiom T:carrier <= U | U <= T
could add <= as derived relation here
}

¤

type of local time

type ltime
instantiate total_order(ltime)

¤

type of hash table data

type data

¤

type of hash table keys

type key
instantiate total_non_strict_order(key)

¤

type of message types

type mtype = {request_t, reply_t, delegate_t, ack_t}

¤

type of hash table ops

type otype = {read, write}

¤

id's of protocol agents

type id

¤

Shards contain a subset of a map from a low to a hi key

type shard
function shard_lo(S:shard) : key
function shard_hi(S:shard) : key
function shard_value(S:shard,K:key) : data
¤

A request message has a source, key, op data and ltime

type req
function req_src(R:req) : id
function req_key(R:req) : key
function req_otype(R:req) : otype
function req_data(R:req) : data
function req_ltime(R:req) : ltime
request messages are extensional

axiom
req_src(R) = req_src(S) &
req_key(R) = req_key(S) &
req_otype(R) = req_otype(S) &
req_data(R) = req_data(S) &
req_ltime(R) = req_ltime(S) &
req_src(R) = req_src(S)
-> R = S

object reqs = {
    action make(src:id,k:key,op:otype,d:data,lt:ltime) returns(res:req) = {
    assume req_src(R) = src &
    req_key(R) = k &
    req_otype(R) = op &
    req_data(R) = d &
    req_ltime(R) = lt
    }
}
¤

structure of hash table events to serialize

module hashev = {
    individual type_ : otype
    individual key_ : key
    individual data_  : data # data for write and cas
    individual id_ : id      # process id of op
    relation serialized      # this event has happened

    init ~serialized
}

¤

Reference specification

This module describes a set of memory events ordered by local time. It provides actions that assign global time to events in increasing order. The pre-conditions of these actions enforce the consistency model (that is, what orderings in local time must be preserved in global time).

¤
object reference = {
¤

memory events by local time TODO: can we export this read-only?

    instantiate evs(T:ltime) : hashev
¤

global memory state obeys partial function axiom

    individual hash(A:key) : data
An aribtrary init value
    init hash(A) = 0

¤

serialize an event lt at current global time. The "id" parameter tells us what process is serializing the event.

    action serialize(lt:ltime, id_:id) = {

    assert ~evs(lt).serialized;
serialize at current global time
    evs(lt).serialized := true;
update the global memory state
    local a : key, d : data {
            a := evs(lt).key_;
        d := evs(lt).data_;
        if evs(lt).type_ = read {
        evs(lt).data_ := hash(a)
        }
        else { # write
        hash(a) := d
        }
    }
    }
    delegate serialize

¤

Note: if we want linearizability, we can add begin and end transaction events to this interface

}
¤

Specification of hash tables

module hash_top(key,value) = {
The server's hash table

    function hash(X:key) : data
    init hash(X) = 0

    action set_(k:key, v:value) = {
    hash(k) := v
    }

    action get(k:key) returns (v:value) = {
    assume v = hash(k)
    }
This extracts a shard from a hash table

    action extract_(lo:key,hi:key) returns(res:shard) = {
    assume shard_lo(res) = lo;
    assume shard_hi(res) = hi;
    assume (lo <= X & X <= hi) -> shard_value(res,X) = hash(X)
    }
This incorporates a shard into a hash table

    action incorporate(s:shard) = {
    hash(K) := shard_value(s,K) if (shard_lo(s) <= K & K <= shard_hi(s)) else hash(K)
    }

}
¤

Specification of delegation map

module delegation_map = {
The delegation map has to be specified with a relation, since as a function it would be unstratified.

    relation map(K:key, X:id)
    init map(K,X) <-> X = 0

    action set_(lo:key,hi:key,dst:id) = {
    map(K,X) := (X = dst) & (lo <= K & K <= hi)
                | map(K,X) & ~(lo <= K & K <= hi)
    }

    action get(k:key) returns (val:id) = {
    assume map(k,val)   # assume not assign because this code is ghost
    }
}
¤

This is the top-level server description

"ref" is the reference object "me" is the server's id net is the network

module server_top(ref,me,net) = {
The server's hash table

    instance hash : hash_top(key,data)
The server's delegation map

    instance dm : delegation_map

    action handle_request(rq:req) = {
    local src:id, k:key, op:otype, d:data, lt:ltime, ow:id {
        src := req_src(rq);
        k := req_key(rq);
        op := req_otype(rq);
        d := req_data(rq);
        lt := req_ltime(rq);
        ow := dm.get(k);
        if ow = me {
        call ref.serialize(lt,me);  # this is ghost!
        if op = read {
            d := hash.get(k)
        }
        else {
            call hash.set_(k,d)
        };
        call net.send_reply(src, d, lt)
        } else {
        call net.send_request(ow,rq)  # forward request
        }
    }
    }

    action generate_request(rq:req) = {
    call handle_request(rq)
    }

    action recv_request(rq:req) = {
    call handle_request(rq)
    }

    action shard(dst:id, lo:key, hi:key) = {
    assume dst ~= me;
    assume lo <= K & K <= hi -> dm.get(K:key) = me;
    call dm.set_(lo,hi,dst);
    call net.send_delegate(dst,hash.extract_(lo,hi))
    }

    action recv_delegate(s:shard) = {
    call dm.set_(shard_lo(s),shard_hi(s),me);
    call hash.incorporate(s)
    }
If I own this key, then my has table data matches the reference
    conjecture dm.map(K,me) -> hash.hash(K) = ref.hash(K)
If I own this key, then no one else does
    conjecture dm.map(K,me) & X ~= me -> ~servers(X).dm.map(K,X)
If I own this key, then no delegated shard does
    conjecture dm.map(K,me) -> ~(net.delegated(X,S) & shard_lo(S) <= K & K <= shard_hi(S))
No two delegated shards have keys in common
    conjecture net.delegated(X,S) & shard_lo(S) <= K & K <= shard_hi(S)
               & net.delegated(X1,S1) & shard_lo(S1) <= K & K <= shard_hi(S1)
               -> X = X1 & S = S1
Forwarded requests have correct operations relative to the reference

    conjecture net.requested(D,R) & L = req_ltime(R)->
               (req_key(R) = ref.evs(L).key_ &
                req_otype(R) = ref.evs(L).type_ &
                (req_otype(R) = write -> req_data(R) = ref.evs(L).data_))
All forwarded requests have been generated

    conjecture net.requested(D,R) -> net.generated(req_ltime(R))
No two forwarded requests with the same ltime

    conjecture net.requested(D1,R1) & net.requested(D2,R2) & req_ltime(R1) = req_ltime(R2)
               -> D1 = D2 & R1 = R2
Delegated shards have correct data

    conjecture net.delegated(X,S) & shard_lo(S) <= K & K <= shard_hi(S) -> shard_value(S,K) = ref.hash(K)

}
¤

Specification of network top layer

Currently, delivery is out-of-order non-duplicating

module net_top(ref,proto) = {

    relation requested(D:id,R:req)
    relation delegated(D:id,S:shard)

    init ~requested(D,R)
    init ~delegated(D,S)

    relation generated(L:ltime)
    init ~generated(L)

    action send_reply(dst:id, d:data, lt:ltime) = {
    assert ref.evs(lt).data_ = d   # for now, just assert data values are correct
    }
    delegate send_reply

    action send_request(dst:id,rq:req) = {
    assert ~requested(dst,rq);
    requested(dst,rq) := true
    }
    delegate send_request

    action send_delegate(dst:id,s:shard) = {
    assert ~delegated(dst,s);
    delegated(dst,s) := true
    }
    delegate send_delegate

    action recv_request(dst:id,rq:req) = {
    assert requested(dst,rq);
    requested(dst,rq) := false
    }
    execute recv_request before proto.recv_request

    action generate_request(dst:id,rq:req) = {
    assert ~generated(req_ltime(rq));
    assert L = req_ltime(rq)->
               (req_key(rq) = ref.evs(L).key_ &
                req_otype(rq) = ref.evs(L).type_ &
                (req_otype(rq) = write -> req_data(rq) = ref.evs(L).data_));
    generated(req_ltime(rq)) := true
    }
    execute generate_request before proto.generate_request

    action recv_delegate(dst:id,s:shard) = {
    assume delegated(dst,s);
    delegated(dst,s) := false
    }
    execute recv_delegate before proto.recv_delegate
}

type seq_num
instantiate total_non_strict_order(seq_num)

object seq_nums = {
    action next(seq:seq_num) returns (res:seq_num) = {
    assume seq <= res & seq ~= res & (X <= res -> X <= seq)
    }
}

type net_msg

function net_msg_mtype(M:net_msg) : mtype
function net_msg_src(M:net_msg) : id
function net_msg_dst(M:net_msg) : id
function net_msg_req(M:net_msg) : req
function net_msg_seq_num(M:net_msg) : seq_num
function net_msg_shard(M:net_msg) : shard


object encoder = {

    action mk_req(src:id,dst:id,rq:req,seq:seq_num) returns (msg:net_msg) = {
    assume net_msg_src(msg) = src
      & net_msg_dst(msg) = dst
      & net_msg_req(msg) = rq
      & net_msg_seq_num(msg) = seq
      & net_msg_mtype(msg) = request_t
    }

    action mk_delegate(src:id,dst:id,sh:shard,seq:seq_num) returns (msg:net_msg) = {
    assume net_msg_src(msg) = src
      & net_msg_dst(msg) = dst
      & net_msg_shard(msg) = sh
      & net_msg_seq_num(msg) = seq
      & net_msg_mtype(msg) = delegate_t
    }

    action mk_ack(src:id,dst:id,seq:seq_num) returns (msg:net_msg) = {
    assume net_msg_src(msg) = src
      & net_msg_dst(msg) = dst
      & net_msg_seq_num(msg) = seq
      & net_msg_mtype(msg) = ack_t
    }

}

module message_queue_top(me) = {

    relation contents(M:net_msg)
    init ~contents(M)

    action enqueue(msg:net_msg) = {
    assert ~contents(msg);
    contents(msg) := true
    }

    action empty returns (res:bool) = {
    assume contents(M) -> ~res;
    assume res -> ~contents(M)
    }

    action pick_one returns (res:net_msg) = {
    assume ~contents(res) -> ~contents(M)
    }

    action delete_all(seq:seq_num) = {
    contents(M) := contents(M) & ~(net_msg_seq_num(M) <= seq)
    }

}

module net_impl(ref,proto,me,lower) = {

    instance mq(D:id) : message_queue_top(D)
    individual send_seq(S:id) : seq_num
    individual recv_seq(S:id) : seq_num

    init recv_seq(S) = 0 & send_seq(S) = 0

    action send_request(dst:id,rq:req) = {
    local msg : net_msg, seq : seq_num {
        msg := encoder.mk_req(me,dst,rq,send_seq(dst));
        send_seq(dst) := seq_nums.next(send_seq(dst));
        call mq(dst).enqueue(msg);
        call lower.send(msg)
        }
    }

    action send_delegate(dst:id,sh:shard) = {
    local msg : net_msg, seq : seq_num {
        msg := encoder.mk_delegate(me,dst,sh,send_seq(dst));
        send_seq(dst) := seq_nums.next(send_seq(dst));
        call mq(dst).enqueue(msg);
        call lower.send(msg)
        }
    }

    action recv_msg(msg:net_msg) = {
    local src:id,seq:seq_num {
        seq := net_msg_seq_num(msg);
        src := net_msg_src(msg);
        if seq <= recv_seq(src) & net_msg_mtype(msg) ~= ack_t  {
        call lower.send_msg(encoder.mk_ack(me,src,seq))
        };
        if seq = recv_seq(src) {
        recv_seq(src) := seq_nums.next(recv_seq(src));
        if net_msg_mtype(msg) = request_t {
            call proto.recv_request(me,net_msg_req(msg))
        }
        else if net_msg_mtype(msg) = delegate_t {
            call proto.recv_delegate(me,net_msg_shard(msg))
        }
        else if net_msg_mtype(msg) = ack_t {
            call mq(src).delete_all(seq)
        }
        }
    }
    }

    action timeout = {
    if exists D:id,M:net_msg. mq(D).contents(M) {
        local dst:id, msg:net_msg {
        assume ~mq(dst).empty;
        msg := mq(dst).pick_one;
        call lower.send_msg(msg)
        }
    }
    }
If I have a request message for D enqueued and if its sequence number is

= D's receive sequence number, then the message is pending.

    conjecture mq(D).contents(M) & neti(D).recv_seq(me) <= net_msg_seq_num(M)
       & net_msg_mtype(M) = request_t -> net.requested(D,net_msg_req(M))
If I have a delegate message for D enqueued and if its sequence number is

= D's receive sequence number, then the message is pending.

    conjecture mq(D).contents(M) & neti(D).recv_seq(me) <= net_msg_seq_num(M)
       & net_msg_mtype(M) = delegate_t -> net.delegated(D,net_msg_shard(M))
A given request cannot occur twice in the network

    conjecture neti(S1).mq(D).contents(M1) & neti(D).recv_seq(S1) <= net_msg_seq_num(M1)
       & neti(S2).mq(D).contents(M2) & neti(D).recv_seq(S2) <= net_msg_seq_num(M2)
       & net_msg_mtype(M1) = request_t & net_msg_mtype(M2) = request_t
       & (S1 ~= S2 | net_msg_seq_num(M1) ~= net_msg_seq_num(M2))
       -> net_msg_req(M1) ~= net_msg_req(M2)
A given delegation cannot occur twice in the network

    conjecture neti(S1).mq(D).contents(M1) & neti(D).recv_seq(S1) <= net_msg_seq_num(M1)
       & neti(S2).mq(D).contents(M2) & neti(D).recv_seq(S2) <= net_msg_seq_num(M2)
       & net_msg_mtype(M1) = delegate_t & net_msg_mtype(M2) = delegate_t
       & (S1 ~= S2 | net_msg_seq_num(M1) ~= net_msg_seq_num(M2))
       -> net_msg_shard(M1) ~= net_msg_shard(M2)
The sending seq number is greater than any queue entry

    conjecture mq(D).contents(M) -> ~(send_seq(D) <= net_msg_seq_num(M))
No two messages in a queue have the same sequence number

    conjecture mq(D).contents(M1) & mq(D).contents(M2)
        -> net_msg_seq_num(M1) ~= net_msg_seq_num(M2)
A sent message must match any message queue entry with the same sequence number

    conjecture low.sent(M) & net_msg_src(M) = me & net_msg_dst(M) = D
       & mq(D).contents(M2) & net_msg_seq_num(M2) = net_msg_seq_num(M)
       -> M = M2
Following added due to counterexamples

A sent message must with seq num >= receiver must be pending

    conjecture low.sent(M) & net_msg_src(M) = S & net_msg_dst(M) = D
      & neti(D).recv_seq(S) <= net_msg_seq_num(M)
      & net_msg_mtype(M) = request_t -> net.requested(D,net_msg_req(M))

    conjecture low.sent(M) & net_msg_src(M) = S & net_msg_dst(M) = D
      & neti(D).recv_seq(S) <= net_msg_seq_num(M)
      & net_msg_mtype(M) = delegate_t -> net.delegated(D,net_msg_shard(M))

}

module lower(net_impl) = {

    relation sent(M:net_msg)

    init ~sent(M)

    action send_msg(msg:net_msg) = {
    sent(msg) := true
    }

    action recv_msg(dst:id, msg:net_msg) = {
    assert sent(msg)
    }
    execute recv_msg before net_impl.recv_msg
}
¤

instantiate some components

make one server for each id

instance servers(X:id) : server_top(reference,X,net)
make one top level network
instance net : net_top(reference,servers)
network implementation
instance neti(X:id) : net_impl(reference,servers,X,low)
lower network specification
instance low : lower(neti)

¤

export some stuff

export servers.generate_request
export servers.shard
export neti.recv_msg
export neti.timeout
¤

verify the servers

This says verify all the actions of "servers", keeping "ref" and "net" concrete.

isolate iso_s = servers with reference,net
Verify the network implementation, using its upper and lower specifications

isolate iso_n = neti with net,low