Tcp impl old

This is an implementation of a generic TCP endpoint. It allows you to create point-to-point, non-duplicating, ordered connections that transfer sequences of messages. The parameters are:

addr : the type of endpoint ids
pkt  : the type of messages
me   : the id of this endpoint
port_base : the default port of endpoint 0

If the environment does not set up a configuration, then the endpoint has IP address 127.0.0.1 and port number port_base + me.

module tcp_impl(addr,pkt,me,port_base) = {
These empty objects are used to hold C++ values.

    object rdr = {}         # placeholder for the C++ listener task (a tcp_listener pointer)
    object cb = {}          # placeholder for the C++ struct holding the callbacks into ivy
    object send_queue = {}  # placeholder for the C++ map from socket to its queue of outgoing packets
This code goes in the C++ header file, ahead of the ivy object declaration. Here, we put declarations (perhaps forward) of any auxiliary classes we need. We need to be careful that the names of these don't clash with other modules. However, duplicates are removed, so we don't have to worry about multiple instances of this module clashing.

<<< header
include include include

    class tcp_listener;   // class of threads that listen for connections
    class tcp_callbacks;  // class holding callbacks to ivy

    // A tcp_config maps endpoint ids to IP addresses and ports.

    class tcp_config {
    public:
        // Virtual destructor: this class is a polymorphic base (both methods
        // are virtual), so an object of a derived configuration must be
        // safely destructible through a tcp_config pointer.
        virtual ~tcp_config() {}

        // get the address and port from the endpoint id.
        // Both outputs are in host byte order; get_tcp_addr applies
        // htonl/htons before storing them in a sockaddr_in.
        virtual void get(int id, unsigned long &inetaddr, unsigned long &inetport);

        // get the endpoint id from the address and port (both in host byte
        // order; get_tcp_id applies ntohl/ntohs before calling this).
        virtual int rev(unsigned long inetaddr, unsigned long inetport);
    };

    class tcp_queue;


>>>
This code goes in the C++ implementation file. Here, we put implementations of the classes declared in the header, and auxiliary functions.

<<< impl

   // Maximum number of sent packets to queue on a channel. Because TCP also
   // buffers, the total number of untransmitted packets that can back up will be greater
   // than this. This number *must* be at least one to avoid packet corruption.
define MAX_TCP_SEND_QUEUE 16

   // Minimal mutual-exclusion wrapper with two variants selected on _WIN32:
   // a Win32 mutex HANDLE, or a pthread mutex with default attributes.
   // The mutex is created in the constructor; nothing in this struct
   // destroys it.
   // NOTE(review): the conditional-compilation lines below appear without a
   // leading '#' in this listing -- confirm against the original file.
   struct tcp_mutex {
ifdef _WIN32
       HANDLE mutex;
       // create an unnamed mutex that is initially not owned
       tcp_mutex() { mutex = CreateMutex(NULL,FALSE,NULL); }
       // block (with no timeout) until the mutex is acquired
       void lock() { WaitForSingleObject(mutex,INFINITE); }
       void unlock() { ReleaseMutex(mutex); }
else
       pthread_mutex_t mutex;
       // initialize with default attributes
       tcp_mutex() { pthread_mutex_init(&mutex,NULL); }
       void lock() { pthread_mutex_lock(&mutex); }
       void unlock() { pthread_mutex_unlock(&mutex); }
endif
   };

   // Thin wrapper around an unnamed POSIX semaphore. The semaphore starts
   // with a count of zero and is created with pshared = 0.
   struct tcp_sem {
       sem_t sem;

       // Create the semaphore with an initial count of zero.
       tcp_sem() {
           sem_init(&sem, 0, 0);
       }

       // Post the semaphore (increment the count).
       void up() {
           sem_post(&sem);
       }

       // Wait on the semaphore (block until the count can be decremented).
       void down() {
           sem_wait(&sem);
       }
   };

   // Queue of outgoing packet buffers for one socket, shared between the
   // code that enqueues packets and the writer task that dequeues them.
   // The queue is either open or closed. Every member function takes the
   // mutex while it touches the fields, and the semaphore is posted
   // whenever something happens that a waiter might care about (an enqueue
   // attempt, a close, an open).
   class tcp_queue {
       tcp_mutex mutex;          // held by each member function while it reads or writes the fields
       tcp_sem sem;              // posted on enqueue attempts on an open queue and on every open/close
       bool closed;              // true after set_closed() until the next set_open()
       bool reported_closed;     // true once the current close has been returned by dequeue_swap, or suppressed
       std::list<std::vector<char> > bufs;  // queued packets, oldest at the front
    public:
       int other; // only access while holding lock!

