Tcp impl

include ip
This is an implementation of a generic TCP endpoint. It allows you to create point-to-point, non-duplicating, ordered connections that transfer sequences of messages. The parameters are:

pkt  : the type of messages
me   : the id of this endpoint
port_base : the default port of endpoint 0

If the environment does not set up a configuration, the endpoint has IP address 127.0.0.1 and port number port_base + me.

module tcp_impl(pkt,me,ip_addr, port_base,ser,des) = {
type socket   # opaque handle naming an open TCP socket

These empty objects are used to hold C++ values.

    object rdr = {}         # the listener object
    object rdra = {}         # the accept-listener object (installed by connect_accept)
    object cb = {}          # struct holding the callbacks
    object send_queue = {}  # queues of outgoing packets
This code goes in the C++ header file, ahead of the ivy object declaration. Here, we put declarations (perhaps forward) of any auxiliary classes we need. We need to be careful that the names of these don't clash with other modules. However, duplicates are removed, so we don't have to worry about multiple instances of this module clashing.

<<< header
include include include include include

    class tcp_listener;   // class of threads that listen for connections
    class tcp_listener_accept;   // class of threads that accept connections on demand
    class tcp_callbacks;  // class holding callbacks to ivy

    // A tcp_config maps endpoint ids to IP addresses and ports.

    class tcp_config {
    public:
        // get the address and port from the endpoint id
        virtual void get(int id, unsigned long &inetaddr, unsigned long &inetport);

        // get the address and port of a peer, given its endpoint id and raw IP address
        virtual void get_other(int id, unsigned int other_ip, unsigned long &inetaddr, unsigned long &inetport);

        // get the endpoint id from the address and port
        virtual int rev(unsigned long inetaddr, unsigned long inetport);
    };

    class tcp_queue;      // forward declaration; defined in the impl section


>>>
This code goes in the C++ implementation file. Here, we put implementations of the classes declared in the header, and auxiliary functions.

<<< impl

   // Maximum number of sent packets to queue on a channel. Because TCP also
   // buffers, the total number of untransmitted packets that can back up will be greater
   // than this. This number *must* be at least one to avoid packet corruption.
#define MAX_TCP_SEND_QUEUE 16

   // Minimal portable mutex wrapper: Win32 HANDLE or POSIX pthread_mutex_t.
   // (Restores the '#' on the preprocessor directives, which had been stripped.)
   struct tcp_mutex {
#ifdef _WIN32
       HANDLE mutex;
       tcp_mutex() { mutex = CreateMutex(NULL,FALSE,NULL); }
       void lock() { WaitForSingleObject(mutex,INFINITE); }
       void unlock() { ReleaseMutex(mutex); }
#else
       pthread_mutex_t mutex;
       tcp_mutex() { pthread_mutex_init(&mutex,NULL); }
       void lock() { pthread_mutex_lock(&mutex); }
       void unlock() { pthread_mutex_unlock(&mutex); }
#endif
   };

   // Counting semaphore wrapper around POSIX sem_t, with verbose debug
   // logging of the count before and after each operation.
   struct tcp_sem {
       sem_t sem;
       tcp_sem() {
           // Fail fast if the semaphore cannot be created (the original left
           // this unchecked; sem_init can fail, e.g. unnamed semaphores are
           // unsupported on some platforms).
           if (sem_init(&sem,0,0) != 0) {
               perror("Error initializing semaphore");
               exit(EXIT_FAILURE);
           }
       }
       ~tcp_sem() { sem_destroy(&sem); }
       // Log the current semaphore count to stderr; exit on failure.
       // (Factored out of up/down, which duplicated this code.)
       void log_value() {
            int value;
            if (sem_getvalue(&sem, &value) == 0) {
                std::cerr << "Semaphore value: " << value << "\n";
            } else {
                perror("Error getting semaphore value");
                exit(EXIT_FAILURE);
            }
       }
       // V operation: increment the count, waking one waiter if any.
       void up() {
            std::cerr << "UPPING\n";
            log_value();
            sem_post(&sem);
            log_value();
        }
       // P operation: block until the count is positive, then decrement it.
       void down() {
            std::cerr << "DOWNING\n";
            log_value();
            sem_wait(&sem);
            log_value();
        }
   };

