Llqueue2 bounded
A model of two linked-list queues sharing a memory. An empty queue reserves one memory cell to avoid being starved.
Properties:
1) Progress -- if we infinitely often call receive, then every sent message is received 2) Non-blocking -- if we infinitely often send and receive, send eventually succeeds 3) No memory leaks -- when the queue is empty there is always a free cell
include order
include collections
instance nat : unbounded_sequence
# Parameterless event marker. `raise` has no implementation here; its
# specification pulses `now` to true and immediately back to false, and the
# temporal properties in this file refer to `<instance>.now`.
module signal = {
action raise
specification {
relation now # true only between the two assignments inside `raise`
after init { now := false; }
before raise {
now := true;
now := false; # reset at once, which is what keeps `invariant ~now` true
}
invariant ~now
}
}
# Event marker carrying a payload of type `data`: raising it records the
# argument in `value` and pulses `now` to true and back to false.
module signal1(data) = {
action raise(val:data)
specification {
relation now # true only between the two assignments to it inside `raise`
var value : data # argument of the most recent `raise`
after init { now := false; }
before raise {
value := val; # written first, so `value` already equals `val` while `now` is true
now := true;
now := false;
}
invariant ~now
}
}
# Uninterpreted sorts: `id` is the type of message contents (see memory.val),
# `cell` is the type of memory cells.
type id
type cell
# Cell memory used by the queues: a per-cell occupancy bit, a per-cell
# payload, and a partial map from cells to cells used as links.
module memory = {
    var full(X:cell) : bool                       # occupancy bit of each cell
    var val(X:cell) : id                          # message held by the cell, if any
    instance link : partial_function(cell,cell)   # cell-to-cell links