       // Create an open, empty queue for a connection to endpoint `other`.
       tcp_queue(int other) : closed(false), reported_closed(false), other(other) {}

       // Try to append a packet. Returns true if the queue is closed (the
       // packet is not taken), false otherwise. On an open queue holding
       // fewer than MAX_TCP_SEND_QUEUE packets, the contents of `buf` are
       // swapped into a new tail entry (leaving `buf` empty). If the queue
       // is already full the packet is NOT queued, yet false is still
       // returned and the semaphore is still posted.
       bool enqueue_swap(std::vector<char> &buf) {
           mutex.lock();
           if (closed) {
               mutex.unlock();
               return true;
           }
           if (bufs.size() < MAX_TCP_SEND_QUEUE) {
               bufs.push_back(std::vector<char>());
               buf.swap(bufs.back());
           }
           mutex.unlock();
           sem.up();
           return false;
       }

       // Block until there is something to report. Returns false after
       // swapping the oldest queued packet into `buf`. Returns true exactly
       // once per close (the first time the closed state is observed and
       // has not already been reported); later wake-ups on a closed queue
       // are swallowed and the call keeps waiting.
       bool dequeue_swap(std::vector<char> &buf) {
           while(true) {
               sem.down();
               // std::cerr << "DEQUEUEING" << closed << std::endl;
               mutex.lock();
               if (closed) {
                   if (reported_closed) {
                       mutex.unlock();
                       continue;
                   }
                   reported_closed = true;
                   mutex.unlock();
                   // std::cerr << "REPORTING CLOSED" << std::endl;
                   return true;
               }
               if (bufs.size() > 0) {
                   buf.swap(bufs.front());
                   bufs.erase(bufs.begin());
                   mutex.unlock();
                   return false;
               }
               // woken with nothing queued (e.g. by a post that did not
               // add a packet): go back to waiting
               mutex.unlock();
            }
       }

       // Close the queue and discard all queued packets. With report=false
       // the close is marked as already reported, so dequeue_swap will not
       // return true for it. Posts the semaphore to wake a waiter.
       void set_closed(bool report=true) {
           mutex.lock();
           closed = true;
           bufs.clear();
           if (!report)
               reported_closed = true;
           mutex.unlock();
           sem.up();
       }

       // Reopen the queue for a connection to endpoint `_other` and post
       // the semaphore to wake a waiter.
       void set_open(int _other) {
           mutex.lock();
           closed = false;
           reported_closed = false;
           other = _other;
           mutex.unlock();
           sem.up();
       }

       // Block until the closed flag equals `closed_val` (by default:
       // until the queue is open). Returns immediately if that already
       // holds; otherwise waits on the semaphore between checks.
       void wait_open(bool closed_val = false){
           while (true) {
               mutex.lock();
               if (closed == closed_val) {
                   mutex.unlock();
                   return;
               }
               mutex.unlock();
               sem.down();
            }
       }

   };

   // The default configuration gives address 127.0.0.1 and port port_base + id.

    // Default mapping from an endpoint id to an address and a port, both
    // written to the output parameters in host byte order. The port is the
    // module parameter port_base plus the id. The address does not depend
    // on the id: it is 127.0.0.1 when _WIN32 is defined and INADDR_ANY
    // otherwise.
    // NOTE(review): the conditional-compilation lines below appear without a
    // leading '#' in this listing -- confirm against the original file.
    void tcp_config::get(int id, unsigned long &inetaddr, unsigned long &inetport) {
ifdef _WIN32
            inetaddr = ntohl(inet_addr("127.0.0.1")); // can't send to INADDR_ANY in windows
else
            inetaddr = INADDR_ANY;
endif
            inetport = `port_base`+ id;
    }

    // This reverses the default configuration's map. Note, this is a little dangerous
    // since an attacker could cause a bogus id to be returned. For the moment we have
    // no way to know the correct range of endpoint ids.

    int tcp_config::rev(unsigned long inetaddr, unsigned long inetport) {
        // The endpoint id is the offset of the port from the base port.
        // The address argument is not consulted.
        // don't use this for real, it's vulnerable
        const int endpoint_id = inetport - `port_base`;
        return endpoint_id;
    }