   // Thread-safe FIFO of serialized packets for one socket. Producers call
   // enqueue_swap; a single tcp_writer thread drains it with dequeue_swap.
   // The closed/reported_closed pair lets the queue be closed asynchronously
   // while guaranteeing the writer observes the closure exactly once.
   class tcp_queue {
       tcp_mutex mutex;
       tcp_sem sem;
       bool closed;           // no packets accepted while true
       bool reported_closed;  // closure already returned to the writer once
       std::list<std::vector<char> > bufs;  // pending serialized packets
    public:
       int other; // only access while holding lock!
       unsigned int other_ip; // only access while holding lock!
       tcp_queue(int other, unsigned int other_ip) : closed(false), reported_closed(false), other(other), other_ip(other_ip) {}
       // Append buf by swap (avoids a copy; caller's buf is clobbered).
       // Returns true if the queue is closed (packet not queued). If the
       // queue is full the packet is silently dropped, but the semaphore is
       // still upped; dequeue_swap tolerates such spurious wakeups by
       // re-checking under the lock.
       bool enqueue_swap(std::vector<char> &buf) {
            std::cerr << "ENQUEUEING\n";
           mutex.lock();
           if (closed) {
               mutex.unlock();
               return true;
           }
           if (bufs.size() < MAX_TCP_SEND_QUEUE) {
               bufs.push_back(std::vector<char>());
               buf.swap(bufs.back());
           }
           mutex.unlock();
           sem.up();
           return false;
       }
       // Block until a packet or an unreported closure is available. Swaps
       // the packet into buf and returns false, or returns true the first
       // time the closure is observed (later wakeups on a closed queue loop).
       bool dequeue_swap(std::vector<char> &buf) {
          std::cerr << "DEQUEUEING" << std::endl;
           while(true) {
               sem.down();
               std::cerr << "DEQUEUEING 2" << closed << std::endl;
               mutex.lock();
               if (closed) {
                   std::cerr << "DEQUEUEING CLOSED" << std::endl;
                   if (reported_closed) {
                       mutex.unlock();
                       continue;
                   }
                   reported_closed = true;
                   mutex.unlock();
                   std::cerr << "REPORTING CLOSED" << std::endl;
                   return true;
               }
               if (bufs.size() > 0) {
                   std::cerr << "DEQUEUEING NOT CLOSED" << std::endl;
                   buf.swap(bufs.front());
                   bufs.erase(bufs.begin());
                   mutex.unlock();
                   return false;
               }
               mutex.unlock();
            }
       }
       // Close the queue and drop all pending packets. With report=false the
       // closure is pre-marked as reported, so dequeue_swap will not return
       // it to the writer (used when the writer itself closed the socket).
       void set_closed(bool report=true) {
           mutex.lock();
           closed = true;
           bufs.clear();
           if (!report)
               reported_closed = true;
           mutex.unlock();
           sem.up();
       }
       // Reopen the queue for a (possibly different) peer endpoint id.
       // NOTE(review): other_ip is NOT updated here -- confirm callers never
       // reopen a queue toward a different IP address.
       void set_open(int _other) {
           mutex.lock();
           closed = false;
           reported_closed = false;
           other = _other;
           mutex.unlock();
           sem.up();
       }
       // Wait until closed == closed_val, consuming one semaphore count per
       // check. NOTE(review): the counts consumed here are the same ones
       // enqueue_swap posts for packets -- confirm this cannot starve the
       // dequeue loop.
       void wait_open(bool closed_val = false){
           while (true) {
               mutex.lock();
               if (closed == closed_val) {
                   mutex.unlock();
                   return;
               }
               mutex.unlock();
               sem.down();
            }
       }

   };

   // The default configuration gives address 127.0.0.1 and port port_base + id.

    void tcp_config::get(int id, unsigned long &inetaddr, unsigned long &inetport) {
ifdef _WIN32
            inetaddr = ntohl(inet_addr("127.0.0.1")); // can't send to INADDR_ANY in windows
else
            std::cerr << "WARNING: using default TCP configuration\n";
            //std::cerr << `ip_addr`;
            inetaddr =  0x0a000001; //INADDR_ANY; //TODO
endif
            inetport = `port_base`; //+ id;
    }

     void tcp_config::get_other(int id, unsigned int other_ip,unsigned long &inetaddr, unsigned long &inetport) {
ifdef _WIN32
            inetaddr = ntohl(inet_addr("127.0.0.1")); // can't send to INADDR_ANY in windows
else
            std::cerr << "WARNING: using default TCP configuration\n";
            //std::cerr << `ip_addr`;
            inetaddr =  other_ip; //INADDR_ANY; //TODO
endif
            inetport = `port_base`; //+ id;
    }

