TCP serdes

This is an implementation of a generic TCP endpoint. It allows you to create point-to-point, non-duplicating, ordered connections that transfer sequences of messages of a given type. Serialization and deserialization are done automatically using the standard serializer and deserializer. There is currently no option to customize this. The parameters are:

endpoint : the type of TCP endpoints
data : the type of messages to be transferred

The endpoint type should be a struct with fields addr (the IP address) and port (the port number). The data type may be any type supported by the standard serializer.
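
For example, a minimal instantiation might look like the following sketch (the names ip_addr, ip_port, my_endpoint and pkt are illustrative, not part of the module):

    type ip_addr                     # the IP address type
    type ip_port                     # the port number type
    type my_endpoint = struct {
        addr : ip_addr,
        port : ip_port
    }
    type pkt                         # the type of messages to transfer

    instance net : tcp_serdes(my_endpoint, pkt)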

module tcp_serdes(endpoint,data) = {
The type of socket descriptors

    type socket
These empty objects are used to hold C++ values.

    object rdr = {}         # the listener object
    object cb = {}          # struct holding the callbacks
    object send_queue = {}  # queues of outgoing packets

This code goes in the C++ header file, ahead of the ivy object declaration. Here, we put declarations (perhaps forward) of any auxiliary classes we need. We need to be careful that these names don't clash with those of other modules. However, duplicate declarations are removed, so we don't have to worry about multiple instances of this module clashing with each other.

<<< header
    #include <netinet/tcp.h>
    #include <sys/socket.h>
    #include <list>
    #include <mutex>
    #include <condition_variable>

    template <typename data>
    class tcp_listener;   // class of threads that listen for connections
    template <typename data>
    class tcp_callbacks;  // class holding callbacks to ivy

    class tcp_queue;


>>>
This code goes in the C++ implementation file. Here, we put implementations of the classes declared in the header, and auxiliary functions.

<<< impl

   // Maximum number of sent packets to queue on a channel. Because TCP also
   // buffers, the total number of untransmitted packets that can back up will be greater
   // than this. This number *must* be at least one to avoid packet corruption.

#define MAX_TCP_SEND_QUEUE 16

   struct tcp_mutex {
#ifdef _WIN32
       HANDLE mutex;
       tcp_mutex() { mutex = CreateMutex(NULL,FALSE,NULL); }
       void lock() { WaitForSingleObject(mutex,INFINITE); }
       void unlock() { ReleaseMutex(mutex); }
#else
       pthread_mutex_t mutex;
       tcp_mutex() { pthread_mutex_init(&mutex,NULL); }
       void lock() { pthread_mutex_lock(&mutex); }
       void unlock() { pthread_mutex_unlock(&mutex); }
#endif
   };

   class tcp_queue {
       std::mutex mtx;
       std::condition_variable cv;
       bool closed;
       bool reported_closed;
       std::list<std::vector<char> > bufs;
    public:
       `endpoint` self; // only access while holding lock!
       `endpoint` other; // only access while holding lock!
       tcp_queue(`endpoint` self, `endpoint` other) : closed(false), reported_closed(false), self(self), other(other) {}
       bool enqueue_swap(std::vector<char> &buf) {
           std::unique_lock<std::mutex> lock(mtx);
           // mutex.lock();
           if (closed) {
               // mutex.unlock();
               return true;
           }
           if (bufs.size() < MAX_TCP_SEND_QUEUE) {
               bufs.push_back(std::vector<char>());
               buf.swap(bufs.back());
           }
           // mutex.unlock();
           // std::cout << "PUTTING BUFFER size=" << bufs.size() << std::endl;
           // sem.up();
           cv.notify_all();
           return false;
       }
       bool dequeue_swap(std::vector<char> &buf) {
           std::unique_lock<std::mutex> lock(mtx);
           while(true) {
               // sem.down();
               // mutex.lock();
               if (closed) {
                   if (reported_closed) {
                       // mutex.unlock();
                       cv.wait(lock);
                       continue;
                   }
                   reported_closed = true;
                   // mutex.unlock();
                   // std::cout << "REPORTING CLOSED" << std::endl;
                   return true;
               }
               if (bufs.size() > 0) {
                   // std::cout << "GETTING BUFFER" << closed << std::endl;
                   buf.swap(bufs.front());
                   bufs.erase(bufs.begin());
                   // mutex.unlock();
                   return false;
               }
               cv.wait(lock);
               // mutex.unlock();
            }
       }
       void set_closed(bool report=true) {
           std::unique_lock<std::mutex> lock(mtx);
           // mutex.lock();
           closed = true;
           bufs.clear();
           if (!report)
               reported_closed = true;
           // mutex.unlock();
           // sem.up();
           cv.notify_all();
       }
       void set_open(`endpoint` _other) {
           std::unique_lock<std::mutex> lock(mtx);
           // mutex.lock();
           closed = false;
           reported_closed = false;
           other = _other;
           // mutex.unlock();
           // sem.up();
           cv.notify_all();
       }
       void wait_open(bool closed_val = false){
           std::unique_lock<std::mutex> lock(mtx);
           while (true) {
               // mutex.lock();
               if (closed == closed_val) {
                   // mutex.unlock();
                   return;
               }
               // mutex.unlock();
               // sem.down();
               cv.wait(lock);
            }
       }

   };
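
    // For illustration only (not part of the module): a rough sketch of how a
    // sender and the writer thread use the queue. Swapping buffers rather than
    // copying them avoids copying each packet's bytes a second time. Here "q"
    // and "sock" are assumed to exist:
    //
    //     std::vector<char> out;           // holds the serialized packet
    //     if (q->enqueue_swap(out))        // contents are swapped into the queue
    //         { /* channel closed: the packet is dropped */ }
    //
    //     std::vector<char> in;
    //     if (q->dequeue_swap(in))         // blocks until a packet arrives or the queue closes
    //         ::close(sock);               // queue closed: close the socket
    //     else
    //         ::send(sock,&in[0],in.size(),0);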


    // construct a sockaddr_in for a specified endpoint

    void get_tcp_addr(ivy_class *ivy, `endpoint` addr, sockaddr_in &myaddr) {
        memset((char *)&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        myaddr.sin_addr.s_addr = htonl(addr.addr);
        myaddr.sin_port = htons(addr.port);
    }

    // get a new TCP socket

    int make_tcp_socket() {
        int sock = ::socket(AF_INET, SOCK_STREAM, 0);
        if (sock < 0)
            { std::cerr << "cannot create socket\n"; exit(1); }
        int one = 1;
        if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
            { perror("setsockopt TCP_NODELAY failed"); exit(1); }
        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
            { perror("setsockopt SO_REUSEADDR failed"); exit(1); }
        return sock;
    }


    // This structure holds all the callbacks for the endpoint. These are function objects
    // that are called asynchronously.

    template <typename data>
    struct tcp_callbacks {
        virtual void acb(int,`endpoint`) const = 0;
        virtual void rcb(int,data) const = 0;
        virtual void fcb(int) const = 0;
        virtual void ccb(int) const = 0;
    };

    template<class acb_thunk, class rcb_thunk, class fcb_thunk, class ccb_thunk, typename data>
    struct tcp_callbacks_thunk : tcp_callbacks<data> {
        acb_thunk acbt;
        rcb_thunk rcbt;
        fcb_thunk fcbt;
        ccb_thunk ccbt;
        tcp_callbacks_thunk(const acb_thunk &acb,const rcb_thunk &rcb,const fcb_thunk &fcb,const ccb_thunk &ccb)
        : acbt(acb), rcbt(rcb), fcbt(fcb), ccbt(ccb) {}
        virtual void acb(int sock,`endpoint` ep) const {
            acbt(sock,ep);
        }
        virtual void rcb(int sock, data pkt) const {
            rcbt(sock,pkt);
        }
        virtual void fcb(int sock) const {
            fcbt(sock);
        }
        virtual void ccb(int sock) const {
            ccbt(sock);
        }
    };

    // This is a general class for an asynchronous task. These objects are called in a loop
    // by a thread allocated by the runtime. The fdes method returns a file descriptor
    // associated with the object. If fdes returns a negative value, the thread deletes the
    // object and terminates.

    template <typename data>
    class tcp_task : public reader {
      protected:
        int sock;           // socket associated to this task, or -1 if task complete
        const tcp_callbacks<data> *cb;   // callbacks to ivy
        ivy_class *ivy;     // pointer to main ivy object (mainly to get lock)

      public:

        tcp_task(int sock, const tcp_callbacks<data> *cb, ivy_class *ivy)
          : sock(sock), cb(cb), ivy(ivy) {}

        virtual int fdes() {
            return sock;
        }


    };
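
    // For illustration only: the runtime's task thread behaves roughly like
    // the sketch below (hypothetical pseudo-code; the real runtime also
    // handles thread creation and locking):
    //
    //     void task_thread(reader *task) {
    //         while (task->fdes() >= 0)
    //             task->read();          // may block on the socket
    //         delete task;
    //     }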


    // This task reads messages from a socket and calls the "recv" callback.

    template <typename data>
    class tcp_reader : public tcp_task<data> {
        std::vector<char> buf;
        using tcp_task<data>::sock;
        using tcp_task<data>::cb;
        using tcp_task<data>::ivy;
      public:
        tcp_reader(int sock, const tcp_callbacks<data> *cb, ivy_class *ivy)
            : tcp_task<data>(sock, cb, ivy) {
        }

        // This is called in a loop by the task thread.

        virtual void read() {
//            std::cout << "RECEIVING\n";

            data pkt;                      // holds received message
            ivy_socket_deser ds(sock,buf);  // initialize the deserializer with any leftover bytes
            buf.clear();                    // clear the leftover bytes

            try {
                __deser(ds,pkt);            // read the message
            }

            // If packet has bad syntax, we drop it, close the socket, call the "failed"
            // callback and terminate the task.

            catch (deser_err &){
                if (ds.pos > 0)
                    std::cout << "BAD PACKET RECEIVED\n";
                else
                    std::cout << "EOF ON SOCKET\n";
                cb->fcb(sock);
                close(sock);
                sock = -1;
                return;
            }

            // copy the leftover bytes to buf

            buf.resize(ds.inp.size()-ds.pos);
            std::copy(ds.inp.begin()+ds.pos,ds.inp.end(),buf.begin());

            // call the "recv" callback with the received message

            ivy->__lock();
            cb->rcb(sock,pkt);
            ivy->__unlock();
        }
    };


    // This class writes queued bytes to a socket. Packets can be added
    // asynchronously to the tail of the queue. If the socket is closed,
    // the queue will be emptied asynchronously. When the queue is empty
    // the writer deletes the queue and exits.

    // invariant: if socket is closed, queue is closed

    template <typename data>
    class tcp_writer : public tcp_task<data> {
        tcp_queue *queue;
        bool connected;
        using tcp_task<data>::sock;
        using tcp_task<data>::cb;
        using tcp_task<data>::ivy;
      public:
        tcp_writer(int sock, tcp_queue *queue, const tcp_callbacks<data> *cb, ivy_class *ivy)
            : tcp_task<data>(sock,cb,ivy), queue(queue), connected(false) {
        }

        virtual int fdes() {
            return sock;
        }

        // This is called in a loop by the task thread.

        virtual void read() {

            if (!connected) {

                // if the socket is not connected, wait for the queue to be open,
                // then connect

                queue->wait_open();
                connect();
                return;
            }

            // dequeue a packet to send

            std::vector<char> buf;
            bool qclosed = queue->dequeue_swap(buf);

            // if the queue has been closed asynchronously, close the socket.

            if (qclosed) {
                std::cout << "CLOSING " << sock << std::endl;
                ::close(sock);
                connected = false;
                return;
            }

            // try a blocking send
#ifdef MSG_NOSIGNAL
            int bytes = send(sock,&buf[0],buf.size(),MSG_NOSIGNAL);
#else
            int bytes = send(sock,&buf[0],buf.size(),0);
#endif
            // std::cout << "SENT\n";

            // if not all bytes sent, channel has failed, close the queue

            if (bytes < (int)buf.size())
                fail_close();
        }

        void connect() {

            // Get the address of the other from the configuration

            // std::cout << "ENTERING CONNECT " << sock << std::endl;

            ivy -> __lock();               // can be asynchronous, so must lock ivy!
            struct sockaddr_in myaddr;
            `endpoint` other = queue->other;
            get_tcp_addr(ivy,other,myaddr);
            ivy -> __unlock();

            // Call connect to make connection

            // std::cout << "CONNECTING sock=" << sock << "other=" << other << std::endl;

            int res = ::connect(sock,(sockaddr *)&myaddr,sizeof(myaddr));

            // If successful, call the "connected" callback, else "failed"

            ivy->__lock();
            if (res >= 0) {
                // std::cout << "CONNECT SUCCEEDED " << sock << std::endl;
#ifdef MSG_NOSIGNAL
                int bytes = send(sock,&queue->self.port,sizeof(queue->self.port),MSG_NOSIGNAL);
#else
                int bytes = send(sock,&queue->self.port,sizeof(queue->self.port),0);
#endif
                cb->ccb(sock);
                connected = true;
            }
            else {
                // std::cout << "CONNECT FAILED " << sock << std::endl;
                // perror("connect failed");
                ivy->__unlock();
                ::sleep(1);
                close(sock);
                sock = make_tcp_socket();
                return;
                // fail_close();
            }
            ivy->__unlock();

        }

        void fail_close() {
            queue -> set_closed(false);  // close queue synchronously

            // make sure socket is closed before fail callback, since this
            // might open another socket, and we don't want to build up
            // zombie sockets.

            // std::cout << "CLOSING ON FAILURE " << sock << std::endl;
            ::close(sock);
            cb->fcb(sock);
            connected = false;
        }

    };
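
    // Note on the wire protocol: tcp_writer::connect (above) begins each
    // connection by sending the raw bytes of its own listening port. The
    // listener (below) reads these bytes and uses them in place of the
    // ephemeral port reported by accept, so that the "accept" callback sees
    // the peer's canonical endpoint. This assumes both ends use the same
    // representation of the port field.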

    // This task listens for connections on a socket in the background.

    template <typename data>
    class tcp_listener : public tcp_task<data> {
        using tcp_task<data>::sock;
        using tcp_task<data>::cb;
        using tcp_task<data>::ivy;
      public:

        virtual bool background() {
            return true;
        }

        // The constructor creates a socket to listen on.

        tcp_listener(const tcp_callbacks<data> *cb, ivy_class *ivy)
            : tcp_task<data>(0,cb,ivy) {
            sock = make_tcp_socket();
        }

        // The bind method is called by the runtime once, after initialization.
        // This allows us to query the configuration for our address and bind the
        // socket. In this module the socket is bound in the "open" action below,
        // so there is nothing to do here.

        virtual void bind() {
        }

        // After binding, the thread calls read in a loop. In this case, we don't read,
        // we try accepting a connection. BUG: We should first call select to wait for a connection
        // to be available, then call accept while holding the ivy lock. This is needed to
        // guarantee that "accepted" appears to occur before "connected", which is required by
        // the tcp interface specification.

        virtual void read() {
            // Call accept to get an incoming connection request. May block.
            sockaddr_in other_addr;
            socklen_t addrlen = sizeof(other_addr);
            int new_sock = accept(sock, (sockaddr *)&other_addr, &addrlen);

            // If this fails, something is very wrong: fail stop.
            if (new_sock < 0)
                { perror("accept failed"); exit(1); }

            `endpoint` other;
            other.addr = ntohl(other_addr.sin_addr.s_addr);
            other.port = ntohs(other_addr.sin_port);

            // Read the connector's listening port, which it sends as the first
            // bytes on the connection (see tcp_writer::connect), and use it in
            // place of the ephemeral port reported by accept.
            int bytes = ::read(new_sock,&other.port,sizeof(other.port));

            // Run the "accept" callback. Since it's async, we must lock.
            ivy->__lock();
            cb->acb(new_sock,other);
            ivy->__unlock();

            // Install a reader task to read messages from the new socket.
            ivy->install_reader(new tcp_reader<data>(new_sock,cb,ivy));
        }
    };



>>>
Here we put any new members of the ivy C++ class. If we have allocated a per-instance object, we declare it here anti-quoted. The anti-quotation plugs in the actual member name, which may be an array if this is a parameterized instance.

<<< member

    hash_space::hash_map<int,void *> `rdr`;              // the listener objects, per socket
    tcp_callbacks<`data`> *`cb`;                         // the callbacks to ivy
    hash_space::hash_map<int,tcp_queue *> `send_queue`;  // queues of blocked packets, per socket

>>>

    type rdr_map_t
    interpret rdr_map_t -> <<< primitive hash_space::hash_map<int,void *> >>>
    type callback_t
    interpret callback_t -> <<< primitive tcp_callbacks<`data`> * >>>
    type queue_t
    interpret queue_t -> <<< primitive hash_space::hash_map<int,tcp_queue *> >>>

    var rdr : rdr_map_t
    var cb : callback_t
    var send_queue : queue_t
Here, we put code to go in the initializer. If this is a parameterized instance, then this code will be run in a loop, so we have to be careful that any initialization of common objects is idempotent.

<<< init


    // Create the callbacks. In a parameterized instance, this creates
    // one set of callbacks for each endpoint id. When you put an
    // action in anti-quotes it creates a function object (a "thunk")
    // that captures the instance environment, in this case including
    // the instance's endpoint id "me".

    `cb` = new tcp_callbacks_thunk<%`handle_accept`,%`handle_recv`,%`handle_fail`,%`handle_connected`,`data`>
                  (`handle_accept`,`handle_recv`,`handle_fail`,`handle_connected`);

>>>
These actions are handlers for the callbacks. They simply call the corresponding callback action.

    action handle_accept(s:socket, other:endpoint) = {
        call accept(s,other)
    }

    action handle_recv(s:socket,x:data) = {
        call recv(s,x)
    }

    action handle_fail(s:socket) = {
        call failed(s)
    }

    action handle_connected(s:socket) = {
        call connected(s)
    }
These are the implementations of the interface calls. These operations are synchronous.

open creates a socket, binds it to the requested endpoint, and starts listening on it.

    action open(addr:endpoint) returns (s:socket) = {
        <<< impure
            tcp_listener<`data`> *r = new tcp_listener<`data`>(`cb`,this);
            s = r->fdes();
            // std::cout << "SOCKET " << s << std::endl;
            struct sockaddr_in myaddr;
            myaddr.sin_family = AF_INET;
            myaddr.sin_addr.s_addr = htonl(addr.addr);
            // myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
            myaddr.sin_port = htons(addr.port);
            // std::cout << "binding id: " << " addr: " << ntohl(myaddr.sin_addr.s_addr) << " port: " << ntohs(myaddr.sin_port) << std::endl;
            if (::bind(s, (struct sockaddr *)&myaddr, sizeof(myaddr)) < 0)
                { perror("bind failed"); exit(1); }
            // Start listening on the socket
            if (listen(s,2) < 0)
                { std::cerr << "cannot listen on socket\n"; exit(1); }
#ifdef SO_NOSIGPIPE
            {
                int opt = 1;

                if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof(opt)) == -1)
                    { perror("setsockopt SO_NOSIGPIPE failed"); exit(1); }
            }
#endif
            `rdr`[s] = r;
            install_reader(r);
        >>>
    }
close shuts the socket down; the actual close happens asynchronously, as explained in the comments below.

    action close(s:socket) = {
        <<< impure

            // We don't want to close a socket when there is another thread
            // waiting, because the other thread won't know what to do with the
            // error.

            // Instead we shut down the socket and let the other thread close it.
            // If there is a reader thread, it will see EOF and close the socket. If there is
            // an open writer thread, it will close the socket after we close the
            // send queue. If the queue is already closed, closing it has no effect.

            // invariant: if a socket is open there is a reader thread or
            // an open writer thread, but not both.

            // Because of this invariant, the socket will be closed exactly once.

            ::shutdown(s,SHUT_RDWR);

            if (`send_queue`.find(s) != `send_queue`.end())
                `send_queue`[s] -> set_closed();

        >>>
    }
connect creates a socket and then installs a writer task that establishes the connection asynchronously.

    action connect(self:endpoint, other:endpoint) returns (s:socket) = {
        <<< impure
            s = make_tcp_socket();
            // std::cout << "SOCKET " << s << std::endl;

            // create a send queue for this socket, if needed, along with
            // its thread. if the queue exists, it must be closed, so
            // we open it.

            tcp_queue *queue;
            if (`send_queue`.find(s) == `send_queue`.end()) {
                `send_queue`[s] = queue = new tcp_queue(self,other);
                 install_thread(new tcp_writer<`data`>(s,queue,`cb`,this));
            } else
                `send_queue`[s] -> set_open(other);
        >>>
    }
send serializes the message and queues it for transmission on the given socket; the writer thread then calls Unix send asynchronously. It returns true if the channel was open when the message was queued. Note that a "successfully" sent packet could still be lost, due to congestion or because the send queue is full. If we want to maintain ordered semantics, we will need an additional return code indicating that the packet could not be queued (analogous to EAGAIN).

    action send(s:socket,p:data) returns (ok:bool) = {
        <<< impure
                ivy_binary_ser sr;
                __ser(sr,p);

                // if the send queue for this sock doesn't exist, it isn't open,
                // so the client has violated the precondition. we do the bad client
                // the service of not crashing.

                if (`send_queue`.find(s) == `send_queue`.end()) {
                    // std::cout << "CANNOT FIND QUEUE: " << s << "\n";
                    ok = true;
                }
                else {
                    // get the send queue, and enqueue the packet, returning false if
                    // the queue is closed.

                    // std::cout << "SENDING\n";
                    ok = !`send_queue`[s]->enqueue_swap(sr.res);
               }
        >>>
    }

    action accept(s:socket,other:endpoint)

    action recv(s:socket,p:data)

    action failed(s:socket)

    action connected(s:socket)

    trusted isolate iso = this

    attribute test = impl
}
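
As an illustration, a client of this module might look roughly like the following sketch (the instance net, the types my_endpoint and pkt, and the bodies of the implementations are hypothetical, and the implement syntax may vary by ivy version):

    instance net : tcp_serdes(my_endpoint, pkt)

    # open a listening socket on our own endpoint
    action start(me:my_endpoint) returns (s:net.socket) = {
        s := net.open(me)
    }

    # handle the asynchronous callbacks
    implement net.accept {
        # an incoming connection from "other" was accepted on socket s
    }

    implement net.recv {
        # a message p arrived on socket s
    }

    implement net.connected {
        # an outgoing connection (from net.connect) is up on socket s;
        # it is now safe to call net.send(s,...)
    }

    implement net.failed {
        # the connection on socket s failed and the socket is closed
    }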