    // construct a sockaddr_in for a specified process id using the configuration

    void get_tcp_addr(ivy_class *ivy, int my_id, sockaddr_in &myaddr) {
        // Start from an all-zero structure.
        memset(&myaddr, 0, sizeof(myaddr));

        // Ask the current configuration for the address and port of this
        // endpoint id (returned in host byte order).
        unsigned long host_addr;
        unsigned long host_port;
        tcp_config *config = ivy->get_tcp_config();
        config->get(my_id, host_addr, host_port);

        // Fill in the IPv4 address, converting to network byte order.
        myaddr.sin_family = AF_INET;
        myaddr.sin_addr.s_addr = htonl(host_addr);
        myaddr.sin_port = htons(host_port);
    }

    // get the process id of a sockaddr_in using the configuration in reverse

    int get_tcp_id(ivy_class *ivy, const sockaddr_in &myaddr) {
        // Convert the address and port to host byte order, then let the
        // current configuration map them back to an endpoint id.
        const unsigned long host_addr = ntohl(myaddr.sin_addr.s_addr);
        const unsigned long host_port = ntohs(myaddr.sin_port);
        return ivy->get_tcp_config()->rev(host_addr, host_port);
    }

    // get a new TCP socket

    int make_tcp_socket() {
        int sock = ::socket(AF_INET, SOCK_STREAM, 0);
        if (sock < 0)
            { std::cerr << "cannot create socket\n"; exit(1); }
        int one = 1;
        if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
            { perror("setsockopt failed"); exit(1); }
        return sock;
    }


    // This structure holds all the callbacks for the endpoint. These are function objects
    // that are called asynchronously.

    struct tcp_callbacks {
        %`handle_accept` acb;       // invoked as acb(new_socket, other_id) when a connection is accepted
        %`handle_recv` rcb;         // invoked as rcb(socket, packet) when a message is received
        %`handle_fail` fcb;         // invoked as fcb(socket) when a socket fails or reaches EOF
        %`handle_connected` ccb;    // invoked as ccb(socket) when an outgoing connect succeeds

        // Copies the four function objects into the struct. All of them are
        // taken by const reference (previously the last one was taken by
        // const value, which made an extra copy and was inconsistent with
        // the other three).
        tcp_callbacks(const %`handle_accept` &acb,
                      const %`handle_recv` &rcb,
                      const %`handle_fail` &fcb,
                      const %`handle_connected` &ccb)
            : acb(acb), rcb(rcb), fcb(fcb), ccb(ccb) {}
    };

    // This is a general class for an asynchronous task. These objects are called in a loop
    // by a thread allocated by the runtime. The fdes method returns a file descriptor
    // associated with the object. If fdes returns a negative value, the thread deletes the
    // object and terminates.

    class tcp_task : public reader {
      protected:
        int sock;           // socket associated to this task, or -1 if task complete
        int my_id;          // endpoint id associated to this task
        tcp_callbacks cb;   // callbacks to ivy (stored by value)
        ivy_class *ivy;     // pointer to main ivy object (mainly to get lock)

      public:

        // Store the endpoint id, socket, a copy of the callbacks and the ivy
        // pointer. The initializers are listed in member declaration order
        // (sock before my_id), which is the order they actually run in; the
        // previous list named my_id first and so triggered a reorder warning.
        tcp_task(int my_id, int sock, const tcp_callbacks &cb, ivy_class *ivy)
          : sock(sock), my_id(my_id), cb(cb), ivy(ivy) {}

        // The file descriptor of this task: the socket, or a negative value
        // once the task has finished.
        virtual int fdes() {
            return sock;
        }


    };


    // This task reads messages from a socket and calls the "recv" callback.

