Lamport mutex
This is an implememtation of Lamport's distributed mutual excluson (DME) algorithm.
We start by including libraries. In this case we need two:
- The
networklibrary for network communication - The
timoutlibrary to implement a periodic timer - The
numberslibrary to get some basic ordered datatypes
include network
include timeout
include order
global section. This contaions the declarations
of any resources that are used in common by all processes. These
usually include:
- Data types
- Services, such as network services
- Immutable global parameters, such as netwrok addresses
We can't have mutable global variables, since processes, being distributed, don't have shared memory.
global {
0 to node_max, where
node_max is implicitly defined here as global parameter whose
value can be selected a run time.
instance host_id : iterable
alias node_max = host_id.max
Since we have three kinds of messages in our protocol, we define an enumerated type for the message kind with three symbolic values.
type msg_kind = {request,reply,release}
unbounded_sequence template in the order library gives a
discrete totally ordered type with a least value 0 and a
next operator.
instance timestamp : unbounded_sequence
object msg_t = {
type this = struct {
kind : msg_kind,
sender_id : host_id,
ts : timestamp
}
definition (M1:msg_t < M2:msg_t) = ts(M1) < ts(M2)
}
tcp.net is a template defined in the
network library that we included above. The template takes one
parameter, which is the type of messages to be sent. Our instance
of this template is an object called net.
instance net : tcp_test.net(msg_t)
}
host_id
which means there is one process in the system for each value of
host_id in the range 0..node_max. The parameter is named self.
This means that the process can refer to its own host identifier by
the name self.
process node(self:host_id) = {
Our action is an export request_cs, which our client uses to
request to enter the critical section. It takes no parameters.
export action request_cs
enter_cs. This is a callback to
the client indicating that is is safe to enter the critical
section.
import action enter_cs
exit_cs. This is called by the
client when exiting the critical section, indicating it is safe to
another process to enter.
export action exit_cs
net in order to communicate. We declare the socket
here. The socket sock is an instance of the template socket
declared by the network service net.
instance sock : net.socket
type state_t = {idle,waiting,critical}
var state : state_t
var ts : timestamp
0 if none.
var request_ts(X:host_id) : timestamp
var reply_ts(X:host_id) : timestamp
after init section runs on initialization of the process. You
aren't allowed to do much here, just assign values to local
variables.
after init {
state := idle;
ts := 0;
request_ts(X) := 0;
reply_ts(X) := 0;
}
We start with the request_cs action. This builds a request message,
appends it to the request queue, and broadcasts it. The action broadcast is
a local action (i.e., a subroutine) and is defined later.
implement request_cs {
ts := ts.next;
var outgoing : msg_t;
outgoing.kind := request;
outgoing.sender_id := self;
outgoing.ts := ts;
broadcast(outgoing);
request_ts(self) := ts;
state := waiting;
}
recv from our network socket,
indicating we have an incoming message. This is called
sock.recv. It gives us as input parameters the network address
of the sending socket (not useful here) and the incoming
message.
implement sock.recv(src:tcp.endpoint,incoming:msg_t) {
First, we update out timestamp to reflect the incoming message.
ts := timestamp.max(incoming.ts,ts).next;
var outgoing : msg_t;
outgoing.sender_id := self;
outgoing.ts := ts;
When we receive a request message, we put it on our request queue,
and return a reply message to the sender.
if incoming.kind = request {
outgoing.kind := reply;
request_ts(incoming.sender_id) := incoming.ts;
unicast(outgoing,incoming.sender_id);
}
release message, the sender's request
must be at the head of our queue. We dequeue it.
else if incoming.kind = release {
request_ts(incoming.sender_id) := 0;
}
else if incoming.kind = reply {
reply_ts(incoming.sender_id) := incoming.ts;
}
- We are in the waiting state
- Our request message has the least timestamp in lexicographic order
- Every host has sent a reply later than our request
if state = waiting
& forall X. X ~= self ->
(request_ts(X) = 0 | lexord(request_ts(self),self,request_ts(X),X))
& reply_ts(X) > request_ts(self)
{
state := critical;
enter_cs;
}
}
implement exit_cs {
ts := ts.next;
request_ts(self) := 0;
var outgoing : msg_t;
outgoing.sender_id := self;
outgoing.ts := ts;
outgoing.kind := release;
broadcast(outgoing);
state := idle;
}
This function takes two timestamp-host_id pairs and determines whether (X1,Y1) < (X2,Y2) in lexicogrpahic order.
function lexord(X1:timestamp,Y1:host_id,X2:timestamp,Y2:host_id) =
X1 < X2 | X1 = X2 & Y1 < Y2
unicast sends a message to just one process.
To actually send a mesage to a socket, we call the send action
of our socket, giving it the receiving socket's network address
and the message to be sent. Notice we can get the network
address of process with identifier idx with the expression
node(idx).sock.id. This might seem odd, as we asre asking for
the local state of an object in another process. This is allowed
because the network addresses of the sockets are immutable
parameters that are determined at initialization and are
provided to all processes.
action unicast(outgoing:msg_t, dst_id : host_id) = {
sock.send(node(dst_id).sock.id,outgoing);
}
broadcast sends a message to all processes with
identifiers not equal to self. Notice the second while loop is
written in a funny way, that is, pre-incrementing the index
variable. This is because of the funny semantics of integer
sub-range types in Ivy. The result of arithmetic operations on
these types rounds to the nearest value in the type. This means,
for example, that node_max + 1 = node_max. By writing the
loop in this way, we avoid incrementing node_max and winding
up in an infinite loop.
action broadcast(outgoing:msg_t) = {
var idx : host_id := 0;
while(idx < self)
invariant idx <= self
invariant net.sent(X,node(D).sock.id,M) <->
(old net.sent(X,node(D).sock.id,M))| X = sock.id & M = outgoing & D < idx
{
unicast(outgoing, idx);
idx := idx.next;
}
while(idx < node_max)
invariant self <= idx
invariant net.sent(X,node(D).sock.id,M) <->
(old net.sent(X,node(D).sock.id,M))| X = sock.id & M = outgoing & D <= idx & D ~= self
{
idx := idx.next;
unicast(outgoing, idx);
}
}
import action show_incoming(incoming:msg_t)
specification {
var client_state : state_t
axiom node(X).sock.id = node(Y).sock.id -> X = Y
after init {
client_state := idle;
}
before request_cs {
require client_state = idle;
client_state := waiting;
}
before enter_cs {
require client_state = waiting;
require node.client_state(X) ~= critical
proof {
property exists M. (msg_t.kind(M) = request & msg_t.sender_id(M) = X & msg_t.ts(M) = node(X).request_ts(X))
proof {apply msg_t.constr}
};
client_state := critical;
}
before exit_cs {
require client_state = critical;
client_state := idle;
}
The interface state matches the implementaiton state
invariant client_state = state
invariant node.state(X) = critical & node.state(Y) = critical -> X=Y
proof {
property exists M. (msg_t.kind(M) = request & msg_t.sender_id(M) = X & msg_t.ts(M) = node(X).request_ts(X))
proof {apply msg_t.constr}
property exists M. (msg_t.kind(M) = request & msg_t.sender_id(M) = Y & msg_t.ts(M) = node(Y).request_ts(Y))
proof {apply msg_t.constr}
}
invariant net.sent(S,node(Y).sock.id,M) & msg_t.sender_id(M) = self -> ts >= msg_t.ts(M)
invariant net.sent(S,node(Y).sock.id,M) -> S = node(msg_t.sender_id(M)).sock.id
invariant request_ts(self) > 0
& msg_t.ts(M) = request_ts(self) & msg_t.kind(M) = request & msg_t.sender_id(M) = self
& Y ~= self ->
(net.sent(sock.id,node(Y).sock.id,M) | node(Y).request_ts(self) = request_ts(self))
invariant Y ~= self -> reply_ts(Y) <= node(Y).ts
invariant (state=waiting | state=critical) -> request_ts(self) > 0
invariant net.sent(sock.id,node(Y).sock.id,M) & msg_t.kind(M) = release & msg_t.sender_id(M) = self ->
(request_ts(self) = 0 | msg_t.ts(M) < request_ts(self))
invariant net.sent(X,sock.id,M) -> msg_t.sender_id(M) ~= self
invariant ~(node(X).state = waiting & node(Y).state = critical & lexord(node(X).request_ts(X),X,node(Y).request_ts(Y),Y))
proof {
property exists M. (msg_t.kind(M) = request & msg_t.sender_id(M) = X & msg_t.ts(M) = node(X).request_ts(X))
proof {apply msg_t.constr}
}
invariant state = critical & Y ~= self -> node(Y).ts > request_ts(self)
invariant (Y ~= self -> (request_ts(self) > 0 -> node(Y).request_ts(self) <= request_ts(self))) & node(Y).request_ts(self) <= ts
invariant net.sent(S,node(Y).sock.id,M) & msg_t.kind(M) = request & msg_t.sender_id(M) = self & request_ts(self) > 0 -> msg_t.ts(M) <= request_ts(self)
invariant net.sent(S,node(Y).sock.id,M) & msg_t.kind(M) = request -> msg_t.ts(M) > 0
invariant net.sent(S,node(Y).sock.id,M) & msg_t.kind(M) = release & msg_t.sender_id(M) = self -> node(Y).request_ts(self) <= msg_t.ts(M) & (request_ts(self) > 0 -> msg_t.ts(M) < request_ts(self))
If there is a request in the network, the receiver can't be holding a request with a bigger ts.
invariant net.sent(S,node(Y).sock.id,M) & msg_t.kind(M) = request & msg_t.sender_id(M) = self & node(Y).request_ts(self) > 0 -> node(Y).request_ts(self) <= msg_t.ts(M)
invariant net.sent(S,node(Y).sock.id,M) -> node(Y).reply_ts(msg_t.sender_id(M)) <= msg_t.ts(M)
}
}