diff --git a/net/CMakeLists.txt b/net/CMakeLists.txt index 04af7370a0854d60b830516c6ad4de618605e902..98f2364a31ac3841ebf6d5f40ad114a2fbc116c3 100644 --- a/net/CMakeLists.txt +++ b/net/CMakeLists.txt @@ -22,7 +22,9 @@ set(CMAKE_CXX_FLAGS_RELEASE "-O3") SET(CMAKE_USE_RELATIVE_PATHS ON) set(FTLSOURCE - src/raw.cpp + src/net.cpp + src/listener.cpp + src/socket.cpp ) check_include_file("uriparser/Uri.h" HAVE_URI_H) diff --git a/net/include/ftl/net.hpp b/net/include/ftl/net.hpp index 9e4f6d49d44a796f71428010539d4fe5fb7e16ea..8a3c8f90fdf7207abd293c88a5687dba93942984 100644 --- a/net/include/ftl/net.hpp +++ b/net/include/ftl/net.hpp @@ -1,12 +1,63 @@ #ifndef _FTL_NET_HPP_ #define _FTL_NET_HPP_ -#include "ftl/net/raw.hpp" +#include <memory> namespace ftl { namespace net { -inline raw::Socket *connect(const char *uri) { return raw::connect(uri); } +class Listener; +class Socket; + +const int MAX_CONNECTIONS = 100; // TODO Is this a good number? + +/** + * Start a listening socket for new connections on the given URI. An example + * URI might be: + * tcp://localhost:9000. + */ +std::shared_ptr<Listener> listen(const char *uri); + +/** + * Accepts tcp, ipc and ws URIs. An example would be: + * ws://ftl.utu.fi/api/connect + */ +std::shared_ptr<Socket> connect(const char *uri); + +/** + * Start a loop to continually check for network messages. If the async + * parameter is false then this function will block as long as any connections + * or listeners remain active. + * + * @param async Use a separate thread. + */ +bool run(bool async); + +/** + * Wait for a bunch of messages, but return once at least one has been + * processed. + */ +bool wait(); + +/** + * Check and process any waiting messages, but do not block if there are none. + */ +bool check(); + +/** + * Ensure that the network loop terminates, whether a separate thread or not. + */ +void stop(); + +/** + * Is the network loop running in another thread? + */ +bool is_async(); + +/** + * Is the network loop currently handling a message? + */ +bool is_handling(); } } diff --git a/net/include/ftl/net/handlers.hpp b/net/include/ftl/net/handlers.hpp new file mode 100644 index 0000000000000000000000000000000000000000..f3904da8b1ff4533b967f08bc2f2ddfe83cb6d20 --- /dev/null +++ b/net/include/ftl/net/handlers.hpp @@ -0,0 +1,23 @@ +#ifndef _FTL_NET_HANDLERS_HPP_ +#define _FTL_NET_HANDLERS_HPP_ + +#include <functional> + +namespace ftl { +namespace net { + +typedef std::function<void(int, std::string&)> sockdatahandler_t; +typedef std::function<void(int)> sockerrorhandler_t; +typedef std::function<void()> sockconnecthandler_t; +typedef std::function<void(int)> sockdisconnecthandler_t; + +typedef std::function<void(Socket&, int, std::string&)> datahandler_t; +typedef std::function<void(Socket&, int)> errorhandler_t; +typedef std::function<void(Socket&)> connecthandler_t; +typedef std::function<void(Socket&)> disconnecthandler_t; + +}; +}; + +#endif // _FTL_NET_HANDLERS_HPP_ + diff --git a/net/include/ftl/net/listener.hpp b/net/include/ftl/net/listener.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ca3b2c5d151aa871358d5929f979c82a5e63b27d --- /dev/null +++ b/net/include/ftl/net/listener.hpp @@ -0,0 +1,34 @@ +#ifndef _FTL_NET_LISTENER_HPP_ +#define _FTL_NET_LISTENER_HPP_ + +#ifndef WIN32 +#include <netinet/in.h> +#endif + +#ifdef WIN32 +//#include <windows.h> +#include <winsock.h> +#endif + +namespace ftl { +namespace net { + +class Listener { + public: + Listener(const char *uri); + Listener(int sfd) : descriptor_(sfd) {} + virtual ~Listener(); + + bool isListening() { return descriptor_ >= 0; } + void close(); + int _socket() { return descriptor_; } + + private: + int descriptor_; + sockaddr_in slocalAddr; +}; + +}; +}; + +#endif // _FTL_NET_LISTENER_HPP_ diff --git a/net/include/ftl/net/raw.hpp b/net/include/ftl/net/raw.hpp index 0dcd8e05b2d011ffd701827455769909d50c37a5..c869a919ee1a095c819c77373e4f360229432127 100644 --- a/net/include/ftl/net/raw.hpp +++ b/net/include/ftl/net/raw.hpp @@ -5,106 +5,7 @@ #include <sstream> #include <string> -#ifndef WIN32 -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <netdb.h> -#include <arpa/inet.h> -#define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 -#endif -#ifdef WIN32 -#include <windows.h> -#include <winsock.h> -typedef int socklen_t; -#define MSG_WAITALL 0 -#endif -namespace ftl { -namespace net { -namespace raw { - -class Socket; - -int listen(const char *uri); -void stop(); -int run(bool blocking); - -/** - * Accepts tcp, ipc and ws URIs. An example would be: - * ws://ftl.utu.fi/api/connect - */ -Socket *connect(const char *uri); - -typedef std::function<void(int, std::string&)> sockdatahandler_t; -typedef std::function<void(int)> sockerrorhandler_t; -typedef std::function<void()> sockconnecthandler_t; -typedef std::function<void(int)> sockdisconnecthandler_t; - -typedef std::function<void(Socket&, int, std::string&)> datahandler_t; -typedef std::function<void(Socket&, int)> errorhandler_t; -typedef std::function<void(Socket&)> connecthandler_t; -typedef std::function<void(Socket&)> disconnecthandler_t; - -class Socket { - public: - int close(); - - int send(uint32_t service, std::string &data); - int send(uint32_t service, std::ostringstream &data); - int send(uint32_t service, void *data, int length); - - friend int ftl::net::raw::listen(const char*); - friend Socket *ftl::net::raw::connect(const char*); - friend int ftl::net::raw::run(bool); - - int _socket() { return m_sock; }; - - bool isConnected() { return m_sock != INVALID_SOCKET; }; - - void onMessage(sockdatahandler_t handler) { m_handler = handler; } - void onError(sockerrorhandler_t handler) {} - void onConnect(sockconnecthandler_t handler) {} - void onDisconnect(sockdisconnecthandler_t handler) {} - - protected: - Socket(int s, const char *uri); - ~Socket(); - - bool data(); - void error(); - - char m_addr[INET6_ADDRSTRLEN]; - - private: - const char *m_uri; - int m_sock; - size_t m_pos; - char *m_buffer; - sockdatahandler_t m_handler; - - static const int MAX_MESSAGE = 10*1024*1024; // 10Mb currently - static const int BUFFER_SIZE = MAX_MESSAGE + 16; -}; - -/** - * Get the number of current connections. - * @return Connection count - */ -int connections(); - -void onMessage(datahandler_t handler); -void onConnect(connecthandler_t handler); -void onDisconnect(disconnecthandler_t handler); -void onError(errorhandler_t handler); - -const int MAX_CONNECTIONS = 100; // TODO Is this a good number? - -} // raw -} // net -} // ftl #endif // _FTL_NET_RAW_HPP_ diff --git a/net/include/ftl/net/socket.hpp b/net/include/ftl/net/socket.hpp new file mode 100644 index 0000000000000000000000000000000000000000..dd5dc8a8a1a67c0bd9f5ecad0d3fecba3e2553ab --- /dev/null +++ b/net/include/ftl/net/socket.hpp @@ -0,0 +1,66 @@ +#ifndef _FTL_NET_SOCKET_HPP_ +#define _FTL_NET_SOCKET_HPP_ + +#include <ftl/net.hpp> +#include <ftl/net/handlers.hpp> + +#ifndef WIN32 +#define INVALID_SOCKET -1 +#include <netinet/in.h> +#endif + +#ifdef WIN32 +//#include <windows.h> +#include <winsock.h> +#endif + +namespace ftl { +namespace net { + +class Socket { + public: + Socket(const char *uri); + Socket(int s); + ~Socket(); + + int close(); + + int send(uint32_t service, std::string &data); + int send(uint32_t service, std::ostringstream &data); + int send(uint32_t service, void *data, int length); + + //friend bool ftl::net::run(bool); + + int _socket() { return m_sock; }; + + bool isConnected() { return m_sock != INVALID_SOCKET; }; + bool isValid() { return m_valid; }; + + void onMessage(sockdatahandler_t handler) { m_handler = handler; } + void onError(sockerrorhandler_t handler) {} + void onConnect(sockconnecthandler_t handler) {} + void onDisconnect(sockdisconnecthandler_t handler) {} + + bool data(); + void error(); + + protected: + + char m_addr[INET6_ADDRSTRLEN]; + + private: + const char *m_uri; + int m_sock; + size_t m_pos; + char *m_buffer; + sockdatahandler_t m_handler; + bool m_valid; + + static const int MAX_MESSAGE = 10*1024*1024; // 10Mb currently + static const int BUFFER_SIZE = MAX_MESSAGE + 16; +}; + +}; +}; + +#endif // _FTL_NET_SOCKET_HPP_ diff --git a/net/src/listener.cpp b/net/src/listener.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f3a92d8a74ee74922d1da44d79cf417be6670c76 --- /dev/null +++ b/net/src/listener.cpp @@ -0,0 +1,112 @@ +#include <ftl/uri.hpp> +#include <ftl/net/listener.hpp> +#include <iostream> + +#ifndef WIN32 +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h> +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 +#endif + +#ifdef WIN32 +#include <windows.h> +#include <winsock.h> +typedef int socklen_t; +#define MSG_WAITALL 0 +#endif + +using namespace ftl; +using ftl::net::Listener; + +int tcpListen(URI &uri) { + int ssock; + //std::cerr << "TCP Listen: " << uri.getHost() << " : " << uri.getPort() << std::endl; + #ifdef WIN32 + WSAData wsaData; + //If Win32 then load winsock + if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) { + return INVALID_SOCKET; + } + #endif + + ssock = socket(AF_INET, SOCK_STREAM, 0); + if (ssock == INVALID_SOCKET) { + return INVALID_SOCKET; + } + + //Specify listen port and address + sockaddr_in slocalAddr; + slocalAddr.sin_family = AF_INET; + slocalAddr.sin_addr.s_addr = htonl(INADDR_ANY); // TODO, use that given in URI + slocalAddr.sin_port = htons(uri.getPort()); + + int rc = ::bind(ssock, (struct sockaddr*)&slocalAddr, sizeof(slocalAddr)); + + if (rc == SOCKET_ERROR) { + #ifndef WIN32 + close(ssock); + #else + closesocket(ssock); + #endif + ssock = INVALID_SOCKET; + return INVALID_SOCKET; + } + + //Attempt to start listening for connection requests. + rc = ::listen(ssock, 1); + + if (rc == SOCKET_ERROR) { + #ifndef WIN32 + close(ssock); + #else + closesocket(ssock); + #endif + ssock = INVALID_SOCKET; + return INVALID_SOCKET; + } + + return ssock; +} + +int wsListen(URI &uri) { + return INVALID_SOCKET; +} + +Listener::Listener(const char *pUri) { + URI uri(pUri); + + descriptor_ = INVALID_SOCKET; + + if (uri.getProtocol() == URI::SCHEME_TCP) { + descriptor_ = tcpListen(uri); + std::cout << "Listening: " << pUri << " - " << descriptor_ << std::endl; + } else if (uri.getProtocol() == URI::SCHEME_WS) { + descriptor_ = wsListen(uri); + } else { + + } +} + +Listener::~Listener() { + // Close the socket. + close(); +} + +void Listener::close() { + //if (isConnected()) { + #ifndef WIN32 + ::close(descriptor_); + #else + closesocket(descriptor_); + #endif + descriptor_ = INVALID_SOCKET; + + // Attempt auto reconnect? + //} +} + diff --git a/net/src/net.cpp b/net/src/net.cpp index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..23eaeae68ef5311ad3a1e71d750c2b6e1a21b7c7 100644 --- a/net/src/net.cpp +++ b/net/src/net.cpp @@ -0,0 +1,211 @@ +#include <ftl/net.hpp> +#include <ftl/net/listener.hpp> +#include <ftl/net/socket.hpp> + +#include <vector> +#include <iostream> + +using namespace std; +using ftl::net::Listener; +using ftl::net::Socket; + +static std::vector<shared_ptr<ftl::net::Socket>> sockets; +static std::vector<shared_ptr<ftl::net::Listener>> listeners; +static fd_set sfdread; +static fd_set sfderror; + +static int freeSocket() { + int freeclient = -1; + + //Find a free client slot and allocated it + for (unsigned int i=0; i<sockets.size(); i++) { + if (sockets[i] == nullptr) { // CHECK, was 0 which seems wrong + freeclient = i; + break; + } + } + + //Max clients reached, so send error + if (freeclient == -1) { + if (sockets.size() < ftl::net::MAX_CONNECTIONS) { + sockets.push_back(shared_ptr<Socket>(nullptr)); + freeclient = sockets.size()-1; + } else { + // exceeded max connections + return -1; + } + } + + return freeclient; +} + +static int setDescriptors() { + //Reset all file descriptors + FD_ZERO(&sfdread); + FD_ZERO(&sfderror); + + int n = 0; + + //Set file descriptor for the listening sockets. + for (auto l : listeners) { + if (l != nullptr && l->isListening()) { + FD_SET(l->_socket(), &sfdread); + FD_SET(l->_socket(), &sfderror); + if (l->_socket() > n) n = l->_socket(); + } + } + + //Set the file descriptors for each client + for (auto s : sockets) { + if (s != nullptr && s->isConnected()) { + + if (s->_socket() > n) { + n = s->_socket(); + } + + FD_SET(s->_socket(), &sfdread); + FD_SET(s->_socket(), &sfderror); + } + } + + return n; +} + +shared_ptr<Listener> ftl::net::listen(const char *uri) { + shared_ptr<Listener> l(new Listener(uri)); + listeners.push_back(l); + return l; +} + +shared_ptr<Socket> ftl::net::connect(const char *uri) { + shared_ptr<Socket> s(new Socket(uri)); + int fs = freeSocket(); + if (fs >= 0) { + sockets[fs] = s; + return s; + } else { + return NULL; + } +} + +void ftl::net::stop() { + for (auto s : sockets) { + if (s != NULL) s->close(); + } + + sockets.clear(); + + /*#ifndef WIN32 + if (ssock != INVALID_SOCKET) close(ssock); + #else + if (ssock != INVALID_SOCKET) closesocket(ssock); + #endif + + ssock = INVALID_SOCKET;*/ + + for (auto l : listeners) { + l->close(); + } + + listeners.clear(); +} + +bool _run(bool blocking, bool nodelay) { + timeval block; + int n; + int selres = 1; + + //if (ssock == INVALID_SOCKET) return 1; + + bool active = true; + bool repeat = nodelay; + + while (active || repeat) { + n = setDescriptors(); + + //Wait for a network event or timeout in 3 seconds + block.tv_sec = (repeat) ? 0 : 3; + block.tv_usec = 0; + selres = select(n+1, &sfdread, 0, &sfderror, &block); + + repeat = false; + active = blocking; + + //Some kind of error occured, it is usually possible to recover from this. + if (selres <= 0) { + return false; + } + + //If connection request is waiting + for (auto l : listeners) { + if (l && l->isListening()) { + if (FD_ISSET(l->_socket(), &sfdread)) { + int rsize = sizeof(sockaddr_storage); + sockaddr_storage addr; + //int freeclient = freeSocket(); + + //if (freeclient >= 0) { + // TODO Limit connection rate or allow a pause in accepting + // TODO Send auto reject message under heavy load + + //Finally accept this client connection. + int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); + + if (csock != INVALID_SOCKET) { + shared_ptr<Socket> sock(new Socket(csock)); + //sockets[freeclient] = sock; + + sockets.push_back(sock); + + // TODO Save the ip address + // deal with both IPv4 and IPv6: + /*if (addr.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&addr; + //port = ntohs(s->sin_port); + inet_ntop(AF_INET, &s->sin_addr, sock->m_addr, INET6_ADDRSTRLEN); + } else { // AF_INET6 + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; + //port = ntohs(s->sin6_port); + inet_ntop(AF_INET6, &s->sin6_addr, sock->m_addr, INET6_ADDRSTRLEN); + }*/ + } + //} + } + } + } + + //Also check each clients socket to see if any messages or errors are waiting + for (auto s : sockets) { + if (s != NULL && s->isConnected()) { + //If message received from this client then deal with it + if (FD_ISSET(s->_socket(), &sfdread)) { + repeat |= s->data(); + //An error occured with this client. + } else if (FD_ISSET(s->_socket(), &sfderror)) { + s->error(); + } + } + } + } + + return true; +} + +bool ftl::net::check() { + return _run(false,true); +} + +bool ftl::net::wait() { + return _run(false,false); +} + +bool ftl::net::run(bool async) { + if (async) { + // TODO Start thread + } else { + return _run(true,false); + } + + return false; +} + diff --git a/net/src/raw.cpp b/net/src/raw.cpp index 4725e8dda55f3533d6257cd7947094f57d7588de..08052b2c5861fdb652b3365ddc21b9ec1d0f7483 100644 --- a/net/src/raw.cpp +++ b/net/src/raw.cpp @@ -1,4 +1,5 @@ #include <ftl/net/raw.hpp> +#include <ftl/net/listener.hpp> #include <ftl/uri.hpp> #include <vector> #include <iostream> @@ -13,406 +14,18 @@ using ftl::URI; using ftl::net::raw::Socket; -static std::vector<Socket*> sockets; -static int ssock = INVALID_SOCKET; -static fd_set sfdread; -static fd_set sfderror; -static sockaddr_in slocalAddr; -static int freeSocket() { - int freeclient = -1; +//static sockaddr_in slocalAddr; - //Find a free client slot and allocated it - for (unsigned int i=0; i<sockets.size(); i++) { - if (sockets[i] == 0) { // CHECK, was 0 which seems wrong - freeclient = i; - break; - } - } - //Max clients reached, so send error - if (freeclient == -1) { - if (sockets.size() < ftl::net::raw::MAX_CONNECTIONS) { - sockets.push_back(0); - freeclient = sockets.size()-1; - } else { - // exceeded max connections - return -1; - } - } - return freeclient; -} -static int setDescriptors() { - //Reset all file descriptors - FD_ZERO(&sfdread); - FD_ZERO(&sfderror); - int n = 0; - //Set file descriptor for the listening socket. - if (ssock) { - FD_SET(ssock, &sfdread); - FD_SET(ssock, &sfderror); - n = ssock; - } - //Set the file descriptors for each client - for (auto s : sockets) { - if (s != NULL && s->isConnected()) { - if (s->_socket() > n) { - n = s->_socket(); - } - FD_SET(s->_socket(), &sfdread); - FD_SET(s->_socket(), &sfderror); - } - } - return n; -} -static int tcpListen(URI &uri) { - //std::cerr << "TCP Listen: " << uri.getHost() << " : " << uri.getPort() << std::endl; - #ifdef WIN32 - WSAData wsaData; - //If Win32 then load winsock - if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) { - return 1; - } - #endif - ssock = socket(AF_INET, SOCK_STREAM, 0); - if (ssock == INVALID_SOCKET) { - return 1; - } - //Specify listen port and address - slocalAddr.sin_family = AF_INET; - slocalAddr.sin_addr.s_addr = htonl(INADDR_ANY); // TODO, use that given in URI - slocalAddr.sin_port = htons(uri.getPort()); - - int rc = ::bind(ssock, (struct sockaddr*)&slocalAddr, sizeof(slocalAddr)); - - if (rc == SOCKET_ERROR) { - #ifndef WIN32 - close(ssock); - #else - closesocket(ssock); - #endif - ssock = INVALID_SOCKET; - return 1; - } - - //Attempt to start listening for connection requests. - rc = ::listen(ssock, 1); - - if (rc == SOCKET_ERROR) { - #ifndef WIN32 - close(ssock); - #else - closesocket(ssock); - #endif - ssock = INVALID_SOCKET; - return 1; - } - - return 0; -} - -static int wsListen(URI &uri) { - return 1; -} - -static int tcpConnect(URI &uri) { - int rc; - sockaddr_in destAddr; - - //std::cerr << "TCP Connect: " << uri.getHost() << " : " << uri.getPort() << std::endl; - - #ifdef WIN32 - WSAData wsaData; - if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) { - //ERROR - return INVALID_SOCKET; - } - #endif - - //We want a TCP socket - int csocket = socket(AF_INET, SOCK_STREAM, 0); - - if (csocket == INVALID_SOCKET) { - return INVALID_SOCKET; - } - - #ifdef WIN32 - HOSTENT *host = gethostbyname(uri.getHost().c_str()); - #else - hostent *host = gethostbyname(uri.getHost().c_str()); - #endif - - if (host == NULL) { - #ifndef WIN32 - close(csocket); - #else - closesocket(csocket); - #endif - - std::cerr << "Address not found : " << uri.getHost() << std::endl; - - return INVALID_SOCKET; - } - - destAddr.sin_family = AF_INET; - destAddr.sin_addr.s_addr = ((in_addr *)(host->h_addr))->s_addr; - destAddr.sin_port = htons(uri.getPort()); - - // Make nonblocking - /*long arg = fcntl(csocket, F_GETFL, NULL)); - arg |= O_NONBLOCK; - fcntl(csocket, F_SETFL, arg) < 0)*/ - - rc = ::connect(csocket, (struct sockaddr*)&destAddr, sizeof(destAddr)); - - if (rc < 0) { - if (errno == EINPROGRESS) { - - } else { - #ifndef WIN32 - close(csocket); - #else - closesocket(csocket); - #endif - - std::cerr << "Could not connect" << std::endl; - - return INVALID_SOCKET; - } - } - - // Make blocking again - /*rg = fcntl(csocket, F_GETFL, NULL)); - arg &= (~O_NONBLOCK); - fcntl(csocket, F_SETFL, arg) < 0)*/ - - // Handshake?? - - return csocket; -} - -static int wsConnect(URI &uri) { - return 1; -} - -int ftl::net::raw::listen(const char *pUri) { - URI uri(pUri); - - if (uri.getProtocol() == URI::SCHEME_TCP) { - return tcpListen(uri); - } else if (uri.getProtocol() == URI::SCHEME_WS) { - return wsListen(uri); - } else { - return 1; - } -} - -void ftl::net::raw::stop() { - for (auto s : sockets) { - if (s != NULL) s->close(); - } - - sockets.clear(); - - #ifndef WIN32 - if (ssock != INVALID_SOCKET) close(ssock); - #else - if (ssock != INVALID_SOCKET) closesocket(ssock); - #endif - - ssock = INVALID_SOCKET; -} - -Socket *ftl::net::raw::connect(const char *pUri) { - URI uri(pUri); - - if (uri.getProtocol() == URI::SCHEME_TCP) { - int csock = tcpConnect(uri); - Socket *s = new Socket(csock, pUri); - int fs = freeSocket(); - if (fs >= 0) { - sockets[fs] = s; - return s; - } else { - return NULL; - } - } else if (uri.getProtocol() == URI::SCHEME_WS) { - wsConnect(uri); - return NULL; - } else { - return NULL; - } -} - -int ftl::net::raw::run(bool blocking) { - timeval block; - int n; - int selres = 1; - - //if (ssock == INVALID_SOCKET) return 1; - - bool active = true; - bool repeat = false; - - while (active || repeat) { - n = setDescriptors(); - - //Wait for a network event or timeout in 3 seconds - block.tv_sec = (repeat) ? 0 : 3; - block.tv_usec = 0; - selres = select(n+1, &sfdread, 0, &sfderror, &block); - - repeat = false; - active = blocking; - - //Some kind of error occured, it is usually possible to recover from this. - if (selres <= 0) { - return 1; - } - - //If connection request is waiting - if (FD_ISSET(ssock, &sfdread)) { - int rsize = sizeof(sockaddr_storage); - sockaddr_storage addr; - int freeclient = freeSocket(); - - if (freeclient >= 0) { - // TODO Limit connection rate or allow a pause in accepting - // TODO Send auto reject message under heavy load - - //Finally accept this client connection. - int csock = accept(ssock, (sockaddr*)&addr, (socklen_t*)&rsize); - - if (csock != INVALID_SOCKET) { - Socket *sock = new Socket(csock, NULL); - sockets[freeclient] = sock; - - //Save the ip address - // deal with both IPv4 and IPv6: - if (addr.ss_family == AF_INET) { - struct sockaddr_in *s = (struct sockaddr_in *)&addr; - //port = ntohs(s->sin_port); - inet_ntop(AF_INET, &s->sin_addr, sock->m_addr, INET6_ADDRSTRLEN); - } else { // AF_INET6 - struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; - //port = ntohs(s->sin6_port); - inet_ntop(AF_INET6, &s->sin6_addr, sock->m_addr, INET6_ADDRSTRLEN); - } - } - } - } - - //Also check each clients socket to see if any messages or errors are waiting - for (auto s : sockets) { - if (s != NULL && s->isConnected()) { - //If message received from this client then deal with it - if (FD_ISSET(s->_socket(), &sfdread)) { - repeat |= s->data(); - //An error occured with this client. - } else if (FD_ISSET(s->_socket(), &sfderror)) { - s->error(); - } - } - } - } - - return 1; -} - -int Socket::close() { - if (isConnected()) { - #ifndef WIN32 - ::close(m_sock); - #else - closesocket(m_sock); - #endif - m_sock = INVALID_SOCKET; - - // Attempt auto reconnect? - } - return 0; -} - -void Socket::error() { - int err; - uint32_t optlen = sizeof(err); - getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &err, &optlen); - - std::cerr << "GOT A SOCKET ERROR : " << err << std::endl; - //close(); -} - -bool Socket::data() { - //std::cerr << "GOT SOCKET DATA" << std::endl; - - //Read data from socket - size_t n = 0; - uint32_t len = 0; - - if (m_pos < 4) { - n = 4 - m_pos; - } else { - len = *(int*)m_buffer; - n = len+4-m_pos; - } - - while (m_pos < len+4) { - if (len > MAX_MESSAGE) { - close(); - return false; // Prevent DoS - } - - const int rc = recv(m_sock, m_buffer+m_pos, n, 0); - - if (rc > 0) { - m_pos += static_cast<size_t>(rc); - - if (m_pos < 4) { - n = 4 - m_pos; - } else { - len = *(int*)m_buffer; - n = len+4-m_pos; - } - } else if (rc == EWOULDBLOCK || rc == 0) { - // Data not yet available - return false; - } else { - // Close socket due to error - close(); - return false; - } - } - - // All data available - if (m_handler) { - uint32_t service = ((uint32_t*)m_buffer)[1]; - auto d = std::string(m_buffer+8, len-4); - //std::cerr << "DATA : " << service << " -> " << d << std::endl; - m_handler(service, d); - } - - m_pos = 0; - - return true; -} - -Socket::Socket(int s, const char *uri) : m_uri(uri), m_sock(s), m_pos(0) { - // Allocate buffer - m_buffer = new char[BUFFER_SIZE]; -} - -Socket::~Socket() { - // Delete socket buffer - delete [] m_buffer; -} diff --git a/net/src/socket.cpp b/net/src/socket.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c2d81c172859a7361d24fe68ecb56363f583ceb0 --- /dev/null +++ b/net/src/socket.cpp @@ -0,0 +1,214 @@ +#include <ftl/uri.hpp> +#include <ftl/net/socket.hpp> + +#ifndef WIN32 +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h> +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 +#endif + +#ifdef WIN32 +#include <windows.h> +#include <winsock.h> +typedef int socklen_t; +#define MSG_WAITALL 0 +#endif + +#include <iostream> + +using namespace ftl; +using ftl::net::Socket; +using namespace std; + +static int tcpConnect(URI &uri) { + int rc; + sockaddr_in destAddr; + + //std::cerr << "TCP Connect: " << uri.getHost() << " : " << uri.getPort() << std::endl; + + #ifdef WIN32 + WSAData wsaData; + if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) { + //ERROR + return INVALID_SOCKET; + } + #endif + + //We want a TCP socket + int csocket = socket(AF_INET, SOCK_STREAM, 0); + + if (csocket == INVALID_SOCKET) { + return INVALID_SOCKET; + } + + #ifdef WIN32 + HOSTENT *host = gethostbyname(uri.getHost().c_str()); + #else + hostent *host = gethostbyname(uri.getHost().c_str()); + #endif + + if (host == NULL) { + #ifndef WIN32 + close(csocket); + #else + closesocket(csocket); + #endif + + std::cerr << "Address not found : " << uri.getHost() << std::endl; + + return INVALID_SOCKET; + } + + destAddr.sin_family = AF_INET; + destAddr.sin_addr.s_addr = ((in_addr *)(host->h_addr))->s_addr; + destAddr.sin_port = htons(uri.getPort()); + + // Make nonblocking + /*long arg = fcntl(csocket, F_GETFL, NULL)); + arg |= O_NONBLOCK; + fcntl(csocket, F_SETFL, arg) < 0)*/ + + rc = ::connect(csocket, (struct sockaddr*)&destAddr, sizeof(destAddr)); + + if (rc < 0) { + if (errno == EINPROGRESS) { + + } else { + #ifndef WIN32 + close(csocket); + #else + closesocket(csocket); + #endif + + std::cerr << "Could not connect" << std::endl; + + return INVALID_SOCKET; + } + } + + // Make blocking again + /*rg = fcntl(csocket, F_GETFL, NULL)); + arg &= (~O_NONBLOCK); + fcntl(csocket, F_SETFL, arg) < 0)*/ + + // Handshake?? + + return csocket; +} + +static int wsConnect(URI &uri) { + return 1; +} + +Socket::Socket(int s) : m_sock(s), m_pos(0) { + // TODO Get the remote address. + m_valid = true; +} + +Socket::Socket(const char *pUri) : m_uri(pUri), m_pos(0) { + // Allocate buffer + m_buffer = new char[BUFFER_SIZE]; + + URI uri(pUri); + + m_valid = false; + m_sock = INVALID_SOCKET; + + if (uri.getProtocol() == URI::SCHEME_TCP) { + m_sock = tcpConnect(uri); + m_valid = true; + } else if (uri.getProtocol() == URI::SCHEME_WS) { + wsConnect(uri); + } else { + } +} + +int Socket::close() { + if (isConnected()) { + #ifndef WIN32 + ::close(m_sock); + #else + closesocket(m_sock); + #endif + m_sock = INVALID_SOCKET; + + // Attempt auto reconnect? + } + return 0; +} + +void Socket::error() { + int err; + uint32_t optlen = sizeof(err); + getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &err, &optlen); + + std::cerr << "GOT A SOCKET ERROR : " << err << std::endl; + //close(); +} + +bool Socket::data() { + //std::cerr << "GOT SOCKET DATA" << std::endl; + + //Read data from socket + size_t n = 0; + uint32_t len = 0; + + if (m_pos < 4) { + n = 4 - m_pos; + } else { + len = *(int*)m_buffer; + n = len+4-m_pos; + } + + while (m_pos < len+4) { + if (len > MAX_MESSAGE) { + close(); + return false; // Prevent DoS + } + + const int rc = recv(m_sock, m_buffer+m_pos, n, 0); + + if (rc > 0) { + m_pos += static_cast<size_t>(rc); + + if (m_pos < 4) { + n = 4 - m_pos; + } else { + len = *(int*)m_buffer; + n = len+4-m_pos; + } + } else if (rc == EWOULDBLOCK || rc == 0) { + // Data not yet available + return false; + } else { + // Close socket due to error + close(); + return false; + } + } + + // All data available + if (m_handler) { + uint32_t service = ((uint32_t*)m_buffer)[1]; + auto d = std::string(m_buffer+8, len-4); + //std::cerr << "DATA : " << service << " -> " << d << std::endl; + m_handler(service, d); + } + + m_pos = 0; + + return true; +} + +Socket::~Socket() { + close(); + + // Delete socket buffer + delete [] m_buffer; +} + diff --git a/net/test/CMakeLists.txt b/net/test/CMakeLists.txt index cad9d4292f811e382bd1a847e72158de1675ed66..7d9a3198635257280287ad5408a87a1d3c2ea868 100644 --- a/net/test/CMakeLists.txt +++ b/net/test/CMakeLists.txt @@ -1,7 +1,9 @@ add_executable(tests EXCLUDE_FROM_ALL ./tests.cpp ./net_raw.cpp - ../src/raw.cpp + ../src/net.cpp + ../src/socket.cpp + ../src/listener.cpp ./ice.cpp ../src/ice.cpp ./uri.cpp diff --git a/net/test/net_raw.cpp b/net/test/net_raw.cpp index 1aafbc4b39803e0d94e6efb0fcd5a59d5541e7e6..e5b443cff1990063e3f8a468fce1e3c793300ae4 100644 --- a/net/test/net_raw.cpp +++ b/net/test/net_raw.cpp @@ -1,7 +1,28 @@ #include "catch.hpp" #include <string.h> -#include <ftl/net/raw.hpp> +#include <ftl/net.hpp> +#include <ftl/net/socket.hpp> +#include <ftl/net/listener.hpp> #include <iostream> +#include <memory> + +#ifndef WIN32 +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h> +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 +#endif + +#ifdef WIN32 +#include <windows.h> +#include <winsock.h> +typedef int socklen_t; +#define MSG_WAITALL 0 +#endif // ---- MOCK Server Code ------------------------------------------------------- @@ -14,7 +35,8 @@ static fd_set sfdread; static fd_set sfderror; static sockaddr_in slocalAddr; -using ftl::net::raw::Socket; +using ftl::net::Socket; +using std::shared_ptr; void fin_server() { if (!server) return; @@ -147,6 +169,8 @@ void accept_connection() { if (selres > 0 && FD_ISSET(ssock, &sfdread)) { int rsize = sizeof(sockaddr_storage); sockaddr_storage addr; + + std::cout << "Accepted connection!" << std::endl; //Finally accept this client connection. csock = accept(ssock, (sockaddr*)&addr, (socklen_t*)&rsize); @@ -161,33 +185,33 @@ TEST_CASE("net::connect()", "[net]") { init_server(); REQUIRE(ssock != INVALID_SOCKET); - Socket *sock = NULL; + shared_ptr<Socket> sock = nullptr; SECTION("valid tcp connection using ipv4") { - sock = ftl::net::raw::connect("tcp://127.0.0.1:7077"); - REQUIRE(sock != NULL); + sock = ftl::net::connect("tcp://127.0.0.1:7077"); + REQUIRE(sock != nullptr); accept_connection(); } SECTION("valid tcp connection using hostname") { - sock = ftl::net::raw::connect("tcp://localhost:7077"); - REQUIRE(sock != NULL); + sock = ftl::net::connect("tcp://localhost:7077"); + REQUIRE(sock->isValid()); accept_connection(); } SECTION("invalid protocol") { - sock = ftl::net::raw::connect("http://127.0.0.1:7077"); - REQUIRE(sock == NULL); + sock = ftl::net::connect("http://127.0.0.1:7077"); + REQUIRE(!sock->isValid()); } SECTION("empty uri") { - sock = ftl::net::raw::connect(""); - REQUIRE(sock == NULL); + sock = ftl::net::connect(""); + REQUIRE(!sock->isValid()); } SECTION("null uri") { - sock = ftl::net::raw::connect(NULL); - REQUIRE(sock == NULL); + sock = ftl::net::connect(NULL); + REQUIRE(!sock->isValid()); } // Disabled due to long timeout @@ -199,13 +223,13 @@ TEST_CASE("net::connect()", "[net]") { }*/ SECTION("incorrect dns address") { - sock = ftl::net::raw::connect("tcp://xryyrrgrtgddgr.com:7077"); - REQUIRE(sock != NULL); + sock = ftl::net::connect("tcp://xryyrrgrtgddgr.com:7077"); + REQUIRE(sock->isValid()); REQUIRE(sock->isConnected() == false); - sock = NULL; + sock = nullptr; } - if (sock) { + if (sock && sock->isValid()) { REQUIRE(sock->isConnected()); REQUIRE(csock != INVALID_SOCKET); sock->close(); @@ -216,26 +240,26 @@ TEST_CASE("net::connect()", "[net]") { TEST_CASE("net::listen()", "[net]") { SECTION("tcp any interface") { - REQUIRE( ftl::net::raw::listen("tcp://*:7078") == 0); + REQUIRE( ftl::net::listen("tcp://*:7078")->isListening() ); SECTION("can connect to listening socket") { - Socket *sock = ftl::net::raw::connect("tcp://127.0.0.1:7078"); - REQUIRE(sock != NULL); + shared_ptr<Socket> sock = ftl::net::connect("tcp://127.0.0.1:7078"); + REQUIRE(sock->isValid()); REQUIRE(sock->isConnected()); - ftl::net::raw::run(false); + ftl::net::wait(); // TODO Need way of knowing about connection } - ftl::net::raw::stop(); + ftl::net::stop(); } } TEST_CASE("Socket.onMessage()", "[net]") { // Need a fake server... init_server(); - Socket *sock = ftl::net::raw::connect("tcp://127.0.0.1:7077"); - REQUIRE(sock != NULL); + shared_ptr<Socket> sock = ftl::net::connect("tcp://127.0.0.1:7077"); + REQUIRE(sock->isValid()); REQUIRE(sock->isConnected()); accept_connection(); @@ -250,7 +274,7 @@ TEST_CASE("Socket.onMessage()", "[net]") { msg = true; }); - ftl::net::raw::run(false); + ftl::net::wait(); REQUIRE(msg); } @@ -265,7 +289,7 @@ TEST_CASE("Socket.onMessage()", "[net]") { msg = true; }); - ftl::net::raw::run(false); + ftl::net::wait(); REQUIRE(msg); } @@ -282,7 +306,7 @@ TEST_CASE("Socket.onMessage()", "[net]") { msg++; }); - ftl::net::raw::run(false); + ftl::net::wait(); REQUIRE(msg == 2); } @@ -297,7 +321,7 @@ TEST_CASE("Socket.onMessage()", "[net]") { sock->close(); - ftl::net::raw::run(false); + ftl::net::wait(); REQUIRE(!msg); } diff --git a/p2p-rm/src/blob.cpp b/p2p-rm/src/blob.cpp deleted file mode 100644 index 440832de1fc655427a0e53a0f49187440449f470..0000000000000000000000000000000000000000 --- a/p2p-rm/src/blob.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include <memory.h> - -/*void ftl::rm::Blob::write(size_t offset, const char *data, size_t size) { - // Sanity check - if (offset + size > size_) throw -1; - - // If local, write direct to data_, otherwise send over network - if (socket_ != NULL) { - Header header{blobid_,static_cast<uint32_t>(offset),static_cast<uint32_t>(size)}; - - // Send over network - socket_->send2(MEMORY_WRITE, std::string((const char*)&header,sizeof(header)), - std::string(data,size)); - } else { - // Copy locally - memcpy(data_+offset, data, size); - } -}*/ - -/*void ftl::rm::Blob::read(size_t offset, char *data, size_t size) { - // Sanity check - if (offset + size > size_) throw -1; - - // If local, write direct to data_, otherwise send over network - if (socket_ != NULL) { - - } else { - // Copy locally - memcpy(data,data_+offset, size); - } -}*/ -