Tcp host

This is an implementation of a generic TCP endpoint. It allows you to create point-to-point, non-duplicating, ordered connections that transfer sequences of messages. The parameters are:

endpoint : the type of UDP endpoints
data  : the type of data streams

The endpoint type should be a struct with fields addr (the IP address) and port. The data type should be arrays of bytes.

module tcp_host(endpoint,data) = {
The type of socket descriptors

    type socket
These empty objects are used to hold C++ values.

    object rdr = {}         # the listener object
    object cb = {}          # struct holding the callbacks
    object send_queue = {}  # queues of outgoing packets
This code goes in the C++ header file, ahead of the ivy object declaration. Here, we put declarations (perhaps forward) of any auxiliary classes we need). We need to be careful that the names of these don't clash with other modules. However, duplicates are removed, so we don't have to worry about multiple instances of this module clashing.

<<< header
include include include

    class tcp_listener;   // class of threads that listen for connections
    class tcp_callbacks;  // class holding callbacks to ivy

    class tcp_queue;


>>>
This code goes in the C++ implementation file. Here, we put implementations of the classes declared in the header, and auxiliary functions.

<<< impl

   // Maximum number of sent packets to queue on a channel. Because TCP also
   // buffers, the total number of untransmitted backets that can back up will be greater
   // than this. This number *must* be at least one to void packet corruption.