    class tcp_reader : public tcp_task {
        std::vector<char> buf;   // bytes already read from the socket but not yet consumed by a message
      public:
        tcp_reader(int my_id, int sock, const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(my_id, sock, cb, ivy) {
        }

        // This is called in a loop by the task thread. Each call reads one
        // message from the socket and passes it to the "recv" callback. On a
        // deserialization error (bad packet or EOF) it runs the "failed"
        // callback, closes the socket and sets sock to -1 so that fdes()
        // reports the task as complete.

        virtual void read() {
//            std::cerr << "RECEIVING\n";

            `pkt` pkt;                      // holds received message
            //ivy_socket_deser ds(sock,buf);  // initializer deserialize with any leftover bytes
            ivy_socket_deser_128 ds(sock,buf);  // initialize deserializer with any leftover bytes
            buf.clear();                    // clear the leftover bytes

            try {
                __deser(ds,pkt);            // read the message
            }

            // If packet has bad syntax, we drop it, call the "failed"
            // callback, close the socket and terminate the task.

            catch (deser_err &){
                if (ds.pos > 0)
                    std::cerr << "BAD PACKET RECEIVED\n";
                else
                    std::cerr << "EOF ON SOCKET\n";

                // The callback runs ivy code asynchronously, so it must be
                // made while holding the ivy lock, like the "recv" callback
                // below. (Previously this call was made without the lock.)
                ivy->__lock();
                cb.fcb(sock);
                ivy->__unlock();

                close(sock);
                sock = -1;
                return;
            }

            // copy the leftover bytes to buf

            buf.resize(ds.inp.size()-ds.pos);
            std::copy(ds.inp.begin()+ds.pos,ds.inp.end(),buf.begin());

            // call the "recv" callback with the received message

            ivy->__lock();
            cb.rcb(sock,pkt);
            ivy->__unlock();
        }
    };


    // This class writes queued packets to a socket. Packets can be added
    // asynchronously to the tail of the queue. If the socket is closed,
    // the queue will be emptied asynchronously. Note that in this implementation
    // the writer does not delete the queue or exit when the queue is closed: it
    // closes the socket and waits for the queue to be reopened, then connects again.

    // invariant: if socket is closed, queue is closed

    class tcp_writer : public tcp_task {
        tcp_queue *queue;   // queue of packets to send on this socket (never deleted by this class)
        bool connected;     // true while the socket is connected and packets can be sent
      public:
        tcp_writer(int my_id, int sock, tcp_queue *queue, const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(my_id,sock,cb,ivy), queue(queue), connected(false) {
        }

        virtual int fdes() {
            return sock;
        }

        // This is called in a loop by the task thread. While not connected,
        // each call waits for the queue to be open and then tries to connect.
        // While connected, each call sends one queued packet, or closes the
        // socket if the queue has been closed.

        virtual void read() {

            if (!connected) {

                // if the socket is not connected, wait for the queue to be open,
                // then connect

                queue->wait_open();
                connect();
                return;
            }

            // dequeue a packet to send

            std::vector<char> buf;
            bool qclosed = queue->dequeue_swap(buf);

            // if queue has been closed asynchronously, close the socket.

            if (qclosed) {
                // std::cerr << "CLOSING " << sock << std::endl;
                ::close(sock);
                connected = false;
                return;
            }

            // try a blocking send (do not index into an empty vector)

            const char *data = buf.empty() ? 0 : &buf[0];
            int bytes = send(sock,data,buf.size(),MSG_NOSIGNAL);

            // std::cerr << "SENT\n";

            // if not all bytes sent, channel has failed, close the queue.
            // fail_close runs the "failed" callback, which is asynchronous
            // ivy code, so take the ivy lock first, as connect() does.
            // (Previously this call was made without the lock.)

            if (bytes < (int)buf.size()) {
                ivy->__lock();
                fail_close();
                ivy->__unlock();
            }
        }

        // Connect the socket to the endpoint recorded in the queue, then run
        // the "connected" callback on success or fail_close() on failure.

        void connect() {

            // Get the address of the other from the configuration

            // std::cerr << "ENTERING CONNECT " << sock << std::endl;

            ivy -> __lock();               // can be asynchronous, so must lock ivy!
            struct sockaddr_in myaddr;
            int other = queue->other;
            get_tcp_addr(ivy,other,myaddr);
            ivy -> __unlock();

            // Call connect to make connection

            // std::cerr << "CONNECTING sock=" << sock << "other=" << other << std::endl;

            int res = ::connect(sock,(sockaddr *)&myaddr,sizeof(myaddr));

            // If successful, call the "connected" callback, else "failed"

            ivy->__lock();
            if (res >= 0) {
                // std::cerr << "CONNECT SUCCEEDED " << sock << std::endl;
                cb.ccb(sock);
                connected = true;
            }
            else {
                // std::cerr << "CONNECT FAILED " << sock << std::endl;
                fail_close();
            }
            ivy->__unlock();

        }

        // Handle a failed channel: close the queue without reporting the
        // close to the dequeuer, close the socket, run the "failed" callback
        // and mark the writer as not connected.
        // The caller must hold the ivy lock, because the callback runs ivy code.

        void fail_close() {
            queue -> set_closed(false);  // close queue synchronously

            // make sure socket is closed before fail callback, since this
            // might open another socket, and we don't want to build up
            // zombie sockets.

            // std::cerr << "CLOSING ON FAILURE " << sock << std::endl;
            ::close(sock);
            cb.fcb(sock);
            connected = false;
        }

    };

