Skip to content
Snippets Groups Projects
Commit 9ab92e81 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Refactor ftl net into classes and finnish components

parent 7b84b34c
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,9 @@ set(CMAKE_CXX_FLAGS_RELEASE "-O3")
SET(CMAKE_USE_RELATIVE_PATHS ON)
set(FTLSOURCE
src/raw.cpp
src/net.cpp
src/listener.cpp
src/socket.cpp
)
check_include_file("uriparser/Uri.h" HAVE_URI_H)
......
#ifndef _FTL_NET_HPP_
#define _FTL_NET_HPP_
#include "ftl/net/raw.hpp"
#include <memory>
namespace ftl {
namespace net {
inline raw::Socket *connect(const char *uri) { return raw::connect(uri); }
class Listener;
class Socket;
const int MAX_CONNECTIONS = 100; // TODO Is this a good number?
/**
* Start a listening socket for new connections on the given URI. An example
* URI might be:
* tcp://localhost:9000.
*/
std::shared_ptr<Listener> listen(const char *uri);
/**
* Accepts tcp, ipc and ws URIs. An example would be:
* ws://ftl.utu.fi/api/connect
*/
std::shared_ptr<Socket> connect(const char *uri);
/**
* Start a loop to continually check for network messages. If the async
* parameter is false then this function will block as long as any connections
* or listeners remain active.
*
* @param async Use a separate thread.
*/
bool run(bool async);
/**
* Wait for a bunch of messages, but return once at least one has been
* processed.
*/
bool wait();
/**
* Check and process any waiting messages, but do not block if there are none.
*/
bool check();
/**
* Ensure that the network loop terminates, whether a separate thread or not.
*/
void stop();
/**
* Is the network loop running in another thread?
*/
bool is_async();
/**
* Is the network loop currently handling a message?
*/
bool is_handling();
}
}
......
#ifndef _FTL_NET_HANDLERS_HPP_
#define _FTL_NET_HANDLERS_HPP_
#include <functional>
namespace ftl {
namespace net {
typedef std::function<void(int, std::string&)> sockdatahandler_t;
typedef std::function<void(int)> sockerrorhandler_t;
typedef std::function<void()> sockconnecthandler_t;
typedef std::function<void(int)> sockdisconnecthandler_t;
typedef std::function<void(Socket&, int, std::string&)> datahandler_t;
typedef std::function<void(Socket&, int)> errorhandler_t;
typedef std::function<void(Socket&)> connecthandler_t;
typedef std::function<void(Socket&)> disconnecthandler_t;
};
};
#endif // _FTL_NET_HANDLERS_HPP_
#ifndef _FTL_NET_LISTENER_HPP_
#define _FTL_NET_LISTENER_HPP_
#ifndef WIN32
#include <netinet/in.h>
#endif
#ifdef WIN32
//#include <windows.h>
#include <winsock.h>
#endif
namespace ftl {
namespace net {
class Listener {
public:
Listener(const char *uri);
Listener(int sfd) : descriptor_(sfd) {}
virtual ~Listener();
bool isListening() { return descriptor_ >= 0; }
void close();
int _socket() { return descriptor_; }
private:
int descriptor_;
sockaddr_in slocalAddr;
};
};
};
#endif // _FTL_NET_LISTENER_HPP_
......@@ -5,106 +5,7 @@
#include <sstream>
#include <string>
#ifndef WIN32
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#endif
#ifdef WIN32
#include <windows.h>
#include <winsock.h>
typedef int socklen_t;
#define MSG_WAITALL 0
#endif
namespace ftl {
namespace net {
namespace raw {
class Socket;
int listen(const char *uri);
void stop();
int run(bool blocking);
/**
* Accepts tcp, ipc and ws URIs. An example would be:
* ws://ftl.utu.fi/api/connect
*/
Socket *connect(const char *uri);
typedef std::function<void(int, std::string&)> sockdatahandler_t;
typedef std::function<void(int)> sockerrorhandler_t;
typedef std::function<void()> sockconnecthandler_t;
typedef std::function<void(int)> sockdisconnecthandler_t;
typedef std::function<void(Socket&, int, std::string&)> datahandler_t;
typedef std::function<void(Socket&, int)> errorhandler_t;
typedef std::function<void(Socket&)> connecthandler_t;
typedef std::function<void(Socket&)> disconnecthandler_t;
class Socket {
public:
int close();
int send(uint32_t service, std::string &data);
int send(uint32_t service, std::ostringstream &data);
int send(uint32_t service, void *data, int length);
friend int ftl::net::raw::listen(const char*);
friend Socket *ftl::net::raw::connect(const char*);
friend int ftl::net::raw::run(bool);
int _socket() { return m_sock; };
bool isConnected() { return m_sock != INVALID_SOCKET; };
void onMessage(sockdatahandler_t handler) { m_handler = handler; }
void onError(sockerrorhandler_t handler) {}
void onConnect(sockconnecthandler_t handler) {}
void onDisconnect(sockdisconnecthandler_t handler) {}
protected:
Socket(int s, const char *uri);
~Socket();
bool data();
void error();
char m_addr[INET6_ADDRSTRLEN];
private:
const char *m_uri;
int m_sock;
size_t m_pos;
char *m_buffer;
sockdatahandler_t m_handler;
static const int MAX_MESSAGE = 10*1024*1024; // 10Mb currently
static const int BUFFER_SIZE = MAX_MESSAGE + 16;
};
/**
* Get the number of current connections.
* @return Connection count
*/
int connections();
void onMessage(datahandler_t handler);
void onConnect(connecthandler_t handler);
void onDisconnect(disconnecthandler_t handler);
void onError(errorhandler_t handler);
const int MAX_CONNECTIONS = 100; // TODO Is this a good number?
} // raw
} // net
} // ftl
#endif // _FTL_NET_RAW_HPP_
#ifndef _FTL_NET_SOCKET_HPP_
#define _FTL_NET_SOCKET_HPP_
#include <ftl/net.hpp>
#include <ftl/net/handlers.hpp>
#ifndef WIN32
#define INVALID_SOCKET -1
#include <netinet/in.h>
#endif
#ifdef WIN32
//#include <windows.h>
#include <winsock.h>
#endif
namespace ftl {
namespace net {
class Socket {
public:
Socket(const char *uri);
Socket(int s);
~Socket();
int close();
int send(uint32_t service, std::string &data);
int send(uint32_t service, std::ostringstream &data);
int send(uint32_t service, void *data, int length);
//friend bool ftl::net::run(bool);
int _socket() { return m_sock; };
bool isConnected() { return m_sock != INVALID_SOCKET; };
bool isValid() { return m_valid; };
void onMessage(sockdatahandler_t handler) { m_handler = handler; }
void onError(sockerrorhandler_t handler) {}
void onConnect(sockconnecthandler_t handler) {}
void onDisconnect(sockdisconnecthandler_t handler) {}
bool data();
void error();
protected:
char m_addr[INET6_ADDRSTRLEN];
private:
const char *m_uri;
int m_sock;
size_t m_pos;
char *m_buffer;
sockdatahandler_t m_handler;
bool m_valid;
static const int MAX_MESSAGE = 10*1024*1024; // 10Mb currently
static const int BUFFER_SIZE = MAX_MESSAGE + 16;
};
};
};
#endif // _FTL_NET_SOCKET_HPP_
#include <ftl/uri.hpp>
#include <ftl/net/listener.hpp>
#include <iostream>
#ifndef WIN32
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#endif
#ifdef WIN32
#include <windows.h>
#include <winsock.h>
typedef int socklen_t;
#define MSG_WAITALL 0
#endif
using namespace ftl;
using ftl::net::Listener;
int tcpListen(URI &uri) {
int ssock;
//std::cerr << "TCP Listen: " << uri.getHost() << " : " << uri.getPort() << std::endl;
#ifdef WIN32
WSAData wsaData;
//If Win32 then load winsock
if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) {
return INVALID_SOCKET;
}
#endif
ssock = socket(AF_INET, SOCK_STREAM, 0);
if (ssock == INVALID_SOCKET) {
return INVALID_SOCKET;
}
//Specify listen port and address
sockaddr_in slocalAddr;
slocalAddr.sin_family = AF_INET;
slocalAddr.sin_addr.s_addr = htonl(INADDR_ANY); // TODO, use that given in URI
slocalAddr.sin_port = htons(uri.getPort());
int rc = ::bind(ssock, (struct sockaddr*)&slocalAddr, sizeof(slocalAddr));
if (rc == SOCKET_ERROR) {
#ifndef WIN32
close(ssock);
#else
closesocket(ssock);
#endif
ssock = INVALID_SOCKET;
return INVALID_SOCKET;
}
//Attempt to start listening for connection requests.
rc = ::listen(ssock, 1);
if (rc == SOCKET_ERROR) {
#ifndef WIN32
close(ssock);
#else
closesocket(ssock);
#endif
ssock = INVALID_SOCKET;
return INVALID_SOCKET;
}
return ssock;
}
int wsListen(URI &uri) {
return INVALID_SOCKET;
}
Listener::Listener(const char *pUri) {
URI uri(pUri);
descriptor_ = INVALID_SOCKET;
if (uri.getProtocol() == URI::SCHEME_TCP) {
descriptor_ = tcpListen(uri);
std::cout << "Listening: " << pUri << " - " << descriptor_ << std::endl;
} else if (uri.getProtocol() == URI::SCHEME_WS) {
descriptor_ = wsListen(uri);
} else {
}
}
Listener::~Listener() {
// Close the socket.
close();
}
void Listener::close() {
//if (isConnected()) {
#ifndef WIN32
::close(descriptor_);
#else
closesocket(descriptor_);
#endif
descriptor_ = INVALID_SOCKET;
// Attempt auto reconnect?
//}
}
#include <ftl/net.hpp>
#include <ftl/net/listener.hpp>
#include <ftl/net/socket.hpp>
#include <vector>
#include <iostream>
using namespace std;
using ftl::net::Listener;
using ftl::net::Socket;
static std::vector<shared_ptr<ftl::net::Socket>> sockets;
static std::vector<shared_ptr<ftl::net::Listener>> listeners;
static fd_set sfdread;
static fd_set sfderror;
static int freeSocket() {
int freeclient = -1;
//Find a free client slot and allocated it
for (unsigned int i=0; i<sockets.size(); i++) {
if (sockets[i] == nullptr) { // CHECK, was 0 which seems wrong
freeclient = i;
break;
}
}
//Max clients reached, so send error
if (freeclient == -1) {
if (sockets.size() < ftl::net::MAX_CONNECTIONS) {
sockets.push_back(shared_ptr<Socket>(nullptr));
freeclient = sockets.size()-1;
} else {
// exceeded max connections
return -1;
}
}
return freeclient;
}
static int setDescriptors() {
//Reset all file descriptors
FD_ZERO(&sfdread);
FD_ZERO(&sfderror);
int n = 0;
//Set file descriptor for the listening sockets.
for (auto l : listeners) {
if (l != nullptr && l->isListening()) {
FD_SET(l->_socket(), &sfdread);
FD_SET(l->_socket(), &sfderror);
if (l->_socket() > n) n = l->_socket();
}
}
//Set the file descriptors for each client
for (auto s : sockets) {
if (s != nullptr && s->isConnected()) {
if (s->_socket() > n) {
n = s->_socket();
}
FD_SET(s->_socket(), &sfdread);
FD_SET(s->_socket(), &sfderror);
}
}
return n;
}
shared_ptr<Listener> ftl::net::listen(const char *uri) {
shared_ptr<Listener> l(new Listener(uri));
listeners.push_back(l);
return l;
}
shared_ptr<Socket> ftl::net::connect(const char *uri) {
shared_ptr<Socket> s(new Socket(uri));
int fs = freeSocket();
if (fs >= 0) {
sockets[fs] = s;
return s;
} else {
return NULL;
}
}
void ftl::net::stop() {
for (auto s : sockets) {
if (s != NULL) s->close();
}
sockets.clear();
/*#ifndef WIN32
if (ssock != INVALID_SOCKET) close(ssock);
#else
if (ssock != INVALID_SOCKET) closesocket(ssock);
#endif
ssock = INVALID_SOCKET;*/
for (auto l : listeners) {
l->close();
}
listeners.clear();
}
bool _run(bool blocking, bool nodelay) {
timeval block;
int n;
int selres = 1;
//if (ssock == INVALID_SOCKET) return 1;
bool active = true;
bool repeat = nodelay;
while (active || repeat) {
n = setDescriptors();
//Wait for a network event or timeout in 3 seconds
block.tv_sec = (repeat) ? 0 : 3;
block.tv_usec = 0;
selres = select(n+1, &sfdread, 0, &sfderror, &block);
repeat = false;
active = blocking;
//Some kind of error occured, it is usually possible to recover from this.
if (selres <= 0) {
return false;
}
//If connection request is waiting
for (auto l : listeners) {
if (l && l->isListening()) {
if (FD_ISSET(l->_socket(), &sfdread)) {
int rsize = sizeof(sockaddr_storage);
sockaddr_storage addr;
//int freeclient = freeSocket();
//if (freeclient >= 0) {
// TODO Limit connection rate or allow a pause in accepting
// TODO Send auto reject message under heavy load
//Finally accept this client connection.
int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) {
shared_ptr<Socket> sock(new Socket(csock));
//sockets[freeclient] = sock;
sockets.push_back(sock);
// TODO Save the ip address
// deal with both IPv4 and IPv6:
/*if (addr.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
//port = ntohs(s->sin_port);
inet_ntop(AF_INET, &s->sin_addr, sock->m_addr, INET6_ADDRSTRLEN);
} else { // AF_INET6
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
//port = ntohs(s->sin6_port);
inet_ntop(AF_INET6, &s->sin6_addr, sock->m_addr, INET6_ADDRSTRLEN);
}*/
}
//}
}
}
}
//Also check each clients socket to see if any messages or errors are waiting
for (auto s : sockets) {
if (s != NULL && s->isConnected()) {
//If message received from this client then deal with it
if (FD_ISSET(s->_socket(), &sfdread)) {
repeat |= s->data();
//An error occured with this client.
} else if (FD_ISSET(s->_socket(), &sfderror)) {
s->error();
}
}
}
}
return true;
}
bool ftl::net::check() {
return _run(false,true);
}
bool ftl::net::wait() {
return _run(false,false);
}
bool ftl::net::run(bool async) {
if (async) {
// TODO Start thread
} else {
return _run(true,false);
}
return false;
}
#include <ftl/net/raw.hpp>
#include <ftl/net/listener.hpp>
#include <ftl/uri.hpp>
#include <vector>
#include <iostream>
......@@ -13,406 +14,18 @@
using ftl::URI;
using ftl::net::raw::Socket;
static std::vector<Socket*> sockets;
static int ssock = INVALID_SOCKET;
static fd_set sfdread;
static fd_set sfderror;
static sockaddr_in slocalAddr;
static int freeSocket() {
int freeclient = -1;
//static sockaddr_in slocalAddr;
//Find a free client slot and allocated it
for (unsigned int i=0; i<sockets.size(); i++) {
if (sockets[i] == 0) { // CHECK, was 0 which seems wrong
freeclient = i;
break;
}
}
//Max clients reached, so send error
if (freeclient == -1) {
if (sockets.size() < ftl::net::raw::MAX_CONNECTIONS) {
sockets.push_back(0);
freeclient = sockets.size()-1;
} else {
// exceeded max connections
return -1;
}
}
return freeclient;
}
static int setDescriptors() {
//Reset all file descriptors
FD_ZERO(&sfdread);
FD_ZERO(&sfderror);
int n = 0;
//Set file descriptor for the listening socket.
if (ssock) {
FD_SET(ssock, &sfdread);
FD_SET(ssock, &sfderror);
n = ssock;
}
//Set the file descriptors for each client
for (auto s : sockets) {
if (s != NULL && s->isConnected()) {
if (s->_socket() > n) {
n = s->_socket();
}
FD_SET(s->_socket(), &sfdread);
FD_SET(s->_socket(), &sfderror);
}
}
return n;
}
static int tcpListen(URI &uri) {
//std::cerr << "TCP Listen: " << uri.getHost() << " : " << uri.getPort() << std::endl;
#ifdef WIN32
WSAData wsaData;
//If Win32 then load winsock
if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) {
return 1;
}
#endif
ssock = socket(AF_INET, SOCK_STREAM, 0);
if (ssock == INVALID_SOCKET) {
return 1;
}
//Specify listen port and address
slocalAddr.sin_family = AF_INET;
slocalAddr.sin_addr.s_addr = htonl(INADDR_ANY); // TODO, use that given in URI
slocalAddr.sin_port = htons(uri.getPort());
int rc = ::bind(ssock, (struct sockaddr*)&slocalAddr, sizeof(slocalAddr));
if (rc == SOCKET_ERROR) {
#ifndef WIN32
close(ssock);
#else
closesocket(ssock);
#endif
ssock = INVALID_SOCKET;
return 1;
}
//Attempt to start listening for connection requests.
rc = ::listen(ssock, 1);
if (rc == SOCKET_ERROR) {
#ifndef WIN32
close(ssock);
#else
closesocket(ssock);
#endif
ssock = INVALID_SOCKET;
return 1;
}
return 0;
}
static int wsListen(URI &uri) {
return 1;
}
static int tcpConnect(URI &uri) {
int rc;
sockaddr_in destAddr;
//std::cerr << "TCP Connect: " << uri.getHost() << " : " << uri.getPort() << std::endl;
#ifdef WIN32
WSAData wsaData;
if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) {
//ERROR
return INVALID_SOCKET;
}
#endif
//We want a TCP socket
int csocket = socket(AF_INET, SOCK_STREAM, 0);
if (csocket == INVALID_SOCKET) {
return INVALID_SOCKET;
}
#ifdef WIN32
HOSTENT *host = gethostbyname(uri.getHost().c_str());
#else
hostent *host = gethostbyname(uri.getHost().c_str());
#endif
if (host == NULL) {
#ifndef WIN32
close(csocket);
#else
closesocket(csocket);
#endif
std::cerr << "Address not found : " << uri.getHost() << std::endl;
return INVALID_SOCKET;
}
destAddr.sin_family = AF_INET;
destAddr.sin_addr.s_addr = ((in_addr *)(host->h_addr))->s_addr;
destAddr.sin_port = htons(uri.getPort());
// Make nonblocking
/*long arg = fcntl(csocket, F_GETFL, NULL));
arg |= O_NONBLOCK;
fcntl(csocket, F_SETFL, arg) < 0)*/
rc = ::connect(csocket, (struct sockaddr*)&destAddr, sizeof(destAddr));
if (rc < 0) {
if (errno == EINPROGRESS) {
} else {
#ifndef WIN32
close(csocket);
#else
closesocket(csocket);
#endif
std::cerr << "Could not connect" << std::endl;
return INVALID_SOCKET;
}
}
// Make blocking again
/*rg = fcntl(csocket, F_GETFL, NULL));
arg &= (~O_NONBLOCK);
fcntl(csocket, F_SETFL, arg) < 0)*/
// Handshake??
return csocket;
}
static int wsConnect(URI &uri) {
return 1;
}
int ftl::net::raw::listen(const char *pUri) {
URI uri(pUri);
if (uri.getProtocol() == URI::SCHEME_TCP) {
return tcpListen(uri);
} else if (uri.getProtocol() == URI::SCHEME_WS) {
return wsListen(uri);
} else {
return 1;
}
}
void ftl::net::raw::stop() {
for (auto s : sockets) {
if (s != NULL) s->close();
}
sockets.clear();
#ifndef WIN32
if (ssock != INVALID_SOCKET) close(ssock);
#else
if (ssock != INVALID_SOCKET) closesocket(ssock);
#endif
ssock = INVALID_SOCKET;
}
Socket *ftl::net::raw::connect(const char *pUri) {
URI uri(pUri);
if (uri.getProtocol() == URI::SCHEME_TCP) {
int csock = tcpConnect(uri);
Socket *s = new Socket(csock, pUri);
int fs = freeSocket();
if (fs >= 0) {
sockets[fs] = s;
return s;
} else {
return NULL;
}
} else if (uri.getProtocol() == URI::SCHEME_WS) {
wsConnect(uri);
return NULL;
} else {
return NULL;
}
}
int ftl::net::raw::run(bool blocking) {
timeval block;
int n;
int selres = 1;
//if (ssock == INVALID_SOCKET) return 1;
bool active = true;
bool repeat = false;
while (active || repeat) {
n = setDescriptors();
//Wait for a network event or timeout in 3 seconds
block.tv_sec = (repeat) ? 0 : 3;
block.tv_usec = 0;
selres = select(n+1, &sfdread, 0, &sfderror, &block);
repeat = false;
active = blocking;
//Some kind of error occured, it is usually possible to recover from this.
if (selres <= 0) {
return 1;
}
//If connection request is waiting
if (FD_ISSET(ssock, &sfdread)) {
int rsize = sizeof(sockaddr_storage);
sockaddr_storage addr;
int freeclient = freeSocket();
if (freeclient >= 0) {
// TODO Limit connection rate or allow a pause in accepting
// TODO Send auto reject message under heavy load
//Finally accept this client connection.
int csock = accept(ssock, (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) {
Socket *sock = new Socket(csock, NULL);
sockets[freeclient] = sock;
//Save the ip address
// deal with both IPv4 and IPv6:
if (addr.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
//port = ntohs(s->sin_port);
inet_ntop(AF_INET, &s->sin_addr, sock->m_addr, INET6_ADDRSTRLEN);
} else { // AF_INET6
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
//port = ntohs(s->sin6_port);
inet_ntop(AF_INET6, &s->sin6_addr, sock->m_addr, INET6_ADDRSTRLEN);
}
}
}
}
//Also check each clients socket to see if any messages or errors are waiting
for (auto s : sockets) {
if (s != NULL && s->isConnected()) {
//If message received from this client then deal with it
if (FD_ISSET(s->_socket(), &sfdread)) {
repeat |= s->data();
//An error occured with this client.
} else if (FD_ISSET(s->_socket(), &sfderror)) {
s->error();
}
}
}
}
return 1;
}
int Socket::close() {
if (isConnected()) {
#ifndef WIN32
::close(m_sock);
#else
closesocket(m_sock);
#endif
m_sock = INVALID_SOCKET;
// Attempt auto reconnect?
}
return 0;
}
void Socket::error() {
int err;
uint32_t optlen = sizeof(err);
getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &err, &optlen);
std::cerr << "GOT A SOCKET ERROR : " << err << std::endl;
//close();
}
bool Socket::data() {
//std::cerr << "GOT SOCKET DATA" << std::endl;
//Read data from socket
size_t n = 0;
uint32_t len = 0;
if (m_pos < 4) {
n = 4 - m_pos;
} else {
len = *(int*)m_buffer;
n = len+4-m_pos;
}
while (m_pos < len+4) {
if (len > MAX_MESSAGE) {
close();
return false; // Prevent DoS
}
const int rc = recv(m_sock, m_buffer+m_pos, n, 0);
if (rc > 0) {
m_pos += static_cast<size_t>(rc);
if (m_pos < 4) {
n = 4 - m_pos;
} else {
len = *(int*)m_buffer;
n = len+4-m_pos;
}
} else if (rc == EWOULDBLOCK || rc == 0) {
// Data not yet available
return false;
} else {
// Close socket due to error
close();
return false;
}
}
// All data available
if (m_handler) {
uint32_t service = ((uint32_t*)m_buffer)[1];
auto d = std::string(m_buffer+8, len-4);
//std::cerr << "DATA : " << service << " -> " << d << std::endl;
m_handler(service, d);
}
m_pos = 0;
return true;
}
Socket::Socket(int s, const char *uri) : m_uri(uri), m_sock(s), m_pos(0) {
// Allocate buffer
m_buffer = new char[BUFFER_SIZE];
}
Socket::~Socket() {
// Delete socket buffer
delete [] m_buffer;
}
#include <ftl/uri.hpp>
#include <ftl/net/socket.hpp>
#ifndef WIN32
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#endif
#ifdef WIN32
#include <windows.h>
#include <winsock.h>
typedef int socklen_t;
#define MSG_WAITALL 0
#endif
#include <iostream>
using namespace ftl;
using ftl::net::Socket;
using namespace std;
static int tcpConnect(URI &uri) {
int rc;
sockaddr_in destAddr;
//std::cerr << "TCP Connect: " << uri.getHost() << " : " << uri.getPort() << std::endl;
#ifdef WIN32
WSAData wsaData;
if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) {
//ERROR
return INVALID_SOCKET;
}
#endif
//We want a TCP socket
int csocket = socket(AF_INET, SOCK_STREAM, 0);
if (csocket == INVALID_SOCKET) {
return INVALID_SOCKET;
}
#ifdef WIN32
HOSTENT *host = gethostbyname(uri.getHost().c_str());
#else
hostent *host = gethostbyname(uri.getHost().c_str());
#endif
if (host == NULL) {
#ifndef WIN32
close(csocket);
#else
closesocket(csocket);
#endif
std::cerr << "Address not found : " << uri.getHost() << std::endl;
return INVALID_SOCKET;
}
destAddr.sin_family = AF_INET;
destAddr.sin_addr.s_addr = ((in_addr *)(host->h_addr))->s_addr;
destAddr.sin_port = htons(uri.getPort());
// Make nonblocking
/*long arg = fcntl(csocket, F_GETFL, NULL));
arg |= O_NONBLOCK;
fcntl(csocket, F_SETFL, arg) < 0)*/
rc = ::connect(csocket, (struct sockaddr*)&destAddr, sizeof(destAddr));
if (rc < 0) {
if (errno == EINPROGRESS) {
} else {
#ifndef WIN32
close(csocket);
#else
closesocket(csocket);
#endif
std::cerr << "Could not connect" << std::endl;
return INVALID_SOCKET;
}
}
// Make blocking again
/*rg = fcntl(csocket, F_GETFL, NULL));
arg &= (~O_NONBLOCK);
fcntl(csocket, F_SETFL, arg) < 0)*/
// Handshake??
return csocket;
}
static int wsConnect(URI &uri) {
return 1;
}
Socket::Socket(int s) : m_sock(s), m_pos(0) {
// TODO Get the remote address.
m_valid = true;
}
Socket::Socket(const char *pUri) : m_uri(pUri), m_pos(0) {
// Allocate buffer
m_buffer = new char[BUFFER_SIZE];
URI uri(pUri);
m_valid = false;
m_sock = INVALID_SOCKET;
if (uri.getProtocol() == URI::SCHEME_TCP) {
m_sock = tcpConnect(uri);
m_valid = true;
} else if (uri.getProtocol() == URI::SCHEME_WS) {
wsConnect(uri);
} else {
}
}
int Socket::close() {
if (isConnected()) {
#ifndef WIN32
::close(m_sock);
#else
closesocket(m_sock);
#endif
m_sock = INVALID_SOCKET;
// Attempt auto reconnect?
}
return 0;
}
void Socket::error() {
int err;
uint32_t optlen = sizeof(err);
getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &err, &optlen);
std::cerr << "GOT A SOCKET ERROR : " << err << std::endl;
//close();
}
bool Socket::data() {
//std::cerr << "GOT SOCKET DATA" << std::endl;
//Read data from socket
size_t n = 0;
uint32_t len = 0;
if (m_pos < 4) {
n = 4 - m_pos;
} else {
len = *(int*)m_buffer;
n = len+4-m_pos;
}
while (m_pos < len+4) {
if (len > MAX_MESSAGE) {
close();
return false; // Prevent DoS
}
const int rc = recv(m_sock, m_buffer+m_pos, n, 0);
if (rc > 0) {
m_pos += static_cast<size_t>(rc);
if (m_pos < 4) {
n = 4 - m_pos;
} else {
len = *(int*)m_buffer;
n = len+4-m_pos;
}
} else if (rc == EWOULDBLOCK || rc == 0) {
// Data not yet available
return false;
} else {
// Close socket due to error
close();
return false;
}
}
// All data available
if (m_handler) {
uint32_t service = ((uint32_t*)m_buffer)[1];
auto d = std::string(m_buffer+8, len-4);
//std::cerr << "DATA : " << service << " -> " << d << std::endl;
m_handler(service, d);
}
m_pos = 0;
return true;
}
Socket::~Socket() {
close();
// Delete socket buffer
delete [] m_buffer;
}
add_executable(tests EXCLUDE_FROM_ALL
./tests.cpp
./net_raw.cpp
../src/raw.cpp
../src/net.cpp
../src/socket.cpp
../src/listener.cpp
./ice.cpp
../src/ice.cpp
./uri.cpp
......
#include "catch.hpp"
#include <string.h>
#include <ftl/net/raw.hpp>
#include <ftl/net.hpp>
#include <ftl/net/socket.hpp>
#include <ftl/net/listener.hpp>
#include <iostream>
#include <memory>
#ifndef WIN32
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#endif
#ifdef WIN32
#include <windows.h>
#include <winsock.h>
typedef int socklen_t;
#define MSG_WAITALL 0
#endif
// ---- MOCK Server Code -------------------------------------------------------
......@@ -14,7 +35,8 @@ static fd_set sfdread;
static fd_set sfderror;
static sockaddr_in slocalAddr;
using ftl::net::raw::Socket;
using ftl::net::Socket;
using std::shared_ptr;
void fin_server() {
if (!server) return;
......@@ -147,6 +169,8 @@ void accept_connection() {
if (selres > 0 && FD_ISSET(ssock, &sfdread)) {
int rsize = sizeof(sockaddr_storage);
sockaddr_storage addr;
std::cout << "Accepted connection!" << std::endl;
//Finally accept this client connection.
csock = accept(ssock, (sockaddr*)&addr, (socklen_t*)&rsize);
......@@ -161,33 +185,33 @@ TEST_CASE("net::connect()", "[net]") {
init_server();
REQUIRE(ssock != INVALID_SOCKET);
Socket *sock = NULL;
shared_ptr<Socket> sock = nullptr;
SECTION("valid tcp connection using ipv4") {
sock = ftl::net::raw::connect("tcp://127.0.0.1:7077");
REQUIRE(sock != NULL);
sock = ftl::net::connect("tcp://127.0.0.1:7077");
REQUIRE(sock != nullptr);
accept_connection();
}
SECTION("valid tcp connection using hostname") {
sock = ftl::net::raw::connect("tcp://localhost:7077");
REQUIRE(sock != NULL);
sock = ftl::net::connect("tcp://localhost:7077");
REQUIRE(sock->isValid());
accept_connection();
}
SECTION("invalid protocol") {
sock = ftl::net::raw::connect("http://127.0.0.1:7077");
REQUIRE(sock == NULL);
sock = ftl::net::connect("http://127.0.0.1:7077");
REQUIRE(!sock->isValid());
}
SECTION("empty uri") {
sock = ftl::net::raw::connect("");
REQUIRE(sock == NULL);
sock = ftl::net::connect("");
REQUIRE(!sock->isValid());
}
SECTION("null uri") {
sock = ftl::net::raw::connect(NULL);
REQUIRE(sock == NULL);
sock = ftl::net::connect(NULL);
REQUIRE(!sock->isValid());
}
// Disabled due to long timeout
......@@ -199,13 +223,13 @@ TEST_CASE("net::connect()", "[net]") {
}*/
SECTION("incorrect dns address") {
sock = ftl::net::raw::connect("tcp://xryyrrgrtgddgr.com:7077");
REQUIRE(sock != NULL);
sock = ftl::net::connect("tcp://xryyrrgrtgddgr.com:7077");
REQUIRE(sock->isValid());
REQUIRE(sock->isConnected() == false);
sock = NULL;
sock = nullptr;
}
if (sock) {
if (sock && sock->isValid()) {
REQUIRE(sock->isConnected());
REQUIRE(csock != INVALID_SOCKET);
sock->close();
......@@ -216,26 +240,26 @@ TEST_CASE("net::connect()", "[net]") {
TEST_CASE("net::listen()", "[net]") {
SECTION("tcp any interface") {
REQUIRE( ftl::net::raw::listen("tcp://*:7078") == 0);
REQUIRE( ftl::net::listen("tcp://*:7078")->isListening() );
SECTION("can connect to listening socket") {
Socket *sock = ftl::net::raw::connect("tcp://127.0.0.1:7078");
REQUIRE(sock != NULL);
shared_ptr<Socket> sock = ftl::net::connect("tcp://127.0.0.1:7078");
REQUIRE(sock->isValid());
REQUIRE(sock->isConnected());
ftl::net::raw::run(false);
ftl::net::wait();
// TODO Need way of knowing about connection
}
ftl::net::raw::stop();
ftl::net::stop();
}
}
TEST_CASE("Socket.onMessage()", "[net]") {
// Need a fake server...
init_server();
Socket *sock = ftl::net::raw::connect("tcp://127.0.0.1:7077");
REQUIRE(sock != NULL);
shared_ptr<Socket> sock = ftl::net::connect("tcp://127.0.0.1:7077");
REQUIRE(sock->isValid());
REQUIRE(sock->isConnected());
accept_connection();
......@@ -250,7 +274,7 @@ TEST_CASE("Socket.onMessage()", "[net]") {
msg = true;
});
ftl::net::raw::run(false);
ftl::net::wait();
REQUIRE(msg);
}
......@@ -265,7 +289,7 @@ TEST_CASE("Socket.onMessage()", "[net]") {
msg = true;
});
ftl::net::raw::run(false);
ftl::net::wait();
REQUIRE(msg);
}
......@@ -282,7 +306,7 @@ TEST_CASE("Socket.onMessage()", "[net]") {
msg++;
});
ftl::net::raw::run(false);
ftl::net::wait();
REQUIRE(msg == 2);
}
......@@ -297,7 +321,7 @@ TEST_CASE("Socket.onMessage()", "[net]") {
sock->close();
ftl::net::raw::run(false);
ftl::net::wait();
REQUIRE(!msg);
}
......
#include <memory.h>
/*void ftl::rm::Blob::write(size_t offset, const char *data, size_t size) {
// Sanity check
if (offset + size > size_) throw -1;
// If local, write direct to data_, otherwise send over network
if (socket_ != NULL) {
Header header{blobid_,static_cast<uint32_t>(offset),static_cast<uint32_t>(size)};
// Send over network
socket_->send2(MEMORY_WRITE, std::string((const char*)&header,sizeof(header)),
std::string(data,size));
} else {
// Copy locally
memcpy(data_+offset, data, size);
}
}*/
/*void ftl::rm::Blob::read(size_t offset, char *data, size_t size) {
// Sanity check
if (offset + size > size_) throw -1;
// If local, write direct to data_, otherwise send over network
if (socket_ != NULL) {
} else {
// Copy locally
memcpy(data,data_+offset, size);
}
}*/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment