Udp impl
include ip
host : the type of host ids
pkt : the type of messages
ser : the packet serializer
des : the packet deserializer
module udp_impl(host,pkt,ser,des) = {
type socket
<<< header
-
ifndef _WIN32
include
class udp_listener; // class of threads that listen for connections
class udp_callbacks; // class holding callbacks to ivy
// A udp_config maps endpoint ids to IP addresses and ports.
>>>
<<< impl
// get a new UDP socket
int make_udp_socket() {
int sock = ::socket(AF_INET,SOCK_DGRAM, 0); // SOCK_DGRAM SOCK_RAW
if (sock < 0)
{ std::cerr << "cannot create socket\n"; exit(1); }
return sock;
}
// This structure holds all the callbacks for the endpoint. These are function objects
// that are called asynchronously.
struct udp_callbacks {
%`impl.handle_recv` rcb;
udp_callbacks(const %`impl.handle_recv` &rcb)
: rcb(rcb) {}
};
// This is a general class for an asynchronous task. These objects are called in a loop
// by a thread allocated by the runtime. The fdes method returns a file descriptor
// associated with the object. If fdes returns a negative value, the thread deletes the
// object and terminates.
class udp_task : public reader {
protected:
int sock; // socket associated to this task, or -1 if task complete
`host` my_id; // host id associated to this task
udp_callbacks cb; // callbacks to ivy
ivy_class *ivy; // pointer to main ivy object (mainly to get lock)
public:
udp_task(`host` my_id, int sock, const udp_callbacks &cb, ivy_class *ivy)
: my_id(my_id), sock(sock), cb(cb), ivy(ivy) {}
virtual int fdes() {
return sock;
}
};
// This task reads messages from a socket and calls the "recv" callback.
class udp_reader : public udp_task {
std::vector<char> buf;
public:
udp_reader(`host` my_id, int sock, const udp_callbacks &cb, ivy_class *ivy)
: udp_task(my_id, sock, cb, ivy) {
}
// This is called in a loop by the task thread.
virtual void read() {
std::cerr << "RECEIVING start\n";
int len=0;
socklen_t lenlen=4;
if (getsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char *)&len,&lenlen))
if (getsockopt(sock,SOL_SOCKET,SO_RCVBUF,&len,&lenlen))
{ perror("getsockopt failed"); exit(1); }
std::vector<char> buf(len);
int bytes;
sockaddr_in srcaddr;
socklen_t addrlen = sizeof(srcaddr);
if ((bytes = recvfrom(sock,&buf[0],len,0,(sockaddr *)&srcaddr,&addrlen)) < 0)
{ std::cerr << "recvfrom failed\n"; exit(1); }
if (bytes == 0) {
close(sock);
sock = -1; // will cause this thread to exit and reader object to be deleted
return;
}
buf.resize(bytes);
`pkt` pkt;
std::cerr << "RECEIVING __deser\n";
try {
`des` ds(buf);
__deser(ds,pkt);
if (ds.pos < buf.size()) {
//std::cerr << pkt << std::endl;
//std::cerr << ds << std::endl;
std::cerr << "udp impl" << std::endl;
throw deser_err();
}
} catch (deser_err &){
std::cerr << "BAD PACKET RECEIVED\n";
return;
}
`ip.endpoint` src;
src.protocol = `ip.udp`;
src.addr = ntohl(srcaddr.sin_addr.s_addr);
if(src.addr != 0x7f000001) {
src.interface = `ip.ivy`; //TODO
} else {
src.interface = `ip.lo`; //TODO
}
src.port = ntohs(srcaddr.sin_port);
ivy->__lock();
cb.rcb(sock,src,pkt);
ivy->__unlock();
std::cerr << "RECEIVING finsh\n";
}
};
>>>
object impl(me:host) = {
These empty objects are used to hold C++ values.
object cb = {} # struct holding the callbacks
<<< member
udp_callbacks *`cb`; // the callbacks to ivy
bool is_vnet = false;
>>>
<<< init
// Create the callbacks. In a parameterized instance, this creates
// one set of callbacks for each endpoint id. When you put an
// action in anti-quotes it creates a function object (a "thunk")
// that captures the instance environment, in this case including
// the instance's endpoint id "me".
`cb` = new udp_callbacks(`handle_recv`);
>>>
action handle_recv(s:socket,src:ip.endpoint,x:pkt) = {
call recv(me,s,src,x)
}
action open(addr:ip.endpoint) returns (s:socket) = {
<<< impure
s = make_udp_socket();
if(addr.addr != 0x7f000001) {
std::cerr << "open SOCKET " << s << std::endl;
is_vnet = true;
char * dev = strdup("lo"); //TODO
if (addr.interface == `ip.ivy`) {
dev = strdup("ivy");
}
std::cerr << "open SOCKET dev " << dev << std::endl;
/*if(strcmp(dev,"lo") == 0) {
int v = 1;
if (setsockopt(s, SOL_IP, IP_FREEBIND, &v, sizeof(v)) < 0) {
perror("setsockopt: freebind");
exit(EXIT_FAILURE);
}
}*/
if (setsockopt(s, SOL_SOCKET, SO_BINDTODEVICE, dev, strlen(dev)) < 0) {
perror("setsockopt: bind to device");
exit(EXIT_FAILURE);
} //TODO comment for shadow
struct sockaddr_in v_src = {};
v_src.sin_addr.s_addr = htonl(addr.addr); // TODO reversorder
//inet_pton(AF_INET, htonl(addr.addr), &v_src.sin_addr.s_addr);
v_src.sin_port = htons(addr.port);
v_src.sin_family = AF_INET;
int error = 0;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &error, sizeof(int)) < 0 || setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &error, sizeof(int)) < 0)
perror("setsockopt(SO_REUSEADDR) failed");
if (bind(s, (struct sockaddr*) &v_src, sizeof(struct sockaddr_in)) != 0) {
char s[100];
sprintf(s, "bind to addr %u", htonl(addr.addr));
perror(s);
exit(EXIT_FAILURE);
}
} else {
std::cerr << "SOCKET " << s << std::endl;
struct sockaddr_in myaddr;
myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = htonl(addr.addr); // inet_addr("10.0.0.1"); //
// myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
myaddr.sin_port = htons(addr.port);
struct sockaddr_in sin;
socklen_t len = sizeof(sin);
if (::getsockname(s, (struct sockaddr *)&sin, &len) == -1)
perror("getsockname");
else
std::cerr << "source port number " << ntohs(sin.sin_port) << std::endl;
std::cerr << "binding client id: " << `me` << " addr: " << ntohl(myaddr.sin_addr.s_addr) << " port: " << ntohs(myaddr.sin_port) << std::endl;
if (::bind(s, (struct sockaddr *)&myaddr, sizeof(myaddr)) != 0)
{ perror("bind failed"); exit(1); }
len = sizeof(sin);
if (::getsockname(s, (struct sockaddr *)&sin, &len) == -1)
perror("getsockname");
else
std::cerr << "source port number " << ntohs(sin.sin_port) << std::endl;
}
install_reader(new udp_reader(`me`,s, *`cb`, this));
>>>
}
action listen(dst:ip.endpoint) returns (s:socket) = {
<<< impure
// Modified code from Tom R.
char *dev = strdup("lo"); //TODO
bool free = false;
char opt;
int port;
if (dst.interface == `ip.ivy`) {
dev = strdup("ivy");
}
int l = strlen(dev);
s = socket(AF_INET, SOCK_DGRAM,0);
std::cerr << "listen SOCKET " << s << std::endl;
std::cerr << "listen SOCKET dev " << dev << std::endl;
if (s <= 0) {
printf("socket: socket\n");
exit(EXIT_FAILURE);
}
int v = 1;
if (setsockopt(s, SOL_IP, IP_FREEBIND, &v, sizeof(v)) < 0) {
perror("setsockopt: freebind");
exit(EXIT_FAILURE);
}
if (setsockopt(s, SOL_SOCKET, SO_BINDTODEVICE, dev, l) < 0) {
char s[100];
sprintf(s, "setsockopt: bind to device %s", dev);
perror(s);
exit(EXIT_FAILURE);
}
int error = 0;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &error, sizeof(int)) < 0 || setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &error, sizeof(int)) < 0)
perror("setsockopt(SO_REUSEADDR) failed");
struct sockaddr_in v_dst = {};
v_dst.sin_addr.s_addr = htonl(dst.addr);
//inet_pton(AF_INET, ip, &v_dst.sin_addr.s_addr);
v_dst.sin_port = htons(dst.port);
v_dst.sin_family = AF_INET;
if (bind(s, (struct sockaddr*) &v_dst, sizeof(struct sockaddr_in)) != 0) {
char s[100];
sprintf(s, "bind to addr %u", htonl(dst.addr));
perror(s);
exit(EXIT_FAILURE);
}
/*
char buf[100];
int len = 100;
if (recv(s, buf, len, 0) == -1) {
perror("recv");
exit(EXIT_FAILURE);
}
printf("%s: %s\n", dev, buf);
*/
install_reader(new udp_reader(`me`,s, *`cb`, this));
>>>
}
}
close the socket
action close(me:host,s:socket) = {
<<< impure
// We don't want to close a socket when there is another thread
// waiting, because the other thread won't know what to do with the
// error.
// Instead we shut down the socket and let the other thread close it.
// If there is a reader thread, it will see EOF and close the socket. If there is
// on open writer thread, it will close the socket after we close the
// send queue. If the queue is already closed, closing it has no effect.
// invariant: if a socket is open there is a reader thread or
// an open writer thread, but not both.
// Because of this invariant, the socket will be closed exactly once.
::shutdown(s,SHUT_RDWR);
>>>
}
action open(me:host,addr:ip.endpoint) returns (s:socket) = {
s := impl.open(me,addr)
}
action listen(me:host,addr:ip.endpoint) returns (s:socket) = {
s := impl.listen(me,addr)
}
action send(me:host,s:socket,dst:ip.endpoint,x:pkt) = {
<<< impure
/*if(is_vnet) {
struct sockaddr_in v_dst = {};
v_dst.sin_addr.s_addr = htonl(addr.addr);
//inet_pton(AF_INET, htonl(dst.addr), &v_dst.sin_addr.s_addr);
v_dst.sin_port = htons(dst.port);
v_dst.sin_family = AF_INET;
std::cerr << "sending from socket: " << s << std::endl;
if (sendto(s, buf, len, 0, (struct sockaddr*)&v_dst, sizeof(struct sockaddr_in)) != len) {
char s[100];
sprintf(s, "sendto %s", buf);
perror(s);
exit(EXIT_FAILURE);
}
} else {*/
struct sockaddr_in dstaddr;
dstaddr.sin_family = AF_INET;
dstaddr.sin_addr.s_addr = htonl(dst.addr);
//dstaddr.sin_addr.s_addr = htonl(INADDR_ANY);
dstaddr.sin_port = htons(dst.port);
std::cerr << "sending from socket: " << s << std::endl;
struct sockaddr_in sin;
socklen_t len = sizeof(sin);
if (::getsockname(s, (struct sockaddr *)&sin, &len) == -1)
perror("getsockname");
else
std::cerr << "port number " << ntohs(sin.sin_port) << std::endl;
std::cerr << "sending to id: " << x << std::endl;
std::cerr << "sending to id: " << me << " addr: " << ntohl(dstaddr.sin_addr.s_addr) << " port: " << ntohs(dstaddr.sin_port) << std::endl;
`ser` sr;
__ser(sr,x);
if (::getsockname(s, (struct sockaddr *)&sin, &len) == -1)
perror("getsockname");
else
std::cerr << "port number " << ntohs(sin.sin_port) << std::endl;
int error = 0;
socklen_t lsen = sizeof (error);
int retval = ::getsockopt (s, SOL_SOCKET, SO_ERROR, &error, &lsen);
if (retval != 0) {
/* there was a problem getting the error code */
std::cerr << "error getting socket error code: " << strerror(retval) << std::endl;
return;
}
if (error != 0) {
/* socket has a non zero error status */
std::cerr << "socket error: " << strerror(error) << std::endl;
}
if (::sendto(s,&sr.res[0],sr.res.size(),0,(struct sockaddr *)&dstaddr,sizeof(dstaddr)) < 0)
if (::sendto(s,&sr.res[0],sr.res.size(),0,(struct sockaddr *)&dstaddr,sizeof(dstaddr)) < 0)
if (::sendto(s,&sr.res[0],sr.res.size(),0,(struct sockaddr *)&dstaddr,sizeof(dstaddr)) < 0)
{ std::cerr << "sendto failed " << WSAGetLastError() << "\n"; exit(1); }
{ perror("sendto failed"); exit(1); }
error = 0;
lsen = sizeof (error);
retval = ::getsockopt (s, SOL_SOCKET, SO_ERROR, &error, &lsen);
if (retval != 0) {
/* there was a problem getting the error code */
std::cerr << "error getting socket error code: " << strerror(retval) << std::endl;
return;
}
if (error != 0) {
/* socket has a non zero error status */
std::cerr << "socket error: " << strerror(error) << std::endl;
}
std::cerr << "sending id finish" << std::endl;
//}
>>>
}
action recv(me:host,s:socket,src:ip.endpoint,x:pkt)
trusted isolate iso = this
attribute test = impl
}