Udp impl

include ip
This is an implementation of a generic UDP endpoint. It allows a host to open a socket and to send and receive packets on it. The network is unreliable and allows packet duplication. Parameters are:

host : the type of host ids
pkt  : the type of messages
ser  : the packet serializer
des  : the packet deserializer

module udp_impl(host,pkt,ser,des) = {
The type of socket descriptors

    type socket
This code goes in the C++ header file, ahead of the ivy object declaration. Here, we put declarations (perhaps forward) of any auxiliary classes we need. We need to be careful that the names of these don't clash with other modules. However, duplicates are removed, so we don't have to worry about multiple instances of this module clashing.

<<< header
include ifndef _WIN32 include include include endif

    class udp_listener;   // class of threads that listen for connections
    class udp_callbacks;  // class holding callbacks to ivy

    // A udp_config maps endpoint ids to IP addresses and ports.




>>>
This code goes in the C++ implementation file. Here, we put implementations of the classes declared in the header, and auxiliary functions.

<<< impl


    // Create a fresh IPv4 datagram (UDP) socket and return its descriptor.
    // If the socket cannot be created, a message is written to stderr and
    // the process exits with status 1.

    int make_udp_socket() {
        const int fd = ::socket(AF_INET, SOCK_DGRAM, 0); // SOCK_DGRAM SOCK_RAW
        if (fd >= 0)
            return fd;
        std::cerr << "cannot create socket\n";
        exit(1);
    }


    // This structure holds all the callbacks for the endpoint. These are function objects
    // that are called asynchronously.

    struct udp_callbacks {
        %`impl.handle_recv` rcb;
        udp_callbacks(const %`impl.handle_recv` &rcb)
            : rcb(rcb) {}
    };

    // This is a general class for an asynchronous task. These objects are called in a loop
    // by a thread allocated by the runtime. The fdes method returns a file descriptor
    // associated with the object. If fdes returns a negative value, the thread deletes the
    // object and terminates.

    class udp_task : public reader {
      protected:
        int sock;           // socket associated to this task, or -1 if task complete
        `host` my_id;       // host id associated to this task
        udp_callbacks cb;   // callbacks to ivy
        ivy_class *ivy;     // pointer to main ivy object (mainly to get lock)

      public:

        // Store the endpoint id, socket, callbacks and ivy object for this task.
        // C++ initializes members in declaration order (sock, my_id, cb, ivy)
        // regardless of how the initializer list is written, so the list is
        // kept in that same order to avoid -Wreorder warnings and confusion.
        udp_task(`host` my_id, int sock, const udp_callbacks &cb, ivy_class *ivy)
          : sock(sock), my_id(my_id), cb(cb), ivy(ivy) {}

        // File descriptor of this task: the socket, or a negative value once
        // the task is complete.
        virtual int fdes() {
            return sock;
        }

    };


    // This task reads messages from a socket and calls the "recv" callback.

    class udp_reader : public udp_task {
      public:
        udp_reader(`host` my_id, int sock, const udp_callbacks &cb, ivy_class *ivy)
            : udp_task(my_id, sock, cb, ivy) {
        }

        // This is called in a loop by the task thread.
        //
        // Receives one datagram, deserializes it into a packet and passes it,
        // together with the sender's endpoint, to the receive callback while
        // holding the ivy lock. A zero-length read closes the socket and marks
        // the task complete. A packet that fails to deserialize (or leaves
        // trailing bytes) is reported and dropped. A failure of getsockopt or
        // recvfrom terminates the process.

        virtual void read() {
            std::cerr << "RECEIVING start\n";

            // Size the receive buffer from the socket's SO_RCVBUF value.
            // The (char *) cast is required by the Windows prototype and is
            // implicitly converted to void * on POSIX, so a single call
            // serves both platforms.
            int len = 0;
            socklen_t lenlen = sizeof(len);
            if (getsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char *)&len,&lenlen))
            { perror("getsockopt failed"); exit(1); }
            std::vector<char> buf(len);

            sockaddr_in srcaddr;
            socklen_t addrlen = sizeof(srcaddr);
            // buf.data() is well defined even if the vector is empty.
            int bytes = recvfrom(sock,buf.data(),len,0,(sockaddr *)&srcaddr,&addrlen);
            if (bytes < 0)
            { std::cerr << "recvfrom failed\n"; exit(1); }
            if (bytes == 0) {
                close(sock);
                sock = -1;  // will cause this thread to exit and reader object to be deleted
                return;
            }
            buf.resize(bytes);