    // This task listens for connections on a socket in the background.

    class tcp_listener : public tcp_task {
      public:

        // The constructor creates a socket to listen on.

        tcp_listener(int my_id, const tcp_callbacks &cb, ivy_class *ivy)
            : tcp_task(my_id,0,cb,ivy) {
            sock = make_tcp_socket();
        }

        // The bind method is called by the runtime once, after initialization.
        // This allows us to query the configuration for our address and bind the socket.
        // Any failure to bind or listen is fatal.

        virtual void bind() {
            ivy -> __lock();  // can be asynchronous, so must lock ivy!

            // Get our endpoint address from the configuration
            struct sockaddr_in myaddr;
            get_tcp_addr(ivy,my_id,myaddr);

            std::cerr << "binding id: " << my_id << " port: " << ntohs(myaddr.sin_port) << std::endl;

            // Bind the socket to our address
            if (::bind(sock, (struct sockaddr *)&myaddr, sizeof(myaddr)) < 0)
                { perror("bind failed"); exit(1); }

            // Start listening on the socket
            if (listen(sock,2) < 0)
                { std::cerr << "cannot listen on socket\n"; exit(1); }

            ivy -> __unlock();
        }

        // After binding, the thread calls read in a loop. In this case, we don't read,
        // we try accepting a connection. BUG: We should first call select to wait for a connection
        // to be available, then call accept while holding the ivy lock. This is needed to
        // guarantee the "accepted" appears to occur before "connected" which is required by
        // the tcp interface specification.

        virtual void read() {
            // std::cerr << "ACCEPTING\n";

            // Call accept to get an incoming connection request. May block.
            sockaddr_in other_addr;
            socklen_t addrlen = sizeof(other_addr);
            int new_sock = accept(sock, (sockaddr *)&other_addr, &addrlen);

            // If this fails, something is very wrong: fail stop.
            if (new_sock < 0)
                { perror("accept failed"); exit(1); }

            // Both the configuration lookup and the "accept" callback touch
            // ivy state from this asynchronous thread, so both are done under
            // the ivy lock. (Previously the lookup ran before the lock was
            // taken, although bind() takes the lock for the same
            // configuration access.)
            ivy->__lock();

            // Get the endpoint id of the other from its address.
            int other = get_tcp_id(ivy,other_addr);

            // Run the "accept" callback.
            cb.acb(new_sock,other);
            ivy->__unlock();

            // Install a reader task to read messages from the new socket.
            ivy->install_reader(new tcp_reader(my_id,new_sock,cb,ivy));
        }
    };



>>>
Here we put any new members of the ivy C++ class. If we have allocated a per-instance object, we declare it here anti-quoted. This plugs in the actual member name, which may be an array if this is a parameterized instance.

<<< member

    tcp_listener *`rdr`;             // the listener task (created and installed in the initializer)
    tcp_callbacks *`cb`;             // the callbacks to ivy (created in the initializer)
    hash_space::hash_map<int,tcp_queue *> `send_queue`;   // queues of outgoing packets, keyed by socket

>>>

<<< member

    tcp_config *the_tcp_config;  // the current configurations

    // Get the current TCP configuration. If none, create a default one.

    tcp_config *get_tcp_config() {
        // Create the default configuration on first use if none has been set.
        if (the_tcp_config == 0) {
            the_tcp_config = new tcp_config();
        }
        return the_tcp_config;
    }

    // Set the current TCP configuration. This is called by the runtime environment.

    void set_tcp_config(tcp_config *conf) {
        // Replace the stored pointer. Any previously stored configuration
        // is not freed here.
        this->the_tcp_config = conf;
    }

>>>
Here, we put code to go in the initializer. If this is a parameterized instance, then this code will be run in a loop, so we have to be careful that any initialization of common objects is idempotent.

<<< init

