Tcp serdes
This is an implementation of a generic TCP endpoint. It allows you to create point-to-point, non-duplicating, ordered connections that transfer sequences of messages of a given type. Serialization and deserialization are done automatically using the standard serializer and deserializer. There is currently no option to customize this. The parameters are:
endpoint : the type of UDP endpoints
data : the type of data streams
The endpoint type should be a struct with fields addr (the IP
address) and port. The data type should be arrays of bytes.
module tcp_serdes(endpoint,data) = {
type socket
object rdr = {} # the listener object object cb = {} # struct holding the callbacks object send_queue = {} # queues of outgoing packets
This code goes in the C++ header file, ahead of the ivy object declaration. Here, we put declarations (perhaps forward) of any auxiliary classes we need). We need to be careful that the names of these don't clash with other modules. However, duplicates are removed, so we don't have to worry about multiple instances of this module clashing.
<<< header
-
include
template <typename data>
class tcp_listener; // class of threads that listen for connections
template <typename data>
class tcp_callbacks; // class holding callbacks to ivy
class tcp_queue;
>>>
<<< impl
// Maximum number of sent packets to queue on a channel. Because TCP also
// buffers, the total number of untransmitted backets that can back up will be greater
// than this. This number *must* be at least one to void packet corruption.
struct tcp_mutex {
HANDLE mutex;
tcp_mutex() { mutex = CreateMutex(NULL,FALSE,NULL); }
void lock() { WaitForSingleObject(mutex,INFINITE); }
void unlock() { ReleaseMutex(mutex); }
pthread_mutex_t mutex;
tcp_mutex() { pthread_mutex_init(&mutex,NULL); }
void lock() { pthread_mutex_lock(&mutex); }
void unlock() { pthread_mutex_unlock(&mutex); }
};
class tcp_queue {
std::mutex mtx;
std::condition_variable cv;
bool closed;
bool reported_closed;
std::list<std::vector<char> > bufs;
public:
`endpoint` self; // only acces while holding lock!
`endpoint` other; // only acces while holding lock!
tcp_queue(`endpoint` self, `endpoint` other) : closed(false), reported_closed(false), self(self), other(other) {}
bool enqueue_swap(std::vector<char> &buf) {
std::unique_lock<std::mutex> lock(mtx);
// mutex.lock();
if (closed) {
// mutex.unlock();
return true;
}
if (bufs.size() < MAX_TCP_SEND_QUEUE) {
bufs.push_back(std::vector<char>());
buf.swap(bufs.back());
}
// mutex.unlock();
// std::cout << "PUTTING BUFFER size=" << bufs.size() << std::endl;
// sem.up();
cv.notify_all();
return false;
}
bool dequeue_swap(std::vector<char> &buf) {
std::unique_lock<std::mutex> lock(mtx);
while(true) {
// sem.down();
// mutex.lock();
if (closed) {
if (reported_closed) {
// mutex.unlock();
cv.wait(lock);
continue;
}
reported_closed = true;
// mutex.unlock();
// std::cout << "REPORTING CLOSED" << std::endl;
return true;
}
if (bufs.size() > 0) {
// std::cout << "GETTING BUFFER" << closed << std::endl;
buf.swap(bufs.front());
bufs.erase(bufs.begin());
// mutex.unlock();
return false;
}
cv.wait(lock);
// mutex.unlock();
}
}
void set_closed(bool report=true) {
std::unique_lock<std::mutex> lock(mtx);
// mutex.lock();
closed = true;
bufs.clear();
if (!report)
reported_closed = true;
// mutex.unlock();
// sem.up();
cv.notify_all();
}
void set_open(`endpoint` _other) {
std::unique_lock<std::mutex> lock(mtx);
// mutex.lock();
closed = false;
reported_closed = false;
other = _other;
// mutex.unlock();
// sem.up();
cv.notify_all();
}
void wait_open(bool closed_val = false){
std::unique_lock<std::mutex> lock(mtx);
while (true) {
// mutex.lock();
if (closed == closed_val) {
// mutex.unlock();
return;
}
// mutex.unlock();
// sem.down();
cv.wait(lock);
}
}
};
// construct a sockaddr_in for a specified process id using the configuration
void get_tcp_addr(ivy_class *ivy, `endpoint` addr, sockaddr_in &myaddr) {
memset((char *)&myaddr, 0, sizeof(myaddr));
myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = htonl(addr.addr);
myaddr.sin_port = htons(addr.port);
}
// get a new TCP socket
int make_tcp_socket() {
int sock = ::socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{ std::cerr << "cannot create socket\n"; exit(1); }
int one = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
{ perror("setsockopt TCP_NODELAY failed"); exit(1); }
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
{ perror("setsockopt SO_REUSEADDR failed"); exit(1); }
return sock;
}
// This structure holds all the callbacks for the endpoint. These are function objects
// that are called asynchronously.
template <typename data>
struct tcp_callbacks {
virtual void acb(int,`endpoint`) const = 0;
virtual void rcb(int,data) const = 0;
virtual void fcb(int) const = 0;
virtual void ccb(int) const = 0;
};
template<class acb_thunk, class rcb_thunk, class fcb_thunk, class ccb_thunk, typename data>
struct tcp_callbacks_thunk : tcp_callbacks<data> {
acb_thunk acbt;
rcb_thunk rcbt;
fcb_thunk fcbt;
ccb_thunk ccbt;
tcp_callbacks_thunk(const acb_thunk &acb,const rcb_thunk &rcb,const fcb_thunk &fcb,const ccb_thunk &ccb)
: acbt(acb), rcbt(rcb), fcbt(fcb), ccbt(ccb) {}
virtual void acb(int sock,`endpoint` ep) const {
acbt(sock,ep);
}
virtual void rcb(int sock, data pkt) const {
rcbt(sock,pkt);
}
virtual void fcb(int sock) const {
fcbt(sock);
}
virtual void ccb(int sock) const {
ccbt(sock);
}
};
// This is a general class for an asynchronous task. These objects are called in a loop
// by a thread allocated by the runtime. The fdes method returns a file descriptor
// associated with the object. If fdes returns a negative value, the thread deletes the
// object and terminates.
template <typename data>
class tcp_task : public reader {
protected:
int sock; // socket associated to this task, or -1 if task complete
const tcp_callbacks<data> *cb; // callbacks to ivy
ivy_class *ivy; // pointer to main ivy object (mainly to get lock)
public:
tcp_task(int sock, const tcp_callbacks<data> *cb, ivy_class *ivy)
: sock(sock), cb(cb), ivy(ivy) {}
virtual int fdes() {
return sock;
}
};
// This task reads messages from a socket and calls the "recv" callback.
template <typename data>
class tcp_reader : public tcp_task<data> {
std::vector<char> buf;
using tcp_task<data>::sock;
using tcp_task<data>::cb;
using tcp_task<data>::ivy;
public:
tcp_reader(int sock, const tcp_callbacks<data> *cb, ivy_class *ivy)
: tcp_task<data>(sock, cb, ivy) {
}
// This is called in a loop by the task thread.
virtual void read() {
// std::cout << "RECEIVING\n";
data pkt; // holds received message
ivy_socket_deser ds(sock,buf); // initializer deserialize with any leftover bytes
buf.clear(); // clear the leftover bytes
try {
__deser(ds,pkt); // read the message
}
// If packet has bad syntax, we drop it, close the socket, call the "failed"
// callback and terminate the task.
catch (deser_err &){
if (ds.pos > 0)
std::cout << "BAD PACKET RECEIVED\n";
else
std::cout << "EOF ON SOCKET\n";
cb->fcb(sock);
close(sock);
sock = -1;
return;
}
// copy the leftover bytes to buf
buf.resize(ds.inp.size()-ds.pos);
std::copy(ds.inp.begin()+ds.pos,ds.inp.end(),buf.begin());
// call the "recv" callback with the received message
ivy->__lock();
cb->rcb(sock,pkt);
ivy->__unlock();
}
};
// This class writes queued bytes to a socket. Packets can be added
// asynchronously to the tail of the queue. If the socket is closed,
// the queue will be emptied asynchrnonously. When the queue is empty the writer deletes
// the queue and exits.
// invariant: if socket is closed, queue is closed
template <typename data>
class tcp_writer : public tcp_task<data> {
tcp_queue *queue;
bool connected;
using tcp_task<data>::sock;
using tcp_task<data>::cb;
using tcp_task<data>::ivy;
public:
tcp_writer(int sock, tcp_queue *queue, const tcp_callbacks<data> *cb, ivy_class *ivy)
: tcp_task<data>(sock,cb,ivy), queue(queue), connected(false) {
}
virtual int fdes() {
return sock;
}
// This is called in a loop by the task thread.
virtual void read() {
if (!connected) {
// if the socket is not connected, wait for the queue to be open,
// then connect
queue->wait_open();
connect();
return;
}
// dequeue a packet to send
std::vector<char> buf;
bool qclosed = queue->dequeue_swap(buf);
// if queue has been closed asynchrononously, close the socket.
if (qclosed) {
std::cout << "CLOSING " << sock << std::endl;
::close(sock);
connected = false;
return;
}
// try a blocking send
int bytes = send(sock,&buf[0],buf.size(),MSG_NOSIGNAL);
int bytes = send(sock,&buf[0],buf.size(),0);
// std::cout << "SENT\n";
// if not all bytes sent, channel has failed, close the queue
if (bytes < (int)buf.size())
fail_close();
}
void connect() {
// Get the address of the other from the configuration
// std::cout << "ENTERING CONNECT " << sock << std::endl;
ivy -> __lock(); // can be asynchronous, so must lock ivy!
struct sockaddr_in myaddr;
`endpoint` other = queue->other;
get_tcp_addr(ivy,other,myaddr);
ivy -> __unlock();
// Call connect to make connection
// std::cout << "CONNECTING sock=" << sock << "other=" << other << std::endl;
int res = ::connect(sock,(sockaddr *)&myaddr,sizeof(myaddr));
// If successful, call the "connected" callback, else "failed"
ivy->__lock();
if (res >= 0) {
// std::cout << "CONNECT SUCCEEDED " << sock << std::endl;
int bytes = send(sock,&queue->self.port,sizeof(queue->self.port),MSG_NOSIGNAL);
int bytes = send(sock,&queue->self.port,sizeof(queue->self.port),0);
cb->ccb(sock);
connected = true;
}
else {
// std::cout << "CONNECT FAILED " << sock << std::endl;
// perror("connect failed");
ivy->__unlock();
::sleep(1);
close(sock);
sock = make_tcp_socket();
return;
// fail_close();
}
ivy->__unlock();
}
void fail_close() {
queue -> set_closed(false); // close queue synchronously
// make sure socket is closed before fail callback, since this
// might open another socket, and we don't want to build up
// zombie sockets.
// std::cout << "CLOSING ON FAILURE " << sock << std::endl;
::close(sock);
cb->fcb(sock);
connected = false;
}
};
// This task listens for connections on a socket in the background.
template <typename data>
class tcp_listener : public tcp_task<data> {
using tcp_task<data>::sock;
using tcp_task<data>::cb;
using tcp_task<data>::ivy;
public:
virtual bool background() {
return true;
}
// The constructor creates a socket to listen on.
tcp_listener(const tcp_callbacks<data> *cb, ivy_class *ivy)
: tcp_task<data>(0,cb,ivy) {
sock = make_tcp_socket();
}
// The bind method is called by the runtime once, after initialization.
// This allows us to query the configuration for our address and bind the socket.
virtual void bind() {
}
// After binding, the thread calls read in a loop. In this case, we don't read,
// we try accepting a connection. BUG: We should first call select to wait for a connection
// to be available, then call accept while holding the ivy lock. This is needed to
// guarantee the "accepted" appears to occur before "connected" which is required by
// the the tcp interface specification.
virtual void read() {
// Call accept to get an incoming connection request. May block.
sockaddr_in other_addr;
socklen_t addrlen = sizeof(other_addr);
int new_sock = accept(sock, (sockaddr *)&other_addr, &addrlen);
// If this fails, something is very wrong: fail stop.
if (new_sock < 0)
{ perror("accept failed"); exit(1); }
`endpoint` other;
other.addr = ntohl(other_addr.sin_addr.s_addr);
other.port = ntohs(other_addr.sin_port);
int bytes = ::read(new_sock,&other.port,sizeof(other.port));
// Run the "accept" callback. Since it's async, we must lock.
ivy->__lock();
cb->acb(new_sock,other);
ivy->__unlock();
// Install a reader task to read messages from the new socket.
ivy->install_reader(new tcp_reader<data>(new_sock,cb,ivy));
}
};
>>>
<<< member
hash_space::hash_map<int,void *> <code>rdr</code>;
tcp_callbacks *<code>cb</code>; // the callbacks to ivy
hash_space::hash_map<int,tcp_queue *> <code>send_queue</code>; // queues of blocked packets, per socket
type rdr_map_t
interpret rdr_map_t -> <<< primitive hash_space::hash_map<int,void *> >>>
type callback_t
interpret callback_t -> <<< primitive tcp_callbacks<`data`> * >>>
type queue_t
interpret queue_t -> <<< primitive hash_space::hash_map<int,tcp_queue *> >>>
var rdr : rdr_map_t
var cb : callback_t
var send_queue : queue_t
<<< init
// Create the callbacks. In a parameterized instance, this creates
// one set of callbacks for each endpoint id. When you put an
// action in anti-quotes it creates a function object (a "thunk")
// that captures the instance environment, in this case including
// the instance's endpoint id "me".
`cb` = new tcp_callbacks_thunk<%`handle_accept`,%`handle_recv`,%`handle_fail`,%`handle_connected`,`data`>
(`handle_accept`,`handle_recv`,`handle_fail`,`handle_connected`);
>>>
action handle_accept(s:socket, other:endpoint) = {
call accept(s,other)
}
action handle_recv(s:socket,x:data) = {
call recv(s,x)
}
action handle_fail(s:socket) = {
call failed(s)
}
action handle_connected(s:socket) = {
call connected(s)
}
open creates a socket, binds it to the requested endpoint, and starts listening on it.
action open(addr:endpoint) returns (s:socket) = {
<<< impure
tcp_listener<`data`> *r = new tcp_listener<`data`>(`cb`,this);
s = r->fdes();
// std::cout << "SOCKET " << s << std::endl;
struct sockaddr_in myaddr;
myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = htonl(addr.addr);
// myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
myaddr.sin_port = htons(addr.port);
// std::cout << "binding id: " << " addr: " << ntohl(myaddr.sin_addr.s_addr) << " port: " << ntohs(myaddr.sin_port) << std::endl;
if (::bind(s, (struct sockaddr *)&myaddr, sizeof(myaddr)) < 0)
{ perror("bind failed"); exit(1); }
// Start lisetning on the socket
if (listen(s,2) < 0)
{ std::cerr << "cannot listen on socket\n"; exit(1); }
{
int opt = 1;
if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof(opt)) == -1) {
perror("setsockopt");
return -1;
}
}
`rdr`[s] = r;
install_reader(r);
>>>
}
action close(s:socket) = {
<<< impure
// We don't want to close a socket when there is another thread
// waiting, because the other thread won't know what to do with the
// error.
// Instead we shut down the socket and let the other thread close it.
// If there is a reader thread, it will see EOF and close the socket. If there is
// on open writer thread, it will close the socket after we close the
// send queue. If the queue is already closed, closing it has no effect.
// invariant: if a socket is open there is a reader thread or
// an open writer thread, but not both.
// Because of this invariant, the socket will be closed exactly once.
::shutdown(s,SHUT_RDWR);
if (`send_queue`.find(s) != `send_queue`.end())
`send_queue`[s] -> set_closed();
>>>
}
action connect(self:endpoint, other:endpoint) returns (s:socket) = {
<<< impure
s = make_tcp_socket();
// std::cout << "SOCKET " << s << std::endl;
// create a send queue for this socket, if needed, along with
// its thread. if the queue exists, it must be closed, so
// we open it.
tcp_queue *queue;
if (`send_queue`.find(s) == `send_queue`.end()) {
`send_queue`[s] = queue = new tcp_queue(self,other);
install_thread(new tcp_writer<`data`>(s,queue,`cb`,this));
} else
`send_queue`[s] -> set_open(other);
>>>
}
action send(s:socket,p:data) returns (ok:bool) = {
<<< impure
ivy_binary_ser sr;
__ser(sr,p);
// if the send queue for this sock doesn's exist, it isn't open,
// so the client has violated the precondition. we do the bad client
// the service of not crashing.
if (`send_queue`.find(s) == `send_queue`.end()) {
// std::cout << "CANNOT FIND QUEUE: " << s << "\n";
ok = true;
}
else {
// get the send queue, and enqueue the packet, returning false if
// the queue is closed.
// std::cout << "SENDING\n";
ok = !`send_queue`[s]->enqueue_swap(sr.res);
}
>>>
}
action accept(s:socket,other:endpoint)
action recv(s:socket,p:data)
action failed(s:socket)
action connected(s:socket)
trusted isolate iso = this
attribute test = impl
}