Tcp host
This is an implementation of a generic TCP endpoint. It allows you to create point-to-point, non-duplicating, ordered connections that transfer sequences of messages. The parameters are:
endpoint : the type of UDP endpoints
data : the type of data streams
The endpoint type should be a struct with fields addr (the IP
address) and port. The data type should be arrays of bytes.
module tcp_host(endpoint,data) = {
type socket
object rdr = {} # the listener object
object cb = {} # struct holding the callbacks
object send_queue = {} # queues of outgoing packets
<<< header
-
include
class tcp_listener; // class of threads that listen for connections
class tcp_callbacks; // class holding callbacks to ivy
class tcp_queue;
>>>
<<< impl
// Maximum number of sent packets to queue on a channel. Because TCP also
// buffers, the total number of untransmitted backets that can back up will be greater
// than this. This number *must* be at least one to void packet corruption.
struct tcp_mutex {
HANDLE mutex;
tcp_mutex() { mutex = CreateMutex(NULL,FALSE,NULL); }
void lock() { WaitForSingleObject(mutex,INFINITE); }
void unlock() { ReleaseMutex(mutex); }
pthread_mutex_t mutex;
tcp_mutex() { pthread_mutex_init(&mutex,NULL); }
void lock() { pthread_mutex_lock(&mutex); }
void unlock() { pthread_mutex_unlock(&mutex); }
};
struct tcp_sem {
sem_t sem;
tcp_sem() { sem_init(&sem,0,0); }
void up() {sem_post(&sem); }
void down() {sem_wait(&sem);}
};
class tcp_queue {
tcp_mutex mutex;
tcp_sem sem;
bool closed;
bool reported_closed;
std::list<std::vector<char> > bufs;
public:
`endpoint` other; // only acces while holding lock!
tcp_queue(`endpoint` other) : closed(false), reported_closed(false), other(other) {}
bool enqueue_swap(std::vector<char> &buf) {
mutex.lock();
if (closed) {
mutex.unlock();
return true;
}
if (bufs.size() < MAX_TCP_SEND_QUEUE) {
bufs.push_back(std::vector<char>());
buf.swap(bufs.back());
}
mutex.unlock();
sem.up();
return false;
}
bool dequeue_swap(std::vector<char> &buf) {
while(true) {
sem.down();
// std::cerr << "DEQUEUEING" << closed << std::endl;
mutex.lock();
if (closed) {
if (reported_closed) {
mutex.unlock();
continue;
}
reported_closed = true;
mutex.unlock();
// std::cerr << "REPORTING CLOSED" << std::endl;
return true;
}
if (bufs.size() > 0) {
buf.swap(bufs.front());
bufs.erase(bufs.begin());
mutex.unlock();
return false;
}
mutex.unlock();
}
}
void set_closed(bool report=true) {
mutex.lock();
closed = true;
bufs.clear();
if (!report)
reported_closed = true;
mutex.unlock();
sem.up();
}
void set_open(`endpoint` _other) {
mutex.lock();
closed = false;
reported_closed = false;
other = _other;
mutex.unlock();
sem.up();
}
void wait_open(bool closed_val = false){
while (true) {
mutex.lock();
if (closed == closed_val) {
mutex.unlock();
return;
}
mutex.unlock();
sem.down();
}
}
};
// construct a sockaddr_in for a specified process id using the configuration
void get_tcp_addr(ivy_class *ivy, `endpoint` addr, sockaddr_in &myaddr) {
memset((char *)&myaddr, 0, sizeof(myaddr));
myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = htonl(addr.addr);
myaddr.sin_port = htons(addr.port);
}
// get a new TCP socket
int make_tcp_socket() {
int sock = ::socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{ std::cerr << "cannot create socket\n"; exit(1); }
int one = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
{ perror("setsockopt failed"); exit(1); }
return sock;
}
// This structure holds all the callbacks for the endpoint. These are function objects
// that are called asynchronously.
struct tcp_callbacks {
%`handle_accept` acb;
%`handle_recv` rcb;
%`handle_fail` fcb;
%`handle_connected` ccb;
tcp_callbacks(const %`handle_accept` &acb,
const %`handle_recv` &rcb,
const %`handle_fail` &fcb,
const %`handle_connected` ccb)
: acb(acb), rcb(rcb), fcb(fcb), ccb(ccb) {}
};
// This is a general class for an asynchronous task. These objects are called in a loop
// by a thread allocated by the runtime. The fdes method returns a file descriptor
// associated with the object. If fdes returns a negative value, the thread deletes the
// object and terminates.
class tcp_task : public reader {
protected:
int sock; // socket associated to this task, or -1 if task complete
tcp_callbacks cb; // callbacks to ivy
ivy_class *ivy; // pointer to main ivy object (mainly to get lock)
public:
tcp_task(int sock, const tcp_callbacks &cb, ivy_class *ivy)
: sock(sock), cb(cb), ivy(ivy) {}
virtual int fdes() {
return sock;
}
};
// This task reads messages from a socket and calls the "recv" callback.
class tcp_reader : public tcp_task {
std::vector<char> buf;
public:
tcp_reader(int sock, const tcp_callbacks &cb, ivy_class *ivy)
: tcp_task(sock, cb, ivy) {
}
// This is called in a loop by the task thread.
virtual void read() {
// std::cerr << "RECEIVING\n";
buf.resize(2048);
int newbytes;
if ((newbytes = ::read(sock,&buf[0],2048)) < 0)
{ perror("read failed"); exit(1); }
// copy received bytes into the result
`data` pkt;
pkt.resize(newbytes);
std::copy(buf.begin(),buf.begin()+newbytes,pkt.begin());
// call the "recv" callback with the received message
ivy->__lock();
cb.rcb(sock,pkt);
ivy->__unlock();
}
};
// This class writes queued bytes to a socket. Packets can be added
// asynchronously to the tail of the queue. If the socket is closed,
// the queue will be emptied asynchrnonously. When the queue is empty the writer deletes
// the queue and exits.
// invariant: if socket is closed, queue is closed
class tcp_writer : public tcp_task {
tcp_queue *queue;
bool connected;
public:
tcp_writer(int sock, tcp_queue *queue, const tcp_callbacks &cb, ivy_class *ivy)
: tcp_task(sock,cb,ivy), queue(queue), connected(false) {
}
virtual int fdes() {
return sock;
}
// This is called in a loop by the task thread.
virtual void read() {
if (!connected) {
// if the socket is not connected, wait for the queue to be open,
// then connect
queue->wait_open();
connect();
return;
}
// dequeue a packet to send
std::vector<char> buf;
bool qclosed = queue->dequeue_swap(buf);
// if queue has been closed asynchrononously, close the socket.
if (qclosed) {
// std::cerr << "CLOSING " << sock << std::endl;
::close(sock);
connected = false;
return;
}
// try a blocking send
int bytes = send(sock,&buf[0],buf.size(),MSG_NOSIGNAL);
// std::cerr << "SENT\n";
// if not all bytes sent, channel has failed, close the queue
if (bytes < (int)buf.size())
fail_close();
}
void connect() {
// Get the address of the other from the configuration
// std::cerr << "ENTERING CONNECT " << sock << std::endl;
ivy -> __lock(); // can be asynchronous, so must lock ivy!
struct sockaddr_in myaddr;
`endpoint` other = queue->other;
get_tcp_addr(ivy,other,myaddr);
ivy -> __unlock();
// Call connect to make connection
// std::cerr << "CONNECTING sock=" << sock << "other=" << other << std::endl;
int res = ::connect(sock,(sockaddr *)&myaddr,sizeof(myaddr));
// If successful, call the "connected" callback, else "failed"
ivy->__lock();
if (res >= 0) {
// std::cerr << "CONNECT SUCCEEDED " << sock << std::endl;
cb.ccb(sock);
connected = true;
}
else {
// std::cerr << "CONNECT FAILED " << sock << std::endl;
fail_close();
}
ivy->__unlock();
}
void fail_close() {
queue -> set_closed(false); // close queue synchronously
// make sure socket is closed before fail callback, since this
// might open another socket, and we don't want to build up
// zombie sockets.
// std::cerr << "CLOSING ON FAILURE " << sock << std::endl;
::close(sock);
cb.fcb(sock);
connected = false;
}
};
// This task listens for connections on a socket in the background.
class tcp_listener : public tcp_task {
public:
// The constructor creates a socket to listen on.
tcp_listener(const tcp_callbacks &cb, ivy_class *ivy)
: tcp_task(0,cb,ivy) {
sock = make_tcp_socket();
}
// The bind method is called by the runtime once, after initialization.
// This allows us to query the configuration for our address and bind the socket.
virtual void bind() {
}
// After binding, the thread calls read in a loop. In this case, we don't read,
// we try accepting a connection. BUG: We should first call select to wait for a connection
// to be available, then call accept while holding the ivy lock. This is needed to
// guarantee the "accepted" appears to occur before "connected" which is required by
// the the tcp interface specification.
virtual void read() {
// std::cerr << "ACCEPTING\n";
// Call accept to get an incoming connection request. May block.
sockaddr_in other_addr;
socklen_t addrlen = sizeof(other_addr);
int new_sock = accept(sock, (sockaddr *)&other_addr, &addrlen);
// If this fails, something is very wrong: fail stop.
if (new_sock < 0)
{ perror("accept failed"); exit(1); }
`endpoint` other;
other.addr = ntohl(other_addr.sin_addr.s_addr);
other.port = ntohs(other_addr.sin_port);
// Run the "accept" callback. Since it's async, we must lock.
ivy->__lock();
cb.acb(new_sock,other);
ivy->__unlock();
// Install a reader task to read messages from the new socket.
ivy->install_reader(new tcp_reader(new_sock,cb,ivy));
}
};
>>>
<<< member
hash_space::hash_map<int,void *> `rdr`;
tcp_callbacks *`cb`; // the callbacks to ivy
hash_space::hash_map<int,tcp_queue *> `send_queue`; // queues of blocked packets, per socket
>>>
<<< init
// Create the callbacks. In a parameterized instance, this creates
// one set of callbacks for each endpoint id. When you put an
// action in anti-quotes it creates a function object (a "thunk")
// that captures the instance environment, in this case including
// the instance's endpoint id "me".
`cb` = new tcp_callbacks(`handle_accept`,`handle_recv`,`handle_fail`,`handle_connected`);
>>>
action handle_accept(s:socket, other:endpoint) = {
call accept(s,other)
}
action handle_recv(s:socket,x:data) = {
call recv(s,x)
}
action handle_fail(s:socket) = {
call failed(s)
}
action handle_connected(s:socket) = {
call connected(s)
}
open creates a socket, binds it to the requested endpoint, and starts listening on it.
action open(addr:endpoint) returns (s:socket) = {
<<< impure
tcp_listener *r = new tcp_listener(*`cb`,this);
s = r->fdes();
// std::cerr << "SOCKET " << s << std::endl;
struct sockaddr_in myaddr;
myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = htonl(addr.addr);
// myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
myaddr.sin_port = htons(addr.port);
// std::cerr << "binding id: " << " addr: " << ntohl(myaddr.sin_addr.s_addr) << " port: " << ntohs(myaddr.sin_port) << std::endl;
if (::bind(s, (struct sockaddr *)&myaddr, sizeof(myaddr)) < 0)
{ perror("bind failed"); exit(1); }
// Start lisetning on the socket
if (listen(s,2) < 0)
{ std::cerr << "cannot listen on socket\n"; exit(1); }
`rdr`[s] = r;
install_reader(r);
>>>
}
action close(s:socket) = {
<<< impure
// We don't want to close a socket when there is another thread
// waiting, because the other thread won't know what to do with the
// error.
// Instead we shut down the socket and let the other thread close it.
// If there is a reader thread, it will see EOF and close the socket. If there is
// on open writer thread, it will close the socket after we close the
// send queue. If the queue is already closed, closing it has no effect.
// invariant: if a socket is open there is a reader thread or
// an open writer thread, but not both.
// Because of this invariant, the socket will be closed exactly once.
::shutdown(s,SHUT_RDWR);
if (`send_queue`.find(s) != `send_queue`.end())
`send_queue`[s] -> set_closed();
>>>
}
action connect(other:endpoint) returns (s:socket) = {
<<< impure
s = make_tcp_socket();
// std::cerr << "SOCKET " << s << std::endl;
// create a send queue for this socket, if needed, along with
// its thread. if the queue exists, it must be closed, so
// we open it.
tcp_queue *queue;
if (`send_queue`.find(s) == `send_queue`.end()) {
`send_queue`[s] = queue = new tcp_queue(other);
install_thread(new tcp_writer(s,queue,*`cb`,this));
} else
`send_queue`[s] -> set_open(other);
>>>
}
action send(s:socket,p:data) returns (ok:bool) = {
<<< impure
std::vector<char> buf;
buf.resize(p.size());
std::copy(p.begin(),p.end(),buf.begin());
// std::cerr << "SENDING\n";
// if the send queue for this sock doesn's exist, it isn't open,
// so the client has violated the precondition. we do the bad client
// the service of not crashing.
if (`send_queue`.find(s) == `send_queue`.end())
ok = true;
else {
// get the send queue, and enqueue the packet, returning false if
// the queue is closed.
ok = !`send_queue`[s]->enqueue_swap(buf);
}
>>>
}
action accept(s:socket,other:endpoint)
action recv(s:socket,p:data)
action failed(s:socket)
action connected(s:socket)
trusted isolate iso = this
attribute test = impl
}