Lamport mutex

This is an implementation of Lamport's distributed mutual exclusion (DME) algorithm.

We start by including libraries. In this case we need three:

  • The network library for network communication
  • The timeout library to implement a periodic timer
  • The order library to get some basic ordered datatypes

include network
include timeout
include order
Next comes the global section. This contains the declarations of any resources that are used in common by all processes. These usually include:

  • Data types
  • Services, such as network services
  • Immutable global parameters, such as network addresses

We can't have mutable global variables, since processes, being distributed, don't have shared memory.

global {
Our first global data type is the type of host identifiers. We will have one process for each value of this type. Host identifiers take on integer values from 0 to node_max, where node_max is implicitly defined here as a global parameter whose value can be selected at run time.

    instance host_id : iterable
    alias node_max = host_id.max

For example, if node_max is chosen to be 2 at run time, this instantiation is roughly equivalent to:

    object host_id = {
        type this = {0..2}
        action next(x:host_id) returns (x:host_id) = {x := x + 1;}
    }
    alias node_max = 2

Since we have three kinds of messages in our protocol, we define an enumerated type for the message kind with three symbolic values.

    type msg_kind = {request,reply,release}
In addition, we use a sequence type to represent timestamps. The unbounded_sequence template in the order library gives a discrete totally ordered type with a least value 0 and a next operator.

    instance timestamp : unbounded_sequence
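Concretely, 0 is the least timestamp, 0.next = 1, and timestamp.max(1,5) = 5. The receive handler below relies on exactly this: it advances a host's clock to timestamp.max(incoming.ts,ts).next, which strictly exceeds both the clock's old value and the timestamp of the incoming message.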
Our messages are structs with three fields: the message kind, the host identifier of the sender, and a timestamp. We order messages by their timestamps. This ordering is useful in the proof of correctness.

    object msg_t = {
        type this = struct {
            kind : msg_kind,
            sender_id : host_id,
            ts : timestamp
        }
        definition (M1:msg_t < M2:msg_t) = ts(M1) < ts(M2)
    }
Finally we instantiate a network service via which our processes will communicate. Here, tcp.net is a template defined in the network library that we included above. The template takes one parameter, which is the type of messages to be sent. Our instance of this template is an object called net.

    instance net : tcp.net(msg_t)
}
After the global section, we introduce some distributed processes. A process with parameters has one instance for each value of its parameters. In this case we have one parameter of type host_id, which means there is one process in the system for each value of host_id in the range 0..node_max. The parameter is named self, which means that the process can refer to its own host identifier by the name self.

process node(self:host_id) = {
A process usually begins by declaring an interface. This consists of a set of actions that are either calls in from the environment (exports) or calls out to the environment (imports).

Our first action is an export, request_cs, which our client uses to request to enter the critical section. It takes no parameters.

    export action request_cs
Our second action is an import, enter_cs. This is a callback to the client indicating that it is safe to enter the critical section.

    import action enter_cs
Our third action is an export, exit_cs. This is called by the client when exiting the critical section, indicating that it is safe for another process to enter.

    export action exit_cs
Next we declare per-process objects. Each process needs a socket on network net in order to communicate. We declare the socket here. The socket sock is an instance of the template socket declared by the network service net.

    instance sock : net.socket
We also declare some local (per-process) types and variables.

    type state_t = {idle,waiting,critical}
    var state : state_t
We also keep track of the current timestamp.

    var ts : timestamp
Each process maintains a 'request queue', which is a map from host_ids to the timestamp of the current request from that host, or 0 if there is none.

    var request_ts(X:host_id) : timestamp
This map records the highest timestamp of a reply received from each host.

    var reply_ts(X:host_id) : timestamp
Having declared our variables, we initialize them. Code in an after init section runs on initialization of the process. You aren't allowed to do much here, just assign values to local variables.

    after init {
        state := idle;
        ts := 0;
        request_ts(X) := 0;
        reply_ts(X) := 0;
    }
Now we come to the implementation code. Here we implement our exported actions, if any, and also any callback actions from the services we use (i.e., actions that these services import from us).

We start with the request_cs action. This builds a request message, broadcasts it, and records it in the request queue. The action broadcast is a local action (i.e., a subroutine) defined later.

    implement request_cs {
        ts := ts.next;
        var outgoing : msg_t;
        outgoing.kind := request;
        outgoing.sender_id := self;
        outgoing.ts := ts;
        broadcast(outgoing);
        request_ts(self) := ts;
        state := waiting;
    }
Next we implement the callback recv from our network socket, indicating we have an incoming message. This is called sock.recv. It gives us as input parameters the network address of the sending socket (not useful here) and the incoming message.

    implement sock.recv(src:tcp.endpoint,incoming:msg_t) {
        show_incoming(incoming);

First, we update our timestamp to reflect the incoming message.

        ts := timestamp.max(incoming.ts,ts).next;
We partly construct an outgoing message.

        var outgoing : msg_t;
        outgoing.sender_id := self;
        outgoing.ts := ts;
What we do here depends on the kind of message.

When we receive a request message, we put it on our request queue, and return a reply message to the sender.

        if incoming.kind = request {
            outgoing.kind := reply;
            request_ts(incoming.sender_id) := incoming.ts;
            unicast(outgoing,incoming.sender_id);
        }
When we receive a release message, the sender must have a pending request in our queue. We dequeue it by resetting its timestamp to 0.

        else if incoming.kind = release {
            request_ts(incoming.sender_id) := 0;

        }
On a reply, we update the highest timestamp received from this sender. Because of in-order delivery, the timestamps are received in increasing order, so the incoming one must be the greatest so far.

        else if incoming.kind = reply {
            reply_ts(incoming.sender_id) := incoming.ts;
        }
Having processed the incoming message, we might now be able to enter our critical section. We do this if:

  • We are in the waiting state
  • Our request has the least (timestamp, host identifier) pair, in lexicographic order, among all pending requests
  • Every other host has sent us a reply with a timestamp greater than our request's

        if state = waiting
        & forall X. X ~= self ->
                      (request_ts(X) = 0 | lexord(request_ts(self),self,request_ts(X),X))
                         & reply_ts(X) > request_ts(self)
        {
            state := critical;
            enter_cs;
        }
    }
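To see the handshake concretely, suppose both nodes of a two-node system start with ts = 0 and only node 0 requests. Node 0 advances its clock to 1 and broadcasts a request with timestamp 1. Node 1 receives it, advances its clock to timestamp.max(1,0).next = 2, records request_ts(0) = 1, and replies with timestamp 2. When node 0 receives the reply, it sets reply_ts(1) = 2; since node 1 has no pending request and 2 > 1, the condition above is satisfied and node 0 enters its critical section.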

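When the client exits the critical section, we advance the timestamp, remove our own request from the queue, broadcast a release message, and return to the idle state.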
    implement exit_cs {
        ts := ts.next;
        request_ts(self) := 0;
        var outgoing : msg_t;
        outgoing.sender_id := self;
        outgoing.ts := ts;
        outgoing.kind := release;
        broadcast(outgoing);
        state := idle;
    }
At the end, we have definitions of internal (non-interface) actions (in other words, subroutines) and functions (i.e., pure functions).

This function takes two timestamp-host_id pairs and determines whether (X1,Y1) < (X2,Y2) in lexicographic order.

    function lexord(X1:timestamp,Y1:host_id,X2:timestamp,Y2:host_id) =
        X1 < X2 | X1 = X2 & Y1 < Y2
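For example, lexord(2,1,2,3) holds: the timestamps are equal, and the tie is broken by host identifier, with 1 < 3. Breaking timestamp ties by host identifier is what makes the order on pending requests total, so two distinct pending requests are never incomparable.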
The action unicast sends a message to just one process. To actually send a message, we call the send action of our socket, giving it the receiving socket's network address and the message to be sent. Notice that we can get the network address of the process with identifier idx with the expression node(idx).sock.id. This might seem odd, as we are asking for the local state of an object in another process. It is allowed because the network addresses of the sockets are immutable parameters that are determined at initialization and are provided to all processes.

    action unicast(outgoing:msg_t, dst_id : host_id) = {
       sock.send(node(dst_id).sock.id,outgoing);
    }
Action broadcast sends a message to all processes with identifiers not equal to self. Notice that the second while loop pre-increments the index variable. This is a consequence of the semantics of integer sub-range types in Ivy: the result of an arithmetic operation is rounded to the nearest value in the type, so, for example, node_max + 1 = node_max. By pre-incrementing, we never compute node_max + 1, whose result would saturate at node_max and leave the loop spinning forever.

    action broadcast(outgoing:msg_t) = {
        var idx : host_id := 0;
        while(idx < self)
        invariant idx <= self
        invariant net.sent(X,node(D).sock.id,M) <->
            (old net.sent(X,node(D).sock.id,M)) | (X = sock.id & M = outgoing & D < idx)
        {
            unicast(outgoing, idx);
            idx := idx.next;
        }
        while(idx < node_max)
        invariant self <= idx
        invariant net.sent(X,node(D).sock.id,M) <->
            (old net.sent(X,node(D).sock.id,M)) | (X = sock.id & M = outgoing & D <= idx & D ~= self)
        {
            idx := idx.next;
            unicast(outgoing, idx);
        }
    }
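For instance, with node_max = 2 and self = 1, the first loop sends to node 0 and exits with idx = 1; the second loop then increments idx to 2 before sending, so the message goes to node 2 but not to node 1 itself, and idx is never incremented past node_max.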

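We also import an action show_incoming, which the receive handler above calls on each incoming message so that the environment can observe it, for example during testing.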
    import action show_incoming(incoming:msg_t)

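Finally, the specification section. It tracks the client's protocol state in a ghost variable client_state and, on each interface call, requires that the client and the implementation follow the idle/waiting/critical cycle. In particular, enter_cs requires that no other process is in its critical section, which is the mutual exclusion property. The remaining invariants are auxiliary facts used to prove these requirements.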
    specification {

        var client_state : state_t

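This axiom states that distinct processes have distinct socket addresses.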
        axiom node(X).sock.id = node(Y).sock.id -> X = Y

        after init {
            client_state := idle;
        }

        before request_cs {
            require client_state = idle;
            client_state := waiting;
        }

        before enter_cs {
            require client_state = waiting;
            require node.client_state(X) ~= critical
            proof {
                property exists M. (msg_t.kind(M) = request & msg_t.sender_id(M) = X & msg_t.ts(M) = node(X).request_ts(X))
                proof {apply msg_t.constr}
            };

            client_state := critical;
        }

        before exit_cs {
            require client_state = critical;
            client_state := idle;
        }
Next come auxiliary invariants to help in the proof.

The interface state matches the implementation state.

        invariant client_state = state
No two processes are in the critical state.
        invariant node.state(X) = critical & node.state(Y) = critical -> X=Y
        proof {
            property exists M. (msg_t.kind(M) = request & msg_t.sender_id(M) = X & msg_t.ts(M) = node(X).request_ts(X))
            proof {apply msg_t.constr}
            property exists M. (msg_t.kind(M) = request & msg_t.sender_id(M) = Y & msg_t.ts(M) = node(Y).request_ts(Y))
            proof {apply msg_t.constr}
        }
Our current timestamp is >= the timestamp of all messages we have sent.
        invariant net.sent(S,node(Y).sock.id,M) & msg_t.sender_id(M) = self -> ts >= msg_t.ts(M)
The source address of every message in the network matches the sender_id field of the message.

        invariant net.sent(S,node(Y).sock.id,M) -> S = node(msg_t.sender_id(M)).sock.id
If our request is in our queue, then for every other process Y, either the request is on the way to Y, or Y has stored the request. Note that this invariant is written in a roundabout way to avoid function cycles: we say "for all messages M such that M is our request message...". This avoids using a constructor function for messages, which would create a cycle.
        invariant request_ts(self) > 0
            & msg_t.ts(M) = request_ts(self) & msg_t.kind(M) = request & msg_t.sender_id(M) = self
            & Y ~= self ->
                (net.sent(sock.id,node(Y).sock.id,M) | node(Y).request_ts(self) = request_ts(self))
The latest timestamp we have received from process Y is <= the current timestamp of Y.

        invariant Y ~= self -> reply_ts(Y) <= node(Y).ts
If we are in the waiting or critical state, we have a request in our own queue.

        invariant (state=waiting | state=critical) -> request_ts(self) > 0
A release message indicates that either the sender has no pending request, or the pending request has a timestamp greater than the message's.

        invariant net.sent(sock.id,node(Y).sock.id,M) & msg_t.kind(M) = release & msg_t.sender_id(M) = self ->
        (request_ts(self) = 0 | msg_t.ts(M) < request_ts(self))
We don't send any messages to ourselves.

        invariant net.sent(X,sock.id,M) -> msg_t.sender_id(M) ~= self
We can't be in the critical state while another process whose request is lexicographically smaller is waiting.

        invariant ~(node(X).state = waiting & node(Y).state = critical & lexord(node(X).request_ts(X),X,node(Y).request_ts(Y),Y))
        proof {
            property exists M. (msg_t.kind(M) = request & msg_t.sender_id(M) = X & msg_t.ts(M) = node(X).request_ts(X))
            proof {apply msg_t.constr}
        }
If we are in the critical state, every other process's current timestamp is greater than our request's timestamp.

        invariant state = critical & Y ~= self -> node(Y).ts > request_ts(self)
Our request recorded at Y can't be greater than our current request (if we have one), nor greater than our current timestamp.

        invariant (Y ~= self -> (request_ts(self) > 0 -> node(Y).request_ts(self) <= request_ts(self))) & node(Y).request_ts(self) <= ts
Our request's timestamp in the network must be <= our current request's timestamp, if we have one.

        invariant net.sent(S,node(Y).sock.id,M) & msg_t.kind(M) = request & msg_t.sender_id(M) = self & request_ts(self) > 0 -> msg_t.ts(M) <= request_ts(self)
Request messages can't have a timestamp of 0.

        invariant net.sent(S,node(Y).sock.id,M) & msg_t.kind(M) = request -> msg_t.ts(M) > 0
A release message of ours means the receiver holds no request of ours with a greater timestamp, and any current request of ours is later than the message.

        invariant net.sent(S,node(Y).sock.id,M) & msg_t.kind(M) = release & msg_t.sender_id(M) = self -> node(Y).request_ts(self) <= msg_t.ts(M) & (request_ts(self) > 0 -> msg_t.ts(M) < request_ts(self))
If we are critical, everyone else is holding our request.

If one of our request messages is in the network, the receiver can't be holding a request of ours with a greater timestamp.

        invariant net.sent(S,node(Y).sock.id,M) & msg_t.kind(M) = request & msg_t.sender_id(M) = self & node(Y).request_ts(self) > 0 -> node(Y).request_ts(self) <= msg_t.ts(M)
The recorded reply_ts for a host can't be larger than the timestamp of any message from that host still in the network.

        invariant net.sent(S,node(Y).sock.id,M) -> node(Y).reply_ts(msg_t.sender_id(M)) <= msg_t.ts(M)
    }
}
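Assuming the program is saved in a file named lamport_mutex.ivy (a name chosen here for illustration), the proof could be checked with ivy_check lamport_mutex.ivy, and a randomized test executable could be built with something like ivyc target=test lamport_mutex.ivy.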