            `pkt` pkt;
            std::cerr << "RECEIVING __deser\n";
            try {
                `des` ds(buf);
                __deser(ds,pkt);
                // Bytes left over after deserialization mean a malformed packet.
                if (ds.pos < buf.size()) {
                    //std::cerr << pkt << std::endl;
                    //std::cerr << ds << std::endl;
                    std::cerr << "udp impl" << std::endl;
                    throw deser_err();
                }
            } catch (deser_err &){
                std::cerr << "BAD PACKET RECEIVED\n";
                return;
            }

            // Describe the sender as an ip.endpoint value.
            `ip.endpoint` src;
            src.protocol = `ip.udp`;
            src.addr = ntohl(srcaddr.sin_addr.s_addr);
            // 0x7f000001 is 127.0.0.1: loopback senders are tagged with the
            // lo interface, everything else with the ivy interface.
            if(src.addr != 0x7f000001) {
                src.interface = `ip.ivy`; //TODO
            } else {
                src.interface = `ip.lo`; //TODO
            }
            src.port = ntohs(srcaddr.sin_port);

            // The callback runs ivy code, so it is called under the ivy lock.
            ivy->__lock();
            cb.rcb(sock,src,pkt);
            ivy->__unlock();
            std::cerr << "RECEIVING finsh\n";
        }

    };


>>>

object impl(me:host) = {
Here we put any new members of the ivy C++ class. If we have allocated a per-instance object, we declare it here anti-quoted. This plugs in the actual member name, which may be an array if this is a parameterized instance.

These empty objects are used to hold C++ values.

    object cb = {}          # struct holding the callbacks

    <<< member

    udp_callbacks *`cb`;             // the callbacks to ivy
    bool is_vnet = false;            // set to true by open when it binds to a non-loopback address

    >>>
Here, we put code to go in the initializer. If this is a parameterized instance, then this code will be run in a loop, so we have to be careful that any initialization of common objects is idempotent.

    <<< init

    // Create the callbacks. In a parameterized instance, this creates
    // one set of callbacks for each endpoint id. When you put an
    // action in anti-quotes it creates a function object (a "thunk")
    // that captures the instance environment, in this case including
    // the instance's endpoint id "me".
    //
    // NOTE(review): the object is allocated with new and no matching
    // delete is visible in this module -- presumably it is meant to live
    // as long as the ivy object; confirm.

    `cb` = new udp_callbacks(`handle_recv`);

    >>>
These actions are handlers for the callbacks. They just insert the endpoint's id and call the corresponding callback action.

    action handle_recv(s:socket,src:ip.endpoint,x:pkt) = {
        # Forward the received packet to the recv callback, adding this
        # instance's endpoint id (me) as the first argument.
        call recv(me,s,src,x)
    }
open creates a socket and binds it to the requested endpoint.

    action open(addr:ip.endpoint) returns (s:socket) = {
        <<< impure
        // Create a UDP socket, bind it to the requested endpoint and install
        // a reader task for it. Two paths exist:
        //   - non-loopback address: the socket is bound to a network device
        //     ("ivy" or "lo", chosen from addr.interface) before binding to
        //     the address; failures of the device bind or address bind
        //     terminate the process.
        //   - loopback address (127.0.0.1): plain bind, with diagnostic
        //     output of the local port before and after binding.
        s = make_udp_socket();
        if(addr.addr != 0x7f000001) {
            std::cerr << "open SOCKET " << s << std::endl;
            is_vnet = true;
            // The device name is a string literal, so nothing is allocated
            // and nothing needs to be freed.
            const char *dev = (addr.interface == `ip.ivy`) ? "ivy" : "lo"; //TODO
            std::cerr << "open SOCKET dev " << dev << std::endl;
            /*if(strcmp(dev,"lo") == 0) {
                int v = 1;
                if (setsockopt(s, SOL_IP, IP_FREEBIND, &v, sizeof(v)) < 0) {
                    perror("setsockopt: freebind");
                    exit(EXIT_FAILURE);
                }
            }*/
            if (setsockopt(s, SOL_SOCKET, SO_BINDTODEVICE, dev, strlen(dev)) < 0) {
                perror("setsockopt: bind to device");
                exit(EXIT_FAILURE);
            } //TODO comment for shadow
            struct sockaddr_in v_src = {};
            v_src.sin_family = AF_INET;
            v_src.sin_addr.s_addr = htonl(addr.addr); // TODO reversorder
            //inet_pton(AF_INET, htonl(addr.addr), &v_src.sin_addr.s_addr);
            v_src.sin_port = htons(addr.port);

            // These are boolean options: a non-zero value enables them.
            // Passing 0 would leave address/port reuse disabled.
            int enable = 1;
            if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0 || setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) < 0)
                perror("setsockopt(SO_REUSEADDR) failed");