    // This reverses the default configuration's map. Note, this is a little dangerous
    // since an attacker could cause a bogus id to be returned. For the moment we have
    // no way to know the correct range of endpoint ids.

    int tcp_config::rev(unsigned long inetaddr, unsigned long inetport) {
        // The address is ignored; only the port offset identifies the endpoint.
        return inetport - `port_base`; // don't use this for real, it's vulnerable
    }

    // Build the sockaddr_in for a given process id by querying the current
    // TCP configuration for its address and port.

    void get_tcp_addr(ivy_class *ivy, int my_id, sockaddr_in &myaddr) {
        unsigned long inetaddr;
        unsigned long inetport;
        ivy->get_tcp_config()->get(my_id, inetaddr, inetport);
        memset((char *)&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        myaddr.sin_port = htons(inetport);
        myaddr.sin_addr.s_addr = htonl(inetaddr);
    }

    // Build the sockaddr_in for a peer, using the configuration's get_other
    // (which takes the peer's raw IP address from the caller).
    void get_other_tcp_addr(ivy_class *ivy, int my_id, unsigned int other_ip, sockaddr_in &myaddr) {
        unsigned long inetaddr;
        unsigned long inetport;
        ivy->get_tcp_config()->get_other(my_id, other_ip, inetaddr, inetport);
        memset((char *)&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        myaddr.sin_port = htons(inetport);
        myaddr.sin_addr.s_addr = htonl(inetaddr);
    }

    // get the process id of a sockaddr_in using the configuration in reverse
    // TODO should not work
    // NOTE(review): this returns a hard-coded endpoint id (2); the real
    // reverse lookup is commented out. Every accepted connection will be
    // attributed to endpoint 2 -- confirm this placeholder is intended.
    int get_tcp_id(ivy_class *ivy, const sockaddr_in &myaddr) {
       return 2; //ivy->get_tcp_config() -> rev(ntohl(myaddr.sin_addr.s_addr), ntohs(myaddr.sin_port));
    }

    // Create a new TCP socket with Nagle's algorithm disabled, or exit on
    // failure. Fixes from review: the fd is now validity-checked before use
    // (the original called ioctl on it first), and the leaked strdup("ivy")
    // plus the dead ifreq/ioctl block (whose result was never used) are gone.

    int make_tcp_socket() {
        int sock = ::socket(AF_INET, SOCK_STREAM, 0);
        if (sock < 0)
            { std::cerr << "cannot create socket\n"; exit(1); }
        // Disable Nagle's algorithm so small packets are sent immediately.
        int one = 1;
        if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
            { perror("setsockopt failed"); exit(1); }
        return sock;
    }


    // This structure holds all the callbacks for the endpoint. These are function objects
    // (ivy action thunks, the %`...` antiquoted types) that are called asynchronously.

    struct tcp_callbacks {
        %`impl.handle_accept` acb;    // an incoming connection was accepted
        %`impl.handle_recv` rcb;      // a packet was received
        %`impl.handle_fail` fcb;      // a connection failed
        %`impl.handle_connected` ccb; // an outgoing connection was established
        // NOTE(review): ccb is taken by value while the others are const& --
        // harmless, but presumably unintentional.
        tcp_callbacks(const %`impl.handle_accept` &acb,
                      const %`impl.handle_recv` &rcb,
                      const %`impl.handle_fail` &fcb,
                      const %`impl.handle_connected` ccb)
            : acb(acb), rcb(rcb), fcb(fcb), ccb(ccb) {}
    };

    // This is a general class for an asynchronous task. These objects are called in a loop
    // by a thread allocated by the runtime. The fdes method returns a file descriptor
    // associated with the object. If fdes returns a negative value, the thread deletes the
    // object and terminates.

    class tcp_task : public reader {
      protected:
        int sock;           // socket associated to this task, or -1 if task complete
        int my_id;          // endpoint id associated to this task
        tcp_callbacks cb;   // callbacks to ivy
        ivy_class *ivy;     // pointer to main ivy object (mainly to get lock)

      public:

        // Initializer list written in declaration order (sock before my_id):
        // members are always constructed in declaration order, and the
        // original out-of-order list triggered -Wreorder.
        tcp_task(int my_id, int sock, const tcp_callbacks &cb, ivy_class *ivy)
          : sock(sock), my_id(my_id), cb(cb), ivy(ivy) {}

        // File descriptor the runtime should wait on for this task.
        virtual int fdes() {
            std::cerr << "tcp_task fdes " << sock << std::endl;
            return sock;
        }


    };


    // This task reads messages from a socket and calls the "recv" callback.
    // (Preprocessor '#' restored on the _WIN32 conditional; logic unchanged.)

    class tcp_reader : public tcp_task {
        // NOTE(review): this member is shadowed by the local `buf` in read(),
        // so leftover bytes stored below are discarded each call -- confirm.
        std::vector<char> buf;
      public:
        tcp_reader(int my_id, int sock, const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(my_id, sock, cb, ivy) {
        }

        virtual int fdes() {
            std::cerr << "tcp_reader fdes " << sock << std::endl;
            return sock;
        }

        // This is called in a loop by the task thread.

        virtual void read() {
            std::cerr << "RECEIVING\n";
            int len=0;
            socklen_t lenlen=4;
            // Size the receive buffer from the socket's SO_RCVBUF option.
#ifdef _WIN32
            if (getsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char *)&len,&lenlen))
#else
            if (getsockopt(sock,SOL_SOCKET,SO_RCVBUF,&len,&lenlen))
#endif
            { perror("getsockopt failed"); exit(1); }
            std::vector<char> buf(len);
            int bytes;
            sockaddr_in srcaddr;
            socklen_t addrlen = sizeof(srcaddr);
            if ((bytes = recvfrom(sock,&buf[0],len,0,(sockaddr *)&srcaddr,&addrlen)) < 0)
            { std::cerr << "recvfrom failed\n"; exit(1); }
            std::cerr << "RECEIVED " << bytes << std::endl;
            if (bytes > 0) {
                buf.resize(bytes);
                std::cerr << "Buffer size: " << buf.size() << std::endl;
                `pkt` pkt;                      // holds received message
                //ivy_socket_deser ds(sock,buf);  // initializer deserialize with any leftover bytes
                //ivy_socket_deser_128 ds(sock,buf);  // initializer deserialize with any leftover bytes
                `des` ds(buf);
                try {
                    __deser(ds,pkt);            // read the message
                }
                // If packet has bad syntax, we drop it, close the socket, call the "failed"
                // callback and terminate the task.
                catch (deser_err &){
                    if (ds.pos > 0)
                        std::cerr << "BAD PACKET RECEIVED\n";
                    else
                        std::cerr << "EOF ON SOCKET\n";
                    cb.fcb(sock);
                    close(sock);
                    sock = -1;
                    return;
                }

                // copy the leftover bytes to buf

                buf.resize(ds.inp.size()-ds.pos);
                std::copy(ds.inp.begin()+ds.pos,ds.inp.end(),buf.begin());

                // call the "recv" callback with the received message

                ivy->__lock();
                ivy->_generating = false;
                cb.rcb(sock,pkt);
                ivy->__unlock();
            }
            std::cerr << "RECEIVING finsh\n";
            buf.clear();                    // clear the leftover bytes
        }
    };


    // This class writes queued packets to a socket. Packets can be added
    // asynchronously to the tail of the queue. If the socket is closed,
    // the queue will be emptied asynchronously. When the queue is empty the writer deletes
    // the queue and exits.

    // invariant: if socket is closed, queue is closed

    class tcp_writer : public tcp_task {
        tcp_queue *queue;       // the send queue this writer drains
        bool connected;         // true once ::connect has succeeded
      public:
        tcp_writer(int my_id, int sock, tcp_queue *queue, const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(my_id,sock,cb,ivy), queue(queue), connected(false) {
        }

        virtual int fdes() {
            std::cerr << "tcp_writer fdes " << sock << std::endl;
            return sock;
        }

        // This is called in a loop by the task thread. Each iteration either
        // connects (if not yet connected), sends one queued packet, or
        // closes the socket when the queue has been closed.

        virtual void read() {
            std::cerr << "WRITING\n";

            if (!connected) {
                // if the socket is not connected, wait for the queue to be open,
                // then connect
                std::cerr << "WAITING FOR OPEN " << sock << std::endl;
                queue->wait_open();
                connect();
                return;
            }

            // dequeue a packet to send

            std::vector<char> buf;
            bool qclosed = queue->dequeue_swap(buf);
            std::cerr << "DEQUEUED " << qclosed << std::endl;
            std::cerr << "SIZE " << buf.size() << std::endl;
            // if queue has been closed asynchronously, close the socket.

            if (qclosed) {
                std::cerr << "CLOSING " << sock << std::endl;
                ::close(sock);
                connected = false;
                return;
            }

            // try a blocking send
            std::cerr << "SENDING 2\n";
            int bytes = send(sock, &buf[0], buf.size(), MSG_NOSIGNAL);

            std::cerr << "SENT\n";

            // if not all bytes sent, channel has failed, close the queue

            if (bytes < (int)buf.size()) {
                std::cerr << bytes << " " << buf.size() << std::endl;
                std::cerr << "SEND FAILED\n";
                fail_close();
            }
        }

        // Establish the outgoing connection to the peer recorded in the
        // queue, then install a reader task on the connected socket.

        void connect() {

            // Get the address of the other from the configuration

            std::cerr << "ENTERING CONNECT " << sock << std::endl;

            ivy -> __lock();               // can be asynchronous, so must lock ivy!
            struct sockaddr_in myaddr;
            int other = queue->other;
            get_other_tcp_addr(ivy,other, queue->other_ip,myaddr);
            ivy -> __unlock();

            // Call connect to make connection

            std::cerr << "CONNECTING sock=" << sock << " other=" << other << std::endl;

            int res = ::connect(sock,(sockaddr *)&myaddr,sizeof(myaddr));

            // If successful, call the "connected" callback, else "failed"

            ivy->__lock();
            if (res >= 0) {
                std::cerr << "CONNECT SUCCEEDED " << sock << std::endl;
                cb.ccb(sock);
                connected = true;
                // Install a reader task to read messages from the new socket.
                std::cerr << "INSTALLING READER tcp_reader" <<std::endl;
                ivy->install_reader(new tcp_reader(my_id,sock,cb,ivy));
            }
            else {
                std::cerr << "CONNECT FAILED " << sock << std::endl;
                fail_close();
            }
            ivy->__unlock();

        }

        // Close the queue (without reporting) and the socket, then run the
        // "failed" callback.

        void fail_close() {
            queue -> set_closed(false);  // close queue synchronously

            // make sure socket is closed before fail callback, since this
            // might open another socket, and we don't want to build up
            // zombie sockets.

            std::cerr << "CLOSING ON FAILURE " << sock << std::endl;
            ::close(sock);
            cb.fcb(sock);
            connected = false;
        }

    };

    // This task listens for connections on a socket in the background.

    class tcp_listener : public tcp_task {
      public:

        // The constructor creates a socket to listen on.

        tcp_listener(int my_id, const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(my_id,0,cb,ivy) {
            sock = make_tcp_socket();
        }

        virtual int fdes() {
            std::cerr << "tcp_listener fdes " << sock << std::endl;
            return sock;
        }

        // The bind method is called by the runtime once, after initialization.
        // This allows us to query the configuration for our address and bind the socket.

        virtual void bind() {
            ivy -> __lock();  // can be asynchronous, so must lock ivy!

            // Get our endpoint address from the configuration
            struct sockaddr_in myaddr;
            get_tcp_addr(ivy,my_id,myaddr);

            std::cerr << "binding id: " << my_id << " port: " << ntohs(myaddr.sin_port) << std::endl;

            // Bind the socket to our address
            if (::bind(sock, (struct sockaddr *)&myaddr, sizeof(myaddr)) < 0)
                { perror("bind failed"); exit(1); }

            // Start listening on the socket (backlog of 2 pending connections)
            if (listen(sock,2) < 0)
                { std::cerr << "cannot listen on socket\n"; exit(1); }

            ivy -> __unlock();
        }

        // After binding, the thread calls read in a loop. In this case, we don't read,
        // we try accepting a connection. BUG: We should first call select to wait for a connection
        // to be available, then call accept while holding the ivy lock. This is needed to
        // guarantee the "accepted" appears to occur before "connected" which is required by
        // the tcp interface specification.

        virtual void read() {
            //ivy->__lock();
            std::cerr << "tcp_listener ACCEPTING\n";

            // Call accept to get an incoming connection request. May block.
            sockaddr_in other_addr;
            socklen_t addrlen = sizeof(other_addr);
            int new_sock = accept(sock, (sockaddr *)&other_addr, &addrlen);

            // If this fails, something is very wrong: fail stop.
            if (new_sock < 0){
                std::cerr << "cannot accept connection\n";
                perror("accept failed"); exit(1);
            }

            // Get the endpoint id of the other from its address.
            // NOTE(review): get_tcp_id currently returns a hard-coded id.
            int other = get_tcp_id(ivy,other_addr);
            std::cerr << "ACCEPTED " << new_sock << " " << other << std::endl;

            // Run the "accept" callback. Since it's async, we must lock.
            ivy->__lock();
            cb.acb(new_sock,other);
            ivy->__unlock();

            // Install a reader task to read messages from the new socket.
            std::cerr << "INSTALLING READER tcp_reader" <<std::endl;
            ivy->install_reader(new tcp_reader(my_id,new_sock,cb,ivy));
            //ivy->__unlock();
        }
    };

    // Variant of tcp_listener used by connect_accept: it accepts a single
    // incoming connection immediately at bind() time, then keeps accepting
    // in read(). NOTE(review): unlike tcp_listener, the socket created in
    // the constructor is never bound or put into listening state, so the
    // accept() calls below should fail (accept on a non-listening socket
    // returns EINVAL) and exit(1) -- confirm whether bind()/listen() were
    // meant to happen first, as in tcp_listener.
    class tcp_listener_accept : public tcp_task {
      public:

        // The constructor creates a socket to listen on.

        tcp_listener_accept(int my_id, const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(my_id,0,cb,ivy) {
            sock = make_tcp_socket();
        }

        virtual int fdes() {
            std::cerr << "tcp_listener_accept fdes " << sock << std::endl;
            return sock;
        }

        // The bind method is called by the runtime once, after initialization.
        // NOTE(review): despite its name, this override does not bind; it
        // performs an accept() (duplicating read() below).

        virtual void bind() {
            //ivy->__lock();
            std::cerr << "tcp_listener_accept ACCEPTING\n";

            // Call accept to get an incoming connection request. May block.
            sockaddr_in other_addr;
            socklen_t addrlen = sizeof(other_addr);
            int new_sock = accept(sock, (sockaddr *)&other_addr, &addrlen);

            // If this fails, something is very wrong: fail stop.
            if (new_sock < 0){
                std::cerr << "cannot accept connection\n";
                perror("accept failed"); exit(1);
            }

            // Get the endpoint id of the other from its address.
            int other = get_tcp_id(ivy,other_addr);
            std::cerr << "ACCEPTED " << new_sock << " " << other << std::endl;

            // Run the "accept" callback. Since it's async, we must lock.
            ivy->__lock();
            cb.acb(new_sock,other);
            ivy->__unlock();

            // Install a reader task to read messages from the new socket.
            std::cerr << "INSTALLING READER tcp_reader tcp_listener_accept" <<std::endl;
            ivy->install_reader(new tcp_reader(my_id,new_sock,cb,ivy));
            //ivy->__unlock();
        }

        // After binding, the thread calls read in a loop. In this case, we don't read,
        // we try accepting a connection. BUG: We should first call select to wait for a connection
        // to be available, then call accept while holding the ivy lock. This is needed to
        // guarantee the "accepted" appears to occur before "connected" which is required by
        // the tcp interface specification.

        virtual void read() {
            //ivy->__lock();
            //TODO
            std::cerr << "tcp_listener ACCEPTING\n";

            // Call accept to get an incoming connection request. May block.
            sockaddr_in other_addr;
            socklen_t addrlen = sizeof(other_addr);
            int new_sock = accept(sock, (sockaddr *)&other_addr, &addrlen);

            // If this fails, something is very wrong: fail stop.
            if (new_sock < 0){
                std::cerr << "cannot accept connection\n";
                perror("accept failed"); exit(1);
            }

            // Get the endpoint id of the other from its address.
            int other = get_tcp_id(ivy,other_addr);
            std::cerr << "ACCEPTED " << new_sock << " " << other << std::endl;

            // Run the "accept" callback. Since it's async, we must lock.
            ivy->__lock();
            cb.acb(new_sock,other);
            ivy->__unlock();

            // Install a reader task to read messages from the new socket.
            std::cerr << "INSTALLING READER tcp_reader" <<std::endl;
            ivy->install_reader(new tcp_reader(my_id,new_sock,cb,ivy));
            //ivy->__unlock();
        }
    };

>>>
Here we put any new members of the ivy C++ class. If we have allocated a per-instance object, we declare it here anti-quoted. The anti-quotation plugs in the actual member name, which may be an array if this is a parameterized instance.
object impl = {

    <<< member

        tcp_listener *`rdr`;             // the listener task
        tcp_listener_accept *`rdra`;             // the accept-listener task (installed by connect_accept)
        tcp_callbacks *`cb`;             // the callbacks to ivy
        hash_space::hash_map<int,tcp_queue *> `send_queue`;   // queues of blocked packets, per socket

    >>>

    <<< member

        tcp_config *the_tcp_config;  // the current configuration (lazily created)

        // Get the current TCP configuration. If none, create a default one.
        // Note: the default is allocated on first use and never freed.

        tcp_config *get_tcp_config() {
            if (!the_tcp_config)
                the_tcp_config = new tcp_config();
            return the_tcp_config;
        }

        // Set the current TCP configuration. This is called by the runtime environment.

        void set_tcp_config(tcp_config *conf) {
            the_tcp_config = conf;
        }

    >>>
Here, we put code to go in the initializer. If this is a parameterized instance, then this code will be run in a loop, so we have to be careful that any initialization of common objects is idempotent.

    <<< init

        the_tcp_config = 0;

        // Create the callbacks. In a parameterized instance, this creates
        // one set of callbacks for each endpoint id. When you put an
        // action in anti-quotes it creates a function object (a "thunk")
        // that captures the instance environment, in this case including
        // the instance's endpoint id "me".

        `cb` = new tcp_callbacks(`handle_accept`,`handle_recv`,`handle_fail`,`handle_connected`);

        // Install a listener task for this endpoint. If parameterized, this creates
        // one for each endpoint. (The accept-listener `rdra` is NOT installed
        // here; it is created on demand by connect_accept.)
        std::cerr << "INSTALLING LISTENER tcp_listener" <<std::endl;
        this->install_reader(`rdr` = new tcp_listener(`me`,*`cb`,this));

    >>>
These actions are handlers for the callbacks. They just insert the endpoint's id and call the corresponding callback action.

    # Forward an accepted connection to the interface, adding our endpoint id.
    action handle_accept(s:socket, other:bgp_id) = {
        call show_handle_accept(s,other);
        call accept(me,s,other)
    }

    # Forward a received packet to the interface, adding our endpoint id.
    action handle_recv(s:socket,x:pkt) = {
        call show_handle_recv(s,x);
        call recv(me,s,x)
    }

    # Forward a connection failure to the interface, adding our endpoint id.
    action handle_fail(s:socket) = {
        call show_handle_fail(s);
        call failed(me,s)
    }

    # Forward a successful outgoing connection to the interface, adding our endpoint id.
    action handle_connected(s:socket) = {
        call show_handle_connected(s);
        call connected(me,s)
    }
# Debug-only default implementation: just logs the connected socket.
implement connected(self:bgp_id, s:socket) { <<< impure std::cerr << "CONNECTED " << s << std::endl; //rdr->read(); >>> }

    # Debug trace: log that an accept event was handled.
    action show_handle_accept(s:socket, other:bgp_id) = {
        <<< impure
            std::cerr << "handle ACCEPTED " << s << std::endl;
        >>>
    }

    # Debug trace: log that a receive event was handled.
    action show_handle_recv(s:socket,x:pkt) = {
        <<< impure
            std::cerr << "handle RECEIVED " << s << std::endl;
        >>>
    }

    # Debug trace: log that a failure event was handled.
    action show_handle_fail(s:socket) = {
        <<< impure
            std::cerr << "handle FAILED " << s << std::endl;
        >>>
    }

    # Debug trace: log that a connected event was handled.
    action show_handle_connected(s:socket) = {
        <<< impure
            std::cerr << "handle CONNECTED " << s << std::endl;
        >>>
    }
These are the implementations of the interface calls. These operations are synchronous.

close the socket

    implement close(self:bgp_id, s:socket) {
        <<< impure
            std::cerr << "CLOSING " << s << std::endl;
            // We don't want to close a socket when there is another thread
            // waiting, because the other thread won't know what to do with the
            // error.

            // Instead we shut down the socket and let the other thread close it.
            // If there is a reader thread, it will see EOF and close the socket. If there is
            // an open writer thread, it will close the socket after we close the
            // send queue. If the queue is already closed, closing it has no effect.

            // invariant: if a socket is open there is a reader thread or
            // an open writer thread, but not both.

            // Because of this invariant, the socket will be closed exactly once.

            ::shutdown(s,SHUT_RDWR);

            // Closing the send queue wakes the writer, which then closes the fd.
            if (`send_queue`.find(s) != `send_queue`.end())
                `send_queue`[s] -> set_closed();

        >>>
    }
connect creates a socket and then installs a connector task to establish the connection asynchronously.

    implement connect(self:bgp_id, other:bgp_id, other_ip:ip.addr) returns (s:socket) {
        <<< impure
            s = make_tcp_socket();
            std::cerr << "SOCKET " << s << std::endl;

            // create a send queue for this socket, if needed, along with
            // its thread. if the queue exists, it must be closed, so
            // we open it.

            tcp_queue *queue;
            if (`send_queue`.find(s) == `send_queue`.end()) {
                std::cerr << "CREATING QUEUE " << s << std::endl;
                std::cerr << "OTHER " << other << std::endl;
                std::cerr << "OTHER IP " << other_ip << std::endl;
                `send_queue`[s] = queue = new tcp_queue(other, other_ip);
                std::cerr << "INSTALLING WRITER tcp_writer" <<std::endl;
                install_thread(new tcp_writer(`me`,s,queue,*`cb`,this));
            } else{
                std::cerr << "OPENING QUEUE " << s << std::endl;
                // NOTE(review): set_open updates the peer id but not
                // other_ip, so a reused fd keeps its previous IP -- confirm.
                `send_queue`[s] -> set_open(other);
            }
        >>>
    }

    implement connect_accept(self:bgp_id, other:bgp_id, other_ip:ip.addr) returns (s:socket) {
        <<< impure
            s = make_tcp_socket();
            std::cerr << "SOCKET " << s << std::endl;
            // NOTE(review): every call installs a fresh tcp_listener_accept
            // and overwrites `rdra`, leaking the previous one; the new
            // listener's socket is never bound or listened on -- confirm.
            install_reader(`rdra` = new tcp_listener_accept(`me`,*`cb`,this));

            // create a send queue for this socket, if needed, along with
            // its thread. if the queue exists, it must be closed, so
            // we open it.

            tcp_queue *queue;
            if (`send_queue`.find(s) == `send_queue`.end()) {
                std::cerr << "CREATING QUEUE " << s << std::endl;
                std::cerr << "OTHER " << other << std::endl;
                std::cerr << "OTHER IP " << other_ip << std::endl;
                `send_queue`[s] = queue = new tcp_queue(other, other_ip);
                std::cerr << "INSTALLING WRITER tcp_writer" <<std::endl;
                install_thread(new tcp_writer(`me`,s,queue,*`cb`,this));
            } else{
                std::cerr << "OPENING QUEUE " << s << std::endl;
                `send_queue`[s] -> set_open(other);
            }
        >>>
    }
send serializes the message and calls unix send to send it on the given socket. It returns true if sending was successful. Note, a "successfully" sent packet could still be lost due to congestion. If we want to maintain ordered semantics, we will need to have an additional return code indicating the packet could not be queued (analogous to EAGAIN).

    implement send(self:bgp_id, s:socket,other:bgp_id, p:pkt) returns (ok:bool) {
        <<< impure
                //ivy_binary_ser sr;
                //ivy_binary_ser_128 sr;
                `ser` sr;
                __ser(sr,p);                 // serialize the packet into sr.res
                std::cerr << "SENDING 1\n";

                // if the send queue for this sock doesn't exist, it isn't open,
                // so the client has violated the precondition. we do the bad client
                // the service of not crashing.

                if (`send_queue`.find(s) == `send_queue`.end()) {
                    std::cerr << "BAD SEND\n";
                    ok = true;
                }
                else {
                    // get the send queue, and enqueue the packet,
                    // returning false if the queue is closed.
                    std::cerr << "ENQUEUING\n";
                    ok = !`send_queue`[s]->enqueue_swap(sr.res);
               }
        >>>
    }
This has to be a trusted isolate, since ivy can't verify C++ code formally.

    }

    # The embedded C++ cannot be verified by ivy, so this isolate is trusted.
    trusted isolate iso = this

    attribute test = impl
}