Tcp impl
This is an implementation of a generic TCP endpoint. It allows you to create point-to-point, non-duplicating, ordered connections that transfer sequences of messages. The parameters are:
addr : the type of endpoint ids
pkt : the type of messages
me : the id of this endpoint
port_base : the default port of endpoint 0
If the environment does not set up a configuration, then the endpoint has IP address 127.0.0.1 and port number port_base + me.
module tcp_impl(addr,pkt,me,port_base) = {
object rdr = {} # the listener object
object cb = {} # struct holding the callbacks
object send_queue = {} # queues of outgoing packets
<<< header
#include <netinet/tcp.h>
#include <list>
#include <semaphore.h>
class tcp_listener; // class of threads that listen for connections
class tcp_callbacks; // class holding callbacks to ivy
// A tcp_config maps endpoint ids to IP addresses and ports.
// The environment may subclass it and install it with set_tcp_config
// to override the default 127.0.0.1/port_base+id scheme.
class tcp_config {
public:
// get the address and port from the endpoint id
virtual void get(int id, unsigned long &inetaddr, unsigned long &inetport);
// get the endpoint id from the address and port
virtual int rev(unsigned long inetaddr, unsigned long inetport);
};
// forward declaration; the definition lives in the impl section
class tcp_queue;
>>>
<<< impl
// Maximum number of sent packets to queue on a channel. Because TCP also
// buffers, the total number of untransmitted packets that can back up will be
// greater than this. This number *must* be at least one to avoid packet corruption.

#define MAX_TCP_SEND_QUEUE 16

// A mutual-exclusion lock, portable across Windows and POSIX.
// (The platform guards were missing, which left two members named
// `mutex` and duplicate method definitions -- restored here.)
struct tcp_mutex {
#ifdef _WIN32
    HANDLE mutex;
    tcp_mutex() { mutex = CreateMutex(NULL,FALSE,NULL); }
    void lock() { WaitForSingleObject(mutex,INFINITE); }
    void unlock() { ReleaseMutex(mutex); }
#else
    pthread_mutex_t mutex;
    tcp_mutex() { pthread_mutex_init(&mutex,NULL); }
    void lock() { pthread_mutex_lock(&mutex); }
    void unlock() { pthread_mutex_unlock(&mutex); }
#endif
};
// A counting semaphore wrapping POSIX sem_t. The count starts at zero
// and the semaphore is private to this process (pshared = 0).
struct tcp_sem {
    sem_t sem;
    tcp_sem() {
        sem_init(&sem, 0, 0);
    }
    // Increment the count, waking one waiter if any is blocked.
    void up() {
        sem_post(&sem);
    }
    // Block until the count is positive, then decrement it.
    void down() {
        sem_wait(&sem);
    }
};
// A thread-safe queue of serialized outgoing packets for one channel.
// Buffers are transferred in and out by swapping, avoiding copies.
// The queue can be closed (when the socket fails) and later re-opened;
// each closure is reported to the dequeuing thread exactly once
// (tracked by reported_closed).
class tcp_queue {
tcp_mutex mutex;
tcp_sem sem; // counts pending events: queued buffers and close/open transitions
bool closed;
bool reported_closed;
std::list<std::vector<char> > bufs;
public:
int other; // only access while holding lock!
tcp_queue(int other) : closed(false), reported_closed(false), other(other) {}
// Enqueue a packet by swapping buf into the queue. Returns true if the
// queue is closed (nothing is queued). If the queue already holds
// MAX_TCP_SEND_QUEUE buffers, the packet is silently dropped, but we
// still return false and bump the semaphore.
bool enqueue_swap(std::vector<char> &buf) {
mutex.lock();
if (closed) {
mutex.unlock();
return true;
}
if (bufs.size() < MAX_TCP_SEND_QUEUE) {
bufs.push_back(std::vector<char>());
buf.swap(bufs.back());
}
mutex.unlock();
sem.up();
return false;
}
// Block until a packet or a close event is available. Swaps a packet
// into buf and returns false, or returns true (once per closure) if
// the queue has been closed.
bool dequeue_swap(std::vector<char> &buf) {
while(true) {
sem.down();
// std::cerr << "DEQUEUEING" << closed << std::endl;
mutex.lock();
if (closed) {
// a closure is reported only once; after that, keep waiting
if (reported_closed) {
mutex.unlock();
continue;
}
reported_closed = true;
mutex.unlock();
// std::cerr << "REPORTING CLOSED" << std::endl;
return true;
}
if (bufs.size() > 0) {
buf.swap(bufs.front());
bufs.erase(bufs.begin());
mutex.unlock();
return false;
}
mutex.unlock();
}
}
// Close the queue and discard all pending packets. When report is
// false, the closure is not announced to the dequeuer (used when the
// writer thread itself initiated the close).
void set_closed(bool report=true) {
mutex.lock();
closed = true;
bufs.clear();
if (!report)
reported_closed = true;
mutex.unlock();
sem.up();
}
// Re-open the queue for a (possibly different) peer endpoint id.
void set_open(int _other) {
mutex.lock();
closed = false;
reported_closed = false;
other = _other;
mutex.unlock();
sem.up();
}
// Wait until closed == closed_val. NOTE(review): each failed check
// consumes one semaphore count that dequeue_swap might otherwise see;
// this appears to be relied on by the writer-thread protocol -- confirm.
void wait_open(bool closed_val = false){
while (true) {
mutex.lock();
if (closed == closed_val) {
mutex.unlock();
return;
}
mutex.unlock();
sem.down();
}
}
};
// The default configuration gives address 127.0.0.1 and port port_base + id.
// (The platform guards were missing, which made the first assignment dead
// code on every platform -- restored here.)
void tcp_config::get(int id, unsigned long &inetaddr, unsigned long &inetport) {
#ifdef _WIN32
    inetaddr = ntohl(inet_addr("127.0.0.1")); // can't send to INADDR_ANY in windows
#else
    inetaddr = INADDR_ANY;
#endif
    inetport = `port_base`+ id;
}
// This reverses the default configuration's map. Note, this is a little dangerous
// since an attacker could cause a bogus id to be returned. For the moment we have
// no way to know the correct range of endpoint ids.
int tcp_config::rev(unsigned long inetaddr, unsigned long inetport) {
    // don't use this for real, it's vulnerable
    int id = inetport - `port_base`;
    return id;
}
// construct a sockaddr_in for a specified process id using the configuration
void get_tcp_addr(ivy_class *ivy, int my_id, sockaddr_in &myaddr) {
    // look up the configured address/port for this endpoint id
    unsigned long inetaddr = 0;
    unsigned long inetport = 0;
    ivy->get_tcp_config()->get(my_id, inetaddr, inetport);
    // zero the whole struct, then fill in family, address and port
    // (converted to network byte order)
    memset((char *)&myaddr, 0, sizeof(myaddr));
    myaddr.sin_family = AF_INET;
    myaddr.sin_addr.s_addr = htonl(inetaddr);
    myaddr.sin_port = htons(inetport);
}
// get the process id of a sockaddr_in using the configuration in reverse
int get_tcp_id(ivy_class *ivy, const sockaddr_in &myaddr) {
    // convert address and port back to host byte order before the lookup
    unsigned long host_addr = ntohl(myaddr.sin_addr.s_addr);
    unsigned long host_port = ntohs(myaddr.sin_port);
    return ivy->get_tcp_config()->rev(host_addr, host_port);
}
// get a new TCP socket
// Creates an IPv4 stream socket with Nagle's algorithm disabled
// (TCP_NODELAY), so small messages go out immediately. Fail-stops
// the process if either step fails.
int make_tcp_socket() {
    const int sock = ::socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        std::cerr << "cannot create socket\n";
        exit(1);
    }
    int nodelay = 1;
    const int rc = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
    if (rc < 0) {
        perror("setsockopt failed");
        exit(1);
    }
    return sock;
}
// This structure holds all the callbacks for the endpoint. These are function objects
// that are called asynchronously.
struct tcp_callbacks {
    %`handle_accept` acb;    // a connection was accepted
    %`handle_recv` rcb;      // a message was received
    %`handle_fail` fcb;      // a channel failed
    %`handle_connected` ccb; // an outgoing connection completed
    // Note: ccb is now taken by const reference, consistent with the
    // other three parameters (it was previously passed by value).
    tcp_callbacks(const %`handle_accept` &acb,
                  const %`handle_recv` &rcb,
                  const %`handle_fail` &fcb,
                  const %`handle_connected` &ccb)
        : acb(acb), rcb(rcb), fcb(fcb), ccb(ccb) {}
};
// This is a general class for an asynchronous task. These objects are called in a loop
// by a thread allocated by the runtime. The fdes method returns a file descriptor
// associated with the object. If fdes returns a negative value, the thread deletes the
// object and terminates.
class tcp_task : public reader {
protected:
    int sock;         // socket associated to this task, or -1 if task complete
    int my_id;        // endpoint id associated to this task
    tcp_callbacks cb; // callbacks to ivy
    ivy_class *ivy;   // pointer to main ivy object (mainly to get lock)
public:
    // Initializers are listed in declaration order (sock before my_id) so
    // the member-init list matches actual construction order (-Wreorder).
    tcp_task(int my_id, int sock, const tcp_callbacks &cb, ivy_class *ivy)
        : sock(sock), my_id(my_id), cb(cb), ivy(ivy) {}
    // File descriptor the runtime thread waits on; negative ends the task.
    virtual int fdes() {
        return sock;
    }
};
// This task reads messages from a socket and calls the "recv" callback.
class tcp_reader : public tcp_task {
std::vector<char> buf; // bytes left over after the previous deserialization
public:
tcp_reader(int my_id, int sock, const tcp_callbacks &cb, ivy_class *ivy)
: tcp_task(my_id, sock, cb, ivy) {
}
// This is called in a loop by the task thread.
// Deserializes one `pkt` from the socket; on bad syntax or EOF it runs
// the "failed" callback, closes the socket, and sets sock = -1 so the
// task thread terminates.
virtual void read() {
// std::cerr << "RECEIVING\n";
`pkt` pkt; // holds received message
//ivy_socket_deser ds(sock,buf); // initializer deserialize with any leftover bytes
ivy_socket_deser_128 ds(sock,buf); // initializer deserialize with any leftover bytes
buf.clear(); // clear the leftover bytes
try {
__deser(ds,pkt); // read the message
}
// If packet has bad syntax, we drop it, close the socket, call the "failed"
// callback and terminate the task.
catch (deser_err &){
// ds.pos > 0 means some bytes were consumed before the error,
// i.e. a malformed packet rather than a clean EOF
if (ds.pos > 0)
std::cerr << "BAD PACKET RECEIVED\n";
else
std::cerr << "EOF ON SOCKET\n";
// NOTE(review): fcb is invoked here without taking the ivy lock,
// unlike the rcb call below -- confirm this is intended.
cb.fcb(sock);
close(sock);
sock = -1;
return;
}
// copy the leftover bytes to buf
buf.resize(ds.inp.size()-ds.pos);
std::copy(ds.inp.begin()+ds.pos,ds.inp.end(),buf.begin());
// call the "recv" callback with the received message
ivy->__lock();
cb.rcb(sock,pkt);
ivy->__unlock();
}
};
// This class writes queued packets to a socket. Packets can be added
// asynchronously to the tail of the queue. If the socket is closed,
// the queue will be emptied asynchronously. When the queue is empty the writer deletes
// the queue and exits.
// invariant: if socket is closed, queue is closed
class tcp_writer : public tcp_task {
tcp_queue *queue; // outgoing packets for this channel
bool connected; // true while the socket is connected to the peer
public:
tcp_writer(int my_id, int sock, tcp_queue *queue, const tcp_callbacks &cb, ivy_class *ivy)
: tcp_task(my_id,sock,cb,ivy), queue(queue), connected(false) {
}
// NOTE(review): identical to the base-class fdes; kept for clarity.
virtual int fdes() {
return sock;
}
// This is called in a loop by the task thread.
// Each iteration either (a) establishes the connection, (b) observes an
// asynchronous close and closes the socket, or (c) sends one packet.
virtual void read() {
if (!connected) {
// if the socket is not connected, wait for the queue to be open,
// then connect
queue->wait_open();
connect();
return;
}
// dequeue a packet to send
std::vector<char> buf;
bool qclosed = queue->dequeue_swap(buf);
// if queue has been closed asynchronously, close the socket.
if (qclosed) {
// std::cerr << "CLOSING " << sock << std::endl;
::close(sock);
connected = false;
return;
}
// try a blocking send
int bytes = send(sock,&buf[0],buf.size(),MSG_NOSIGNAL);
// std::cerr << "SENT\n";
// if not all bytes sent, channel has failed, close the queue
if (bytes < (int)buf.size())
fail_close();
}
// Establish the outgoing connection to the peer recorded in the queue,
// then fire the "connected" (or "failed") callback under the ivy lock.
void connect() {
// Get the address of the other from the configuration
// std::cerr << "ENTERING CONNECT " << sock << std::endl;
ivy -> __lock(); // can be asynchronous, so must lock ivy!
struct sockaddr_in myaddr;
int other = queue->other;
get_tcp_addr(ivy,other,myaddr);
ivy -> __unlock();
// Call connect to make connection
// std::cerr << "CONNECTING sock=" << sock << "other=" << other << std::endl;
int res = ::connect(sock,(sockaddr *)&myaddr,sizeof(myaddr));
// If successful, call the "connected" callback, else "failed"
ivy->__lock();
if (res >= 0) {
// std::cerr << "CONNECT SUCCEEDED " << sock << std::endl;
cb.ccb(sock);
connected = true;
}
else {
// std::cerr << "CONNECT FAILED " << sock << std::endl;
fail_close();
}
ivy->__unlock();
}
// Handle a channel failure: close the queue without reporting (this
// thread already knows), close the socket, then run the "failed" callback.
void fail_close() {
queue -> set_closed(false); // close queue synchronously
// make sure socket is closed before fail callback, since this
// might open another socket, and we don't want to build up
// zombie sockets.
// std::cerr << "CLOSING ON FAILURE " << sock << std::endl;
::close(sock);
cb.fcb(sock);
connected = false;
}
};
// This task listens for connections on a socket in the background.
class tcp_listener : public tcp_task {
public:
// The constructor creates a socket to listen on.
tcp_listener(int my_id, const tcp_callbacks &cb, ivy_class *ivy)
: tcp_task(my_id,0,cb,ivy) {
sock = make_tcp_socket();
}
// The bind method is called by the runtime once, after initialization.
// This allows us to query the configuration for our address and bind the socket.
virtual void bind() {
ivy -> __lock(); // can be asynchronous, so must lock ivy!
// Get our endpoint address from the configuration
struct sockaddr_in myaddr;
get_tcp_addr(ivy,my_id,myaddr);
std::cerr << "binding id: " << my_id << " port: " << ntohs(myaddr.sin_port) << std::endl;
// Bind the socket to our address
if (::bind(sock, (struct sockaddr *)&myaddr, sizeof(myaddr)) < 0)
{ perror("bind failed"); exit(1); }
// Start listening on the socket (backlog of 2 pending connections)
if (listen(sock,2) < 0)
{ std::cerr << "cannot listen on socket\n"; exit(1); }
ivy -> __unlock();
}
// After binding, the thread calls read in a loop. In this case, we don't read,
// we try accepting a connection. BUG: We should first call select to wait for a connection
// to be available, then call accept while holding the ivy lock. This is needed to
// guarantee the "accepted" appears to occur before "connected" which is required by
// the tcp interface specification.
virtual void read() {
// std::cerr << "ACCEPTING\n";
// Call accept to get an incoming connection request. May block.
sockaddr_in other_addr;
socklen_t addrlen = sizeof(other_addr);
int new_sock = accept(sock, (sockaddr *)&other_addr, &addrlen);
// If this fails, something is very wrong: fail stop.
if (new_sock < 0)
{ perror("accept failed"); exit(1); }
// Get the endpoint id of the other from its address.
int other = get_tcp_id(ivy,other_addr);
// Run the "accept" callback. Since it's async, we must lock.
ivy->__lock();
cb.acb(new_sock,other);
ivy->__unlock();
// Install a reader task to read messages from the new socket.
ivy->install_reader(new tcp_reader(my_id,new_sock,cb,ivy));
}
};
>>>
<<< member
// Per-endpoint member state compiled into the generated ivy class:
tcp_listener *`rdr`; // the listener task
tcp_callbacks *`cb`; // the callbacks to ivy
hash_space::hash_map<int,tcp_queue *> `send_queue`; // queues of blocked packets, per socket
>>>
<<< member
tcp_config *the_tcp_config; // the current configuration (0 until set or first use)
// Get the current TCP configuration. If none, create a default one.
// NOTE(review): lazily allocated, never freed, and not synchronized --
// presumably always called with the ivy lock held; confirm against runtime.
tcp_config *get_tcp_config() {
if (!the_tcp_config)
the_tcp_config = new tcp_config();
return the_tcp_config;
}
// Set the current TCP configuration. This is called by the runtime environment.
void set_tcp_config(tcp_config *conf) {
the_tcp_config = conf;
}
>>>
<<< init
the_tcp_config = 0; // no configuration yet; get_tcp_config() builds a default on demand
// Create the callbacks. In a parameterized instance, this creates
// one set of callbacks for each endpoint id. When you put an
// action in anti-quotes it creates a function object (a "thunk")
// that captures the instance environment, in this case including
// the instance's endpoint id "me".
`cb` = new tcp_callbacks(`handle_accept`,`handle_recv`,`handle_fail`,`handle_connected`);
// Install a listener task for this endpoint. If parameterized, this creates
// one for each endpoint.
install_reader(`rdr` = new tcp_listener(`me`,*`cb`,this));
>>>
# Callback thunks. Each wraps an interface action so the C++ code can
# invoke it with only the socket arguments; this instance's endpoint
# id `me` is supplied from the captured environment.
action handle_accept(s:socket, other:addr) = {
call accept(me,s,other)
}
# forward a received packet to the interface action
action handle_recv(s:socket,x:pkt) = {
call recv(me,s,x)
}
# forward a channel failure to the interface action
action handle_fail(s:socket) = {
call failed(me,s)
}
# forward a completed outgoing connection to the interface action
action handle_connected(s:socket) = {
call connected(me,s)
}
object impl = {
# close the socket
implement close(s:socket) {
<<< impure
// We don't want to close a socket when there is another thread
// waiting, because the other thread won't know what to do with the
// error.
// Instead we shut down the socket and let the other thread close it.
// If there is a reader thread, it will see EOF and close the socket. If there is
// on open writer thread, it will close the socket after we close the
// send queue. If the queue is already closed, closing it has no effect.
// invariant: if a socket is open there is a reader thread or
// an open writer thread, but not both.
// Because of this invariant, the socket will be closed exactly once.
// shutdown both directions; the owning thread performs the real close
::shutdown(s,SHUT_RDWR);
if (`send_queue`.find(s) != `send_queue`.end())
`send_queue`[s] -> set_closed();
>>>
}
# open an outgoing connection to endpoint `other`, returning the new socket
implement connect(other:addr) returns (s:socket) {
<<< impure
s = make_tcp_socket();
// std::cerr << "SOCKET " << s << std::endl;
// create a send queue for this socket, if needed, along with
// its thread. if the queue exists, it must be closed, so
// we open it.
// (a queue can already exist because the OS reuses file descriptor
// numbers of previously closed sockets)
tcp_queue *queue;
if (`send_queue`.find(s) == `send_queue`.end()) {
`send_queue`[s] = queue = new tcp_queue(other);
install_thread(new tcp_writer(`me`,s,queue,*`cb`,this));
} else
`send_queue`[s] -> set_open(other);
>>>
}
# serialize packet p and queue it for sending on socket s;
# ok is false iff the channel is known to have failed
implement send(s:socket,p:pkt) returns (ok:bool) {
<<< impure
//ivy_binary_ser sr;
ivy_binary_ser_128 sr;
__ser(sr,p);
// std::cerr << "SENDING\n";
// if the send queue for this sock doesn't exist, it isn't open,
// so the client has violated the precondition. we do the bad client
// the service of not crashing.
if (`send_queue`.find(s) == `send_queue`.end())
ok = true;
else {
// get the send queue, and enqueue the packet, returning false if
// the queue is closed.
ok = !`send_queue`[s]->enqueue_swap(sr.res);
}
>>>
}
}
trusted isolate iso = this
attribute test = impl
}