            if (::bind(s, (struct sockaddr*) &v_src, sizeof(v_src)) != 0) {
                // Separate name so the returned socket s is not shadowed;
                // snprintf keeps the message within the buffer.
                char msg[100];
                snprintf(msg, sizeof(msg), "bind to addr %u", htonl(addr.addr));
                perror(msg);
                exit(EXIT_FAILURE);
            }
        } else {
            std::cerr << "SOCKET " << s << std::endl;
            struct sockaddr_in myaddr = {};
            myaddr.sin_family = AF_INET;
            myaddr.sin_addr.s_addr = htonl(addr.addr); // inet_addr("10.0.0.1"); //
            // myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
            myaddr.sin_port = htons(addr.port);

            // Diagnostic only: local port before binding.
            struct sockaddr_in sin = {};
            socklen_t len = sizeof(sin);
            if (::getsockname(s, (struct sockaddr *)&sin, &len) == -1)
                perror("getsockname");
            else
                std::cerr <<  "source port number " << ntohs(sin.sin_port)  << std::endl;

            std::cerr << "binding client id: " << `me` << " addr: " << ntohl(myaddr.sin_addr.s_addr) << " port: " << ntohs(myaddr.sin_port) << std::endl;
            if (::bind(s, (struct sockaddr *)&myaddr, sizeof(myaddr)) != 0)
                { perror("bind failed"); exit(1); }

            // Diagnostic only: local port after binding.
            len = sizeof(sin);
            if (::getsockname(s, (struct sockaddr *)&sin, &len) == -1)
                perror("getsockname");
            else
                std::cerr <<  "source port number " << ntohs(sin.sin_port)  << std::endl;
        }
        // Hand the socket to a reader task that delivers incoming packets.
        install_reader(new udp_reader(`me`,s, *`cb`, this));
        >>>
    }

    action listen(dst:ip.endpoint) returns (s:socket) = {
        <<< impure
            // Modified code from Tom R.
            //
            // Create a UDP socket bound to a network device ("ivy" or "lo",
            // chosen from dst.interface) and to the endpoint dst, then
            // install a reader task for it. IP_FREEBIND is set before the
            // address bind. Any failure other than setting the reuse
            // options terminates the process.

            // The device name is a string literal, so nothing is allocated
            // and nothing needs to be freed.
            const char *dev = (dst.interface == `ip.ivy`) ? "ivy" : "lo"; //TODO

            // Same helper as open: exits if the socket cannot be created.
            // (A descriptor value of 0 is valid and is not treated as an error.)
            s = make_udp_socket();
            std::cerr << "listen SOCKET " << s << std::endl;
            std::cerr << "listen SOCKET dev " << dev << std::endl;

            int v = 1;
            if (setsockopt(s, SOL_IP, IP_FREEBIND, &v, sizeof(v)) < 0) {
                perror("setsockopt: freebind");
                exit(EXIT_FAILURE);
            }
            if (setsockopt(s, SOL_SOCKET, SO_BINDTODEVICE, dev, strlen(dev)) < 0) {
                // Separate name so the returned socket s is not shadowed;
                // snprintf keeps the message within the buffer.
                char msg[100];
                snprintf(msg, sizeof(msg), "setsockopt: bind to device %s", dev);
                perror(msg);
                exit(EXIT_FAILURE);
            }

            // These are boolean options: a non-zero value enables them.
            // Passing 0 would leave address/port reuse disabled.
            int enable = 1;
            if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,  &enable, sizeof(enable)) < 0 || setsockopt(s, SOL_SOCKET, SO_REUSEPORT,  &enable, sizeof(enable)) < 0)
                perror("setsockopt(SO_REUSEADDR) failed");

            struct sockaddr_in v_dst = {};
            v_dst.sin_family = AF_INET;
            v_dst.sin_addr.s_addr = htonl(dst.addr);
            //inet_pton(AF_INET, ip, &v_dst.sin_addr.s_addr);
            v_dst.sin_port = htons(dst.port);

            if (::bind(s, (struct sockaddr*) &v_dst, sizeof(v_dst)) != 0) {
                char msg[100];
                snprintf(msg, sizeof(msg), "bind to addr %u", htonl(dst.addr));
                perror(msg);
                exit(EXIT_FAILURE);
            }


            /*
            char buf[100];
            int len = 100;
            if (recv(s, buf, len, 0) == -1) {
                perror("recv");
                exit(EXIT_FAILURE);
            }
            printf("%s: %s\n", dev, buf);
            */

            // Hand the socket to a reader task that delivers incoming packets.
            install_reader(new udp_reader(`me`,s, *`cb`, this));

        >>>
    }

    }
These are the implementations of the interface calls. These operations are synchronous.