    # Every cell starts out unoccupied.
    after init {
        full(X) := false;
    }
}
module bounded_ll_queue(mem,other) = {
# Offer message x to the queue; `ok` reports whether it was accepted.
action send(x:id) returns (ok:bool)
# Poll the queue; `ok` reports whether it was non-empty at the time of the call.
action recv returns (x:id, ok:bool)
specification {
# State of the specification.
var head : nat # advanced by each successful recv
var tail : nat # advanced by each successful send
var queue(X:nat) : id # message recorded by send at position X
var full(X:cell):bool # Ghost full bit for each cell for this queue
var head_cell : cell # Pointer to head cell if any
var tail_cell : cell # Pointer to tail cell if any
var empty : bool # The queue is empty
var index(X:cell) : nat # value of `tail` at the moment send filled the cell
# Initial specification state: both counters at zero, no cell owned by this
# queue, queue marked empty, and the head pointer aliased to the tail pointer
# (tail_cell itself is not assigned here).
after init {
    head := 0;
    tail := 0;
    full(X) := false;
    empty := true;
    head_cell := tail_cell;
}
# Event markers referenced by the temporal properties of this module.
instance send_trying : signal # raised on every call to send
instance sending : signal1(id) # raised with the message when send succeeds
instance recv_trying : signal # raised on every call to recv
instance receiving : signal1(id) # raised with the head message when recv succeeds
# Specification monitor for `send`: signals the attempt, then either stores x
# in some usable cell and appends it to the queue, or reports failure.
before send {
send_trying.raise;
# A cell is usable if memory does not mark it full and it is not the other
# queue's tail cell (presumably the cell the other queue reserves while it is
# empty, as described in the file header -- confirm).
if some c:cell. ~mem.full(c) & c ~= other.tail_cell {
ok := true;
queue(tail) := x;
full(c) := true; # ghost
mem.full(c) := true;
mem.val(c) := x;
if ~empty {
mem.link.remap(tail_cell,c); # presumably points the old tail cell's link at c -- confirm against partial_function
} else {
head_cell := c; # queue was empty: the new cell is also the head
}
tail_cell := c;
empty := false;
index(c) := tail; # record the position before tail advances
tail := tail.next;
sending.raise(x);
} else {
ok := false; # no usable cell was found
}
}
# Specification monitor for `recv`: signals the poll, reports through `ok`
# whether the queue was non-empty and, if so, returns the head message in `x`
# and releases the head cell.
before recv {
    recv_trying.raise; # ghost action signalling polling of queue
    ok := ~empty;
    if ok {
        # Hand the head message back to the caller. Without this assignment
        # the declared return value `x` is never written, so a successful
        # recv would return an arbitrary id.
        x := mem.val(head_cell);
        call receiving.raise(mem.val(head_cell));
        head := head.next;
        mem.full(head_cell) := false;
        full(head_cell) := false; # ghost
        if head_cell = tail_cell {
            # That was the only element; head_cell and tail_cell are left as they are.
            empty := true;
        } else {
            # Move to the linked cell; the link is read before it is removed.
            var c := mem.link.get(head_cell);
            mem.link.remove(head_cell);
            head_cell := c;
        }
    }
}
# Progress (property 1 in the file header): for every message X, if
# `receiving.now` holds infinitely often and a send of X is signalled at some
# point, then a receive of X is signalled at some point.
temporal property [lpq]
forall X. ((globally eventually receiving.now)
& (eventually sending.now & sending.value = X) ->
(eventually receiving.now & receiving.value = X))
proof {
tactic skolemize;
# Definitions supplied to the l2s_auto tactic. `_X` is presumably the constant
# that `skolemize` introduces for the quantified X -- confirm.
# Work items are queue positions: created once below `tail`, done once below
# `head`, needed while no earlier position holds `_X`.
tactic l2s_auto with {
definition work_created(X) = (X < tail)
definition work_needed(X) = ~exists Y. Y < X & queue(Y) = _X
definition work_done(X) = (X < head)
definition work_start = (sending.now & sending.value = _X)
definition work_progress = receiving.now
}
showgoals
}
This is an example of a two-phase eventuality proof. The first phase empties the queue. This guarantees progress when we receive from the queue. The second phase sends a message on an empty queue. This phase guarantees progress when we send a message, but only if phase 1 is complete (i.e., if the queue is empty). For each phase, we define work_created, work_needed, work_done and work_progress.
The phases are ordered lexically by their names. That is, phase 2 starts when phase 1 is complete.
# Non-blocking (property 2 in the file header): if `recv_trying.now` and
# `send_trying.now` each hold infinitely often, then `sending.now` holds
# infinitely often.
temporal property [nb]
((globally eventually recv_trying.now)
& (globally eventually send_trying.now)
-> (globally eventually sending.now))
proof {
tactic skolemize;
# Definitions supplied to the l2s_auto tactic, indexed by phase number.
# Phase [1] measures work over queue positions using `tail` and `head` and
# progresses on `recv_trying.now`; phase [2] has a single unparameterized work
# item that is never done and progresses on `send_trying.now`.
# NOTE(review): `$l2s_g.` looks like the tactic's operator for "globally from
# the start point" -- confirm against the l2s documentation.
tactic l2s_auto with {
definition work_start = ($l2s_g. ~sending.now)
definition work_created[1](X) = (X <= tail)
definition work_needed[1](X) = (X <= tail)
definition work_done[1](X) = (X <= head)
definition work_progress[1] = recv_trying.now
definition work_created[2] = true
definition work_needed[2] = true
definition work_done[2] = false
definition work_progress[2] = send_trying.now
}
showgoals
}
# No starvation / no leak: an empty queue always has a free cell that is not
# the other queue's tail cell.
invariant empty -> exists X. ~mem.full(X) & X ~= other.tail_cell
# `empty` coincides with head = tail, and an empty queue's tail cell is free.
invariant empty <-> head = tail
invariant empty -> ~mem.full(tail_cell)
invariant head <= tail
# Cells owned by this queue carry indices in the range [head, tail).
invariant full(C) -> index(C) < tail
invariant full(C) -> head <= index(C)
# In a non-empty queue, head_cell holds position `head` and tail_cell holds
# the position immediately before `tail`.
invariant ~empty -> full(head_cell) & index(head_cell) = head
invariant ~empty -> full(tail_cell) & nat.succ(index(tail_cell),tail)
# Shape of the link map: only full cells have links, every owned cell except
# the tail has one, and links connect consecutive indices.
invariant mem.link.pre(C) -> mem.full(C)
invariant mem.link.map(C,D) -> mem.link.pre(C)
invariant full(C) -> (mem.link.pre(C) <-> C ~= tail_cell)
invariant full(C) -> (mem.link.map(C,D) <-> full(D) & nat.succ(index(C),index(D)))
# Distinct owned cells have distinct indices.
invariant full(C) & full(D) & index(C) = index(D) -> C = D
# A cell is full in memory exactly when one of the two queues owns it, and no
# cell is owned by both.
invariant mem.full(C) <-> (full(C) | other.full(C))
invariant ~(full(C) & other.full(C))
# The stored value of an owned cell matches the recorded queue contents.
invariant full(C) -> mem.val(C) = queue(index(C))
}
}
# One memory instance and two queues over it; each queue receives the other
# one as its `other` parameter.
instance mem : memory
instance q1 : bounded_ll_queue(mem,q2)
instance q2 : bounded_ll_queue(mem,q1)

# Initially the two queues' tail cells are assumed to be different cells.
after init {
    assume q1.tail_cell ~= q2.tail_cell;
}

# Actions exported to the environment, grouped by operation.
export q1.send
export q2.send
export q1.recv
export q2.recv