define MAX_TCP_SEND_QUEUE 16

   struct tcp_mutex {
ifdef _WIN32
       HANDLE mutex;
       tcp_mutex() { mutex = CreateMutex(NULL,FALSE,NULL); }
       void lock() { WaitForSingleObject(mutex,INFINITE); }
       void unlock() { ReleaseMutex(mutex); }
else
       pthread_mutex_t mutex;
       tcp_mutex() { pthread_mutex_init(&mutex,NULL); }
       void lock() { pthread_mutex_lock(&mutex); }
       void unlock() { pthread_mutex_unlock(&mutex); }
endif
   };

   struct tcp_sem {
       sem_t sem;
       tcp_sem() { sem_init(&sem,0,0); }
       void up() {sem_post(&sem); }
       void down() {sem_wait(&sem);}
   };

   class tcp_queue {
       tcp_mutex mutex;
       tcp_sem sem;
       bool closed;
       bool reported_closed;
       std::list<std::vector<char> > bufs;
    public:
       `endpoint` other; // only acces while holding lock!
       tcp_queue(`endpoint` other) : closed(false), reported_closed(false), other(other) {}
       bool enqueue_swap(std::vector<char> &buf) {
           mutex.lock();
           if (closed) {
               mutex.unlock();
               return true;
           }
           if (bufs.size() < MAX_TCP_SEND_QUEUE) {
               bufs.push_back(std::vector<char>());
               buf.swap(bufs.back());
           }
           mutex.unlock();
           sem.up();
           return false;
       }
       bool dequeue_swap(std::vector<char> &buf) {
           while(true) {
               sem.down();
               // std::cerr << "DEQUEUEING" << closed << std::endl;
               mutex.lock();
               if (closed) {
                   if (reported_closed) {
                       mutex.unlock();
                       continue;
                   }
                   reported_closed = true;
                   mutex.unlock();
                   // std::cerr << "REPORTING CLOSED" << std::endl;
                   return true;
               }
               if (bufs.size() > 0) {
                   buf.swap(bufs.front());
                   bufs.erase(bufs.begin());
                   mutex.unlock();
                   return false;
               }
               mutex.unlock();
            }
       }
       void set_closed(bool report=true) {
           mutex.lock();
           closed = true;
           bufs.clear();
           if (!report)
               reported_closed = true;
           mutex.unlock();
           sem.up();
       }
       void set_open(`endpoint` _other) {
           mutex.lock();
           closed = false;
           reported_closed = false;
           other = _other;
           mutex.unlock();
           sem.up();
       }
       void wait_open(bool closed_val = false){
           while (true) {
               mutex.lock();
               if (closed == closed_val) {
                   mutex.unlock();
                   return;
               }
               mutex.unlock();
               sem.down();
            }
       }

   };


    // construct a sockaddr_in for a specified process id using the configuration

    void get_tcp_addr(ivy_class *ivy, `endpoint` addr, sockaddr_in &myaddr) {
        memset((char *)&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        myaddr.sin_addr.s_addr = htonl(addr.addr);
    myaddr.sin_port = htons(addr.port);
    }

    // get a new TCP socket

    int make_tcp_socket() {
        int sock = ::socket(AF_INET, SOCK_STREAM, 0);
        if (sock < 0)
            { std::cerr << "cannot create socket\n"; exit(1); }
        int one = 1;
        if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
            { perror("setsockopt failed"); exit(1); }
        return sock;
    }


    // This structure holds all the callbacks for the endpoint. These are function objects
    // that are called asynchronously.

    struct tcp_callbacks {
        %`handle_accept` acb;
        %`handle_recv` rcb;
        %`handle_fail` fcb;
        %`handle_connected` ccb;
        tcp_callbacks(const %`handle_accept` &acb,
                      const %`handle_recv` &rcb,
                      const %`handle_fail` &fcb,
                      const %`handle_connected` ccb)
            : acb(acb), rcb(rcb), fcb(fcb), ccb(ccb) {}
    };

    // This is a general class for an asynchronous task. These objects are called in a loop
    // by a thread allocated by the runtime. The fdes method returns a file descriptor
    // associated with the object. If fdes returns a negative value, the thread deletes the
    // object and terminates.

    class tcp_task : public reader {
      protected:
        int sock;           // socket associated to this task, or -1 if task complete
        tcp_callbacks cb;   // callbacks to ivy
        ivy_class *ivy;     // pointer to main ivy object (mainly to get lock)

      public:

        tcp_task(int sock, const tcp_callbacks &cb, ivy_class *ivy)
          : sock(sock), cb(cb), ivy(ivy) {}

        virtual int fdes() {
            return sock;
        }


    };


    // This task reads messages from a socket and calls the "recv" callback.

    class tcp_reader : public tcp_task {
        std::vector<char> buf;
      public:
        tcp_reader(int sock, const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(sock, cb, ivy) {
        }

        // This is called in a loop by the task thread.

        virtual void read() {
//            std::cerr << "RECEIVING\n";

            buf.resize(2048);
            int newbytes;
            if ((newbytes = ::read(sock,&buf[0],2048)) < 0)
            { perror("read failed"); exit(1); }

            // copy received bytes into the result

            `data` pkt;
            pkt.resize(newbytes);
            std::copy(buf.begin(),buf.begin()+newbytes,pkt.begin());

            // call the "recv" callback with the received message

            ivy->__lock();
            cb.rcb(sock,pkt);
            ivy->__unlock();
        }
    };


    // This class writes queued bytes to a socket. Packets can be added
    // asynchronously to the tail of the queue. If the socket is closed,
    // the queue will be emptied asynchrnonously. When the queue is empty the writer deletes
    // the queue and exits.

    // invariant: if socket is closed, queue is closed

    class tcp_writer : public tcp_task {
        tcp_queue *queue;
        bool connected;
      public:
        tcp_writer(int sock, tcp_queue *queue, const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(sock,cb,ivy), queue(queue), connected(false) {
        }

        virtual int fdes() {
            return sock;
        }

        // This is called in a loop by the task thread.

        virtual void read() {

            if (!connected) {

                // if the socket is not connected, wait for the queue to be open,
                // then connect

                queue->wait_open();
                connect();
                return;
            }

            // dequeue a packet to send

            std::vector<char> buf;
            bool qclosed = queue->dequeue_swap(buf);

            // if queue has been closed asynchrononously, close the socket.

            if (qclosed) {
                // std::cerr << "CLOSING " << sock << std::endl;
                ::close(sock);
                connected = false;
                return;
            }

            // try a blocking send

            int bytes = send(sock,&buf[0],buf.size(),MSG_NOSIGNAL);

            // std::cerr << "SENT\n";

            // if not all bytes sent, channel has failed, close the queue

            if (bytes < (int)buf.size())
                fail_close();
        }

        void connect() {

            // Get the address of the other from the configuration

            // std::cerr << "ENTERING CONNECT " << sock << std::endl;

            ivy -> __lock();               // can be asynchronous, so must lock ivy!
            struct sockaddr_in myaddr;
            `endpoint` other = queue->other;
            get_tcp_addr(ivy,other,myaddr);
            ivy -> __unlock();

            // Call connect to make connection

            // std::cerr << "CONNECTING sock=" << sock << "other=" << other << std::endl;

            int res = ::connect(sock,(sockaddr *)&myaddr,sizeof(myaddr));

            // If successful, call the "connected" callback, else "failed"

            ivy->__lock();
            if (res >= 0) {
                // std::cerr << "CONNECT SUCCEEDED " << sock << std::endl;
                cb.ccb(sock);
                connected = true;
            }
            else {
                // std::cerr << "CONNECT FAILED " << sock << std::endl;
                fail_close();
            }
            ivy->__unlock();

        }

        void fail_close() {
            queue -> set_closed(false);  // close queue synchronously

            // make sure socket is closed before fail callback, since this
            // might open another socket, and we don't want to build up
            // zombie sockets.

            // std::cerr << "CLOSING ON FAILURE " << sock << std::endl;
            ::close(sock);
            cb.fcb(sock);
            connected = false;
        }

    };

    // This task listens for connections on a socket in the background.

    class tcp_listener : public tcp_task {
      public:

        // The constructor creates a socket to listen on.

        tcp_listener(const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(0,cb,ivy) {
            sock = make_tcp_socket();
        }

        // The bind method is called by the runtime once, after initialization.
        // This allows us to query the configuration for our address and bind the socket.

        virtual void bind() {
        }

        // After binding, the thread calls read in a loop. In this case, we don't read,
        // we try accepting a connection. BUG: We should first call select to wait for a connection
        // to be available, then call accept while holding the ivy lock. This is needed to
        // guarantee the "accepted" appears to occur before "connected" which is required by
        // the the tcp interface specification.

        virtual void read() {
            // std::cerr << "ACCEPTING\n";

            // Call accept to get an incoming connection request. May block.
            sockaddr_in other_addr;
            socklen_t addrlen = sizeof(other_addr);
            int new_sock = accept(sock, (sockaddr *)&other_addr, &addrlen);

            // If this fails, something is very wrong: fail stop.
            if (new_sock < 0)
                { perror("accept failed"); exit(1); }

            `endpoint` other;
            other.addr = ntohl(other_addr.sin_addr.s_addr);
            other.port = ntohs(other_addr.sin_port);

            // Run the "accept" callback. Since it's async, we must lock.
            ivy->__lock();
            cb.acb(new_sock,other);
            ivy->__unlock();

            // Install a reader task to read messages from the new socket.
            ivy->install_reader(new tcp_reader(new_sock,cb,ivy));
        }
    };



>>>
Here we put any new members of the ivy C++ class. If we have allocated a per-instance object, we declared it here anti-quoted. The plugs in the actual member name, which may be any array if this is a parameterized instance.

<<< member

    hash_space::hash_map<int,void *> `rdr`;
    tcp_callbacks *`cb`;             // the callbacks to ivy
    hash_space::hash_map<int,tcp_queue *> `send_queue`;   // queues of blocked packets, per socket

>>>
Here, we put code to go in the initializer. If this is a parameterized instance, then this code will be run in a loop, so we have to be careful that any initialization of common objects is idempotent.

<<< init


    // Create the callbacks. In a parameterized instance, this creates
    // one set of callbacks for each endpoint id. When you put an
    // action in anti-quotes it creates a function object (a "thunk")
    // that captures the instance environment, in this case including
    // the instance's endpoint id "me".

    `cb` = new tcp_callbacks(`handle_accept`,`handle_recv`,`handle_fail`,`handle_connected`);

>>>
These actions are handlers for the callbacks. They just insert the endpoint's id and call the corresponding callback action.

    action handle_accept(s:socket, other:endpoint) = {
        call accept(s,other)
    }

    action handle_recv(s:socket,x:data) = {
        call recv(s,x)
    }

    action handle_fail(s:socket) = {
        call failed(s)
    }

    action handle_connected(s:socket) = {
        call connected(s)
    }
These are the implementations of the interface calls. These operations are synchronous.

open creates a socket, binds it to the requested endpoint, and starts listening on it.

    action open(addr:endpoint) returns (s:socket) = {
        <<< impure
            tcp_listener *r = new tcp_listener(*`cb`,this);
            s = r->fdes();
            // std::cerr << "SOCKET " << s << std::endl;
            struct sockaddr_in myaddr;
            myaddr.sin_family = AF_INET;
            myaddr.sin_addr.s_addr = htonl(addr.addr);
            // myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
            myaddr.sin_port = htons(addr.port);
            // std::cerr << "binding id: " << " addr: " << ntohl(myaddr.sin_addr.s_addr) << " port: " << ntohs(myaddr.sin_port) << std::endl;
            if (::bind(s, (struct sockaddr *)&myaddr, sizeof(myaddr)) < 0)
                { perror("bind failed"); exit(1); }
            // Start lisetning on the socket
            if (listen(s,2) < 0)
                { std::cerr << "cannot listen on socket\n"; exit(1); }
            `rdr`[s] = r;
            install_reader(r);
        >>>
    }
close the socket

    action close(s:socket) = {
        <<< impure

            // We don't want to close a socket when there is another thread
            // waiting, because the other thread won't know what to do with the
            // error.

            // Instead we shut down the socket and let the other thread close it.
            // If there is a reader thread, it will see EOF and close the socket. If there is
            // on open writer thread, it will close the socket after we close the
            // send queue. If the queue is already closed, closing it has no effect.

            // invariant: if a socket is open there is a reader thread or
            // an open writer thread, but not both.

            // Because of this invariant, the socket will be closed exactly once.

            ::shutdown(s,SHUT_RDWR);

            if (`send_queue`.find(s) != `send_queue`.end())
                `send_queue`[s] -> set_closed();

        >>>
    }
connect creates a socket and then installs a connector task to establish the connection asynchronously.

    action connect(other:endpoint) returns (s:socket) = {
        <<< impure
            s = make_tcp_socket();
            // std::cerr << "SOCKET " << s << std::endl;

            // create a send queue for this socket, if needed, along with
            // its thread. if the queue exists, it must be closed, so
            // we open it.

            tcp_queue *queue;
            if (`send_queue`.find(s) == `send_queue`.end()) {
                `send_queue`[s] = queue = new tcp_queue(other);
                 install_thread(new tcp_writer(s,queue,*`cb`,this));
            } else
                `send_queue`[s] -> set_open(other);
        >>>
    }
send serializes the message and calls unix send to send it in the given socket. It returns true if sending was successful. Not, a "successfully" sent packet could still be lost to due congestion. If we want to maintain ordered semantics, we will need to have an additional return code indicating the packet could not be queued (analogous to EAGAIN).

    action send(s:socket,p:data) returns (ok:bool) = {
        <<< impure
                std::vector<char> buf;
                buf.resize(p.size());
                std::copy(p.begin(),p.end(),buf.begin());
//                std::cerr << "SENDING\n";

                // if the send queue for this sock doesn's exist, it isn't open,
                // so the client has violated the precondition. we do the bad client
                // the service of not crashing.

                if (`send_queue`.find(s) == `send_queue`.end())
                    ok = true;

                else {
                    // get the send queue, and enqueue the packet, returning false if
                    // the queue is closed.

                    ok = !`send_queue`[s]->enqueue_swap(buf);
               }
        >>>
    }

    action accept(s:socket,other:endpoint)

    action recv(s:socket,p:data)

    action failed(s:socket)

    action connected(s:socket)

    trusted isolate iso = this

    attribute test = impl
}