close the socket

    action close(me:host,s:socket) = {
    <<< impure

        // Shut the socket down in both directions rather than closing it here.
        //
        // We don't want to close a socket when there is another thread
        // waiting, because the other thread won't know what to do with the
        // error.

        // Instead we shut down the socket and let the other thread close it.
        // If there is a reader thread, it will see EOF and close the socket
        // (udp_reader::read closes the socket when recvfrom returns 0). If
        // there is an open writer thread, it will close the socket after we
        // close the send queue. If the queue is already closed, closing it
        // has no effect.
        // NOTE(review): no writer thread or send queue is visible in this
        // module -- presumably this wording is shared with another
        // transport implementation; confirm.

        // invariant: if a socket is open there is a reader thread or
        // an open writer thread, but not both.

        // Because of this invariant, the socket will be closed exactly once.

        ::shutdown(s,SHUT_RDWR);

    >>>
    }
open creates a socket and binds it to the requested endpoint.

    action open(me:host,addr:ip.endpoint) returns (s:socket) = {
        # Delegate to the per-host implementation object; the returned
        # socket is the one created and bound by impl.open.
        s := impl.open(me,addr)
    }

    action listen(me:host,addr:ip.endpoint) returns (s:socket) = {
        # Delegate to the per-host implementation object; the returned
        # socket is the one created and bound by impl.listen.
        s := impl.listen(me,addr)
    }
send transmits a packet synchronously.

    action send(me:host,s:socket,dst:ip.endpoint,x:pkt) = {
    <<< impure
        // Serialize packet x and transmit it as one datagram from socket s
        // to endpoint dst. Diagnostic output (local port, destination,
        // pending socket error) is written to stderr before and after the
        // send. If the pending-error query itself fails, the action returns
        // without sending. If the datagram cannot be sent after three
        // attempts, the process exits.

        struct sockaddr_in dstaddr = {};
        dstaddr.sin_family = AF_INET;
        dstaddr.sin_addr.s_addr = htonl(dst.addr);
        //dstaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        dstaddr.sin_port = htons(dst.port);
        std::cerr << "sending from socket: " << s << std::endl;

        // Diagnostic only: local port of the sending socket.
        struct sockaddr_in sin = {};
        socklen_t len = sizeof(sin);
        if (::getsockname(s, (struct sockaddr *)&sin, &len) == -1)
            perror("getsockname");
        else
            std::cerr <<  "port number " << ntohs(sin.sin_port)  << std::endl;
        std::cerr << "sending to id: " << x << std::endl;
        std::cerr << "sending to id: " << me << " addr: " << ntohl(dstaddr.sin_addr.s_addr) << " port: " << ntohs(dstaddr.sin_port) << std::endl;

        `ser` sr;
        __ser(sr,x);

        // len is an in/out argument, so reset it before reusing it.
        len = sizeof(sin);
        if (::getsockname(s, (struct sockaddr *)&sin, &len) == -1)
            perror("getsockname");
        else
            std::cerr <<  "port number " << ntohs(sin.sin_port)  << std::endl;


        // Report any error already pending on the socket.
        int error = 0;
        socklen_t lsen = sizeof (error);
        int retval = ::getsockopt (s, SOL_SOCKET, SO_ERROR, &error, &lsen);

        if (retval != 0) {
            /* there was a problem getting the error code */
            // getsockopt returns -1 and sets errno; perror reports errno,
            // whereas strerror(retval) would describe the value -1.
            perror("error getting socket error code");
            return;
        }

        if (error != 0) {
            /* socket has a non zero error status */
            std::cerr <<  "socket error: " << strerror(error)  << std::endl;
        }

        // Try to send up to three times; stop at the first success. errno
        // from the last failed attempt is still set when reporting below.
        const int max_attempts = 3;
        bool sent = false;
        for (int attempt = 0; attempt < max_attempts && !sent; ++attempt)
            sent = ::sendto(s,&sr.res[0],sr.res.size(),0,(struct sockaddr *)&dstaddr,sizeof(dstaddr)) >= 0;
        if (!sent)
#ifdef _WIN32
                { std::cerr << "sendto failed " << WSAGetLastError() << "\n"; exit(1); }
#else
                { perror("sendto failed"); exit(1); }
#endif

        // Report any error raised on the socket by the send.
        error = 0;
        lsen = sizeof (error);
        retval = ::getsockopt (s, SOL_SOCKET, SO_ERROR, &error, &lsen);

        if (retval != 0) {
            /* there was a problem getting the error code */
            perror("error getting socket error code");
            return;
        }

        if (error != 0) {
            /* socket has a non zero error status */
            std::cerr <<  "socket error: " << strerror(error)  << std::endl;
        }

        std::cerr << "sending id finish" << std::endl;

    >>>

    }
callback on reception of message, to be implemented by user.

    action recv(me:host,s:socket,src:ip.endpoint,x:pkt)

    trusted isolate iso = this

    attribute test = impl
}