    // Start with no configuration; get_tcp_config() creates the default
    // one on first use unless set_tcp_config() installs another.

    the_tcp_config = 0;

    // Create the callbacks. In a parameterized instance, this creates
    // one set of callbacks for each endpoint id. When you put an
    // action in anti-quotes it creates a function object (a "thunk")
    // that captures the instance environment, in this case including
    // the instance's endpoint id "me".

    `cb` = new tcp_callbacks(`handle_accept`,`handle_recv`,`handle_fail`,`handle_connected`);

    // Install a listener task for this endpoint. If parameterized, this creates
    // one for each endpoint. The listener stores its own copy of the callbacks.

    install_reader(`rdr` = new tcp_listener(`me`,*`cb`,this));

>>>
These actions are handlers for the callbacks. They just insert the endpoint's id and call the corresponding callback action.

    action handle_accept(s:socket, other:addr) = {
        # pass the event on to accept, adding this endpoint's id
        call accept(me,s,other)
    }

    action handle_recv(s:socket,x:pkt) = {
        # pass the received message on to recv, adding this endpoint's id
        call recv(me,s,x)
    }

    action handle_fail(s:socket) = {
        # pass the failure on to failed, adding this endpoint's id
        call failed(me,s)
    }

    action handle_connected(s:socket) = {
        # pass the event on to connected, adding this endpoint's id
        call connected(me,s)
    }

    object impl = {
These are the implementations of the interface calls. These operations are synchronous.

close the socket

    implement close(s:socket) {
        <<< impure

            // Closing a socket while another thread is blocked on it would
            // leave that thread with an error it cannot interpret. So we
            // only shut the socket down here and leave the actual close to
            // the thread that owns it:
            //
            //  - a reader thread will see EOF and close the socket;
            //  - an open writer thread will close the socket once we have
            //    closed its send queue (closing an already closed queue has
            //    no effect).
            //
            // invariant: if a socket is open there is a reader thread or
            // an open writer thread, but not both.
            //
            // Given that invariant, the socket is closed exactly once.

            ::shutdown(s,SHUT_RDWR);

            const bool has_queue = `send_queue`.find(s) != `send_queue`.end();
            if (has_queue) {
                tcp_queue *queue = `send_queue`[s];
                queue -> set_closed();
            }

        >>>
    }
connect creates a socket and then installs a connector task to establish the connection asynchronously.

    implement connect(other:addr) returns (s:socket) {
        <<< impure
            s = make_tcp_socket();
            // std::cerr << "SOCKET " << s << std::endl;

            // Each socket number has at most one send queue and one writer
            // thread. If a queue is already registered for this socket
            // number it must be closed, so we just reopen it for the new
            // peer. Otherwise we create the queue and start its writer.

            const bool queue_exists = `send_queue`.find(s) != `send_queue`.end();
            if (queue_exists) {
                `send_queue`[s] -> set_open(other);
            } else {
                tcp_queue *fresh_queue = new tcp_queue(other);
                `send_queue`[s] = fresh_queue;
                install_thread(new tcp_writer(`me`,s,fresh_queue,*`cb`,this));
            }
        >>>
    }
send serializes the message and calls unix send to send it in the given socket. It returns true if sending was successful. Note, a "successfully" sent packet could still be lost due to congestion. If we want to maintain ordered semantics, we will need to have an additional return code indicating the packet could not be queued (analogous to EAGAIN).

    implement send(s:socket,p:pkt) returns (ok:bool) {
        <<< impure
                // Serialize the packet into sr.res.
                //ivy_binary_ser sr;
                ivy_binary_ser_128 sr;
                __ser(sr,p);
//                std::cerr << "SENDING\n";

                const bool no_queue = `send_queue`.find(s) == `send_queue`.end();
                if (no_queue) {
                    // No send queue exists for this socket, so the socket is
                    // not open and the client has violated the precondition.
                    // We do the bad client the service of not crashing.
                    ok = true;
                } else {
                    // Hand the serialized bytes to the send queue. The result
                    // is false if the queue is closed.
                    const bool queue_closed = `send_queue`[s]->enqueue_swap(sr.res);
                    ok = !queue_closed;
                }
        >>>
    }
This has to be a trusted isolate, since ivy can't verify C++ code formally.

    }

    trusted isolate iso = this

    attribute test = impl
}