Simple key-value store

When designing a distributed system, we typically have to confront the problem of consistency in some form. That is, when processing a collection of transactions concurrently, we need to define in what sense the view of these transactions seen by various concurrent processes is consistent.

To illustrate this idea, we will use a consistency condition called linearizability (sometimes also called "strong consistency"). In a linearizable system, each transaction has a duration in time, marked by a begin event and an end event. The begin event corresponds to a "call" and passes the input parameters of the transaction. The end event corresponds to a "return" and passes output parameters. An execution of the system is linearizable if it is consistent with the hypothesis that each transaction executed instantaneously at some time between its begin and end events. This moment is called the commit point of the transaction.
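
As a rough illustration of this definition, the following Python sketch checks a small history by brute force. The record layout (begin, end, kind, key, value) and the name is_linearizable are assumptions made for this example and are not part of the Ivy development. The function searches for an ordering of commit points that respects the begin/end intervals and that a sequential store could have produced.

```python
from itertools import permutations

# Hypothetical record layout: (begin_time, end_time, kind, key, value).
# For a write, value is the value written; for a read, it is the value returned.

def is_linearizable(history, default=0):
    """Brute-force check that some order of commit points explains the history."""
    n = len(history)
    for order in permutations(range(n)):
        pos = {op: i for i, op in enumerate(order)}
        # Real-time constraint: if a ended before b began, a must commit first.
        if any(history[a][1] < history[b][0] and pos[a] > pos[b]
               for a in range(n) for b in range(n)):
            continue
        # Replay the operations one at a time in this order.
        store = {}
        legal = True
        for i in order:
            _, _, kind, key, value = history[i]
            if kind == "write":
                store[key] = value
            elif store.get(key, default) != value:
                legal = False
                break
        if legal:
            return True
    return False

# A read that overlaps a write may still see the old value.
overlapping = [(0, 3, "write", "k", 1), (1, 2, "read", "k", 0)]
# A read that begins after the write has returned must see the new value.
stale = [(0, 1, "write", "k", 1), (2, 3, "read", "k", 0)]
print(is_linearizable(overlapping), is_linearizable(stale))  # True False
```

The search is exponential in the number of operations, so it is only useful for tiny histories. The reference-object approach described in the rest of this page avoids the search by having the implementation name its own commit points.
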

To illustrate how we specify and verify a linearizable concurrent system, we will use a simple server that acts as a key/value store for multiple clients. The transactions implemented by the store are read(K) and write(K,V), where K is a key and V is a value.

Our first key/value store server will be a simple sequential process that receives transaction requests and sends responses over a network. This will allow us to see in a simple setting how a linearizable service is specified.

Basic data types

We begin by declaring the data types needed to represent transactions and the messages that carry them.

The key and value types are uninterpreted for the moment.

type key
type value
We also introduce a type txid as a transaction identifier. For transaction identifiers, we use a ghost type. This is a value that appears in the system when we specify and verify it, but not in production (where it acts like a void type). Non-ghost values can never depend on ghost values.

ghost type txid
As mentioned above, each transaction is either a read or a write. We use an enumerated type to represent this.

type request_kind = {write,read}
This struct represents the body of a request message. It contains the request kind, the key, and the value (if any). In a read request, the value field is unused.

object request_body = {
    type t = struct {
    knd : request_kind,
    ky : key,
    vl : value
    }
}
A request message encapsulates the request body with a transaction identifier. We could include other information, such as source or destination if needed.

object request = {
    type t = struct {
    tx : txid,
    bd : request_body.t
    }
}
This struct represents the body of a response message. It contains just a value, which represents the return value of a read transaction and is unused for write transactions.

object response_body = {
    type t = struct {
    vl : value
    }
}
A response message encapsulates the response body with a transaction identifier.

object response = {
    type t = struct {
    tx : txid,
    bd : response_body.t
    }
}
A replica is a copy of the store. This module defines the state of the store (a map from keys to values). Initially, all keys map to the default value 0. The module also defines the effect of executing a transaction on the store using an action exec. This action takes a request, updating the store if it is a write and returning the value if it is a read.

module replica = {
    function store(K:key) : value
    after init {
    store(K) := 0
    }
    action exec(inp : request_body.t) returns (out:response_body.t) = {
    if inp.knd = write {
        store(inp.ky) := inp.vl;
    }
    else if inp.knd = read {
        out.vl := store(inp.ky);
    }
    }
}
Now we are ready to specify a linearizable service. In anticipation of the possibility of distributed servers, we define a type of server id called dest. Since we have only one server for now, this is a singleton set.

object dest = {
    ghost type t = {srvr}
}
To specify our service, we use a reference object. This object provides three methods: create, commit and eval. The first corresponds to a begin event, the second to a commit point and the third to an end event. (The interface also declares an action serialize, which is not implemented or called anywhere in this section.) This object is also "ghost" in the sense that it will be abstracted away in production.

The create action takes a request and returns a unique transaction id. At this point, the transaction has begun. The commit action takes a transaction id and indicates the commit point of that transaction. At the commit point, the reference object actually executes the transaction on its replica and stores the result. The eval action takes the transaction id of a committed transaction and returns the correct result for the transaction.

object ref = {
    action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
    action serialize(tx : txid)
    action commit(tx : txid,dst:dest.t)
    action eval(tx : txid) returns (res : response_body.t)
The reference object keeps track of state information needed to implement create, commit and eval. First, we have a replica of the store, called rep. To generate unique transaction ids, we use a counter, whose value is next. The map txs stores the request corresponding to each transaction id, txres stores the result, and committed keeps track of the set of committed transactions. Initially, there are no committed transactions.

    instance rep : replica

    individual next : txid
    function txs(X:txid) : request_body.t
    function txres(X:txid) : response_body.t
    relation committed(X:txid)

    after init {
    next := 0;
    committed(X) := false;
    }
The create action simply generates a new transaction id and stores the request.

    implement create {
    tx := next;
    txs(tx) := inp;
    next := next + 1;
    }
The commit action asserts that the transaction id must be for an existing uncommitted transaction. Then it executes the transaction on the replica, storing the result. Finally, it marks the transaction committed. The declaration delegate commit tells Ivy that the assertions are guarantees for the caller (that is, the caller is responsible for making them hold).

    implement commit {
    assert 0 <= tx & tx < next;
    assert ~committed(tx);
    txres(tx) := rep.exec(txs(tx));
    committed(tx) := true;
    }
    delegate commit
The eval action asserts that the specified transaction must already be committed, then returns the stored result.

    implement eval {
    assert committed(tx);
    res := txres(tx);
    }
    delegate eval
Finally, we interpret the transaction id type as int in order to get an infinite sequence of transaction ids.

    interpret txid -> int
}
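
To make the bookkeeping concrete, here is a minimal Python sketch of the same reference object. It is a model for illustration only, not the Ivy semantics: the class names, the tuple encoding of requests, and the choice to return 0 from a write are all assumptions of this sketch.

```python
class Replica:
    """Sequential store; every key initially maps to 0."""

    def __init__(self):
        self.store = {}

    def exec(self, kind, key, value=0):
        if kind == "write":
            self.store[key] = value
            return 0  # assumption: the result of a write is unused
        return self.store.get(key, 0)


class Ref:
    """Model of the reference object: create, commit and eval."""

    def __init__(self):
        self.rep = Replica()
        self.next = 0           # counter for fresh transaction ids
        self.txs = {}           # transaction id -> request
        self.txres = {}         # transaction id -> stored result
        self.committed = set()  # ids of committed transactions

    def create(self, kind, key, value=0):
        tx = self.next
        self.txs[tx] = (kind, key, value)
        self.next += 1
        return tx

    def commit(self, tx):
        assert 0 <= tx < self.next        # the transaction exists
        assert tx not in self.committed   # and is not yet committed
        self.txres[tx] = self.rep.exec(*self.txs[tx])
        self.committed.add(tx)

    def eval(self, tx):
        assert tx in self.committed
        return self.txres[tx]


ref = Ref()
w = ref.create("write", "a", 7)
r = ref.create("read", "a")
ref.commit(w)   # the write's commit point comes first...
ref.commit(r)   # ...so the read observes it
print(ref.eval(r))  # 7
```

Note that the result of a read is fixed by the order of the commit calls, not by the order of the create calls. Committing r before w in the example above would make eval(r) return the initial value 0.
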
The reference object will allow us to specify the client interface of our service and later also its internal interfaces. We can use these interface specifications to verify our server, and also to verify any other system that might be layered on top of our service, in a compositional manner.

The implementation

include udp
To keep things simple, our system will consist of some generic client endpoints and a server process, connected by unordered, non-duplicating channels. We define the client set in a parameterized way using an uninterpreted type as the client id.

type client_id
To allow us to route responses to the correct client, we encapsulate the request messages with a client id.

type req_msg = struct {
    cid : client_id,
    req : request.t
}
Our client endpoint is a simple module with two actions: client_request and client_response. The user calls client_request to send a request and implements client_response to get the response as a callback. The parameter dst indicates which server process to use, but in this case there is only one choice.

module client(cid,srvr_chan,cl_chans) = {

    action client_request(req : request_body.t, dst: dest.t)
    action client_response(req : response_body.t, tx : txid)
To generate a request, we build a request message and send it to the server.

    implement client_request {
    local m : req_msg {
        m.cid := cid;
        m.req.tx := ref.create(req,dst);
        m.req.bd := req;
        call srvr_chan.send(m);
    }
    }
To handle a response from the server, we simply pass it to the client_response callback. The transaction id parameter is "ghost" and is only used for specification.

    implement cl_chans.recv(resp : response.t) {
    call client_response(resp.bd,resp.tx)
    }

}
Now we get to the implementation of the server. The server contains a replica of the store and a network endpoint to receive requests. The port parameter tells us what port to listen on.

module server_node(port, cl_chans) = {

    instance rep : replica

    instance req_chan : nondup_endpoint(port,req_msg)
When we receive a request message on our port, we first commit the transaction id, then build a response message by executing the transaction on the replica, then send the response to the client. In this way, transactions are trivially always committed in the order in which they execute on the server, which gives us correct semantics. We'll see later that things get a little more complicated with a distributed server. The call to commit is a no-op on the production server, since the reference object is abstracted away. It is only used as a witness for linearizability during verification.

    implement req_chan.recv(inp : req_msg) {
    call ref.commit(inp.req.tx,dest.srvr);
    var rr := inp.req.bd;
    var res : response.t;
    res.tx := inp.req.tx;
    res.bd := rep.exec(rr);
    call cl_chans(inp.cid).send(res)
    }
}
The plumbing


Now we instantiate a server and an array of client endpoints, connecting them up with network channels. We use a parameterized array of endpoints for the clients called cl_chans. The endpoints will use a range of port numbers beginning with 44100.

instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
We create a corresponding array of clients.

instance cl(X:client_id) : client(X,srvr.req_chan,cl_chans)
We create a single server.

instance srvr : server_node(44200,cl_chans)
The interface specifications


For now we need only specify the client API. The service specification says that we get at most one response for each transaction and that, whenever we get a response, the transaction has been committed and the response gives the correct value according to the reference model. This specification allows us to verify the server in isolation, and then separately verify any clients using the API, using only the reference model. Notice that the role of a specification is not to give a "golden model" of correctness but to provide a contract that allows us to verify components in isolation.

object service_spec = {

    relation responded(X:txid)

    after init {
    responded(X) := false;
    }

    before cl_chans.send(p : client_id, m : response.t) {
    assert ~responded(m.tx);
    assert m.bd = ref.eval(m.tx);
    responded(m.tx) := true;
    }
}
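
The monitor above can be pictured as a small runtime check wrapped around the send operation. The Python sketch below is only an analogy: the class name ServiceSpec, the method before_send, and the ref_eval callback (standing in for ref.eval) are hypothetical names introduced for this example.

```python
class ServiceSpec:
    """Checks each outgoing response against a reference evaluation."""

    def __init__(self, ref_eval):
        # ref_eval: hypothetical callable mapping a transaction id to the
        # result recorded by the reference model at its commit point.
        self.ref_eval = ref_eval
        self.responded = set()

    def before_send(self, tx, value):
        # At most one response per transaction.
        assert tx not in self.responded, "duplicate response"
        # The response must agree with the reference model.
        assert value == self.ref_eval(tx), "wrong value"
        self.responded.add(tx)


# Assumed reference results: transaction 0 is committed with result 7.
expected = {0: 7}
spec = ServiceSpec(expected.__getitem__)
spec.before_send(0, 7)       # accepted
try:
    spec.before_send(0, 7)   # a second response for the same transaction
except AssertionError as err:
    print("rejected:", err)
```

In this sketch an uncommitted transaction shows up as a missing dictionary entry, whereas in the Ivy specification it is caught by the assertion inside ref.eval.
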
We export/import our API.

export cl.client_request
import cl.client_response
Verification


Finally, we create an isolate that allows us to verify our server implementation in isolation. We verify only the guarantees of the server, using the specifications of the network channels. Thus, the test generator will play the role of the environment and the network.

trusted isolate iso_srvr = srvr with cl,cl_chans,ref,service_spec
To test, we have to interpret all of the uninterpreted types. We use strings for the keys, 16-bit unsigned integers for the values and one-bit integers for the client ids (meaning we have exactly two clients).

The theory strbv[1] encodes strings with 1-bit vectors. This means we can have at most 2 distinct keys in a test run. This limitation is a good thing since it means we have a good chance to have multiple transactions on the same key. Otherwise, we would be unlikely to detect any data errors.

interpret key -> strbv[1]
interpret value -> bv[16]
interpret client_id -> bv[1]
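
To get a feel for why a small key space helps, the following back-of-the-envelope calculation may be useful. It assumes that keys are drawn independently and uniformly at random and that an N-bit encoding yields 2^N distinct keys; both are assumptions of this sketch rather than documented properties of the test generator.

```python
from fractions import Fraction

def same_key_probability(key_bits):
    """Chance that two independent, uniformly chosen keys coincide."""
    return Fraction(1, 2 ** key_bits)

print(same_key_probability(1))   # 1/2
print(same_key_probability(16))  # 1/65536
```

Under these assumptions, a read lands on the key of a given earlier write about half the time with 1-bit keys, but only about once in 65536 tries with 16-bit keys.
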
The source file for this example is repstore1.ivy. Try running this example using commands like this:

$ ivy_to_cpp target=test isolate=iso_srvr build=true repstore1.ivy
$ ./repstore1 iters=100 runs=10 out=file.iev

Look at the trace file file.iev to see how the test generator, server and reference object interact. Try putting some errors in this example and see if they produce assertion failures. If not, why not?