diff --git a/net/CMakeLists.txt b/net/CMakeLists.txt index 98f2364a31ac3841ebf6d5f40ad114a2fbc116c3..a372bcfd036710e528a72dd3c6cf1cb124c44624 100644 --- a/net/CMakeLists.txt +++ b/net/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required (VERSION 2.8.11) include (CheckIncludeFile) include (CheckFunctionExists) -project (libftlnet) +project (ftlnet) #find_package(PkgConfig) #pkg_check_modules(GTKMM gtkmm-3.0) @@ -25,6 +25,7 @@ set(FTLSOURCE src/net.cpp src/listener.cpp src/socket.cpp + src/dispatcher.cpp ) check_include_file("uriparser/Uri.h" HAVE_URI_H) @@ -33,8 +34,8 @@ if(NOT HAVE_URI_H) endif() check_function_exists(uriParseSingleUriA HAVE_URIPARSESINGLE) -add_library(libftlnet ${FTLSOURCE}) -target_include_directories(libftlnet PUBLIC ${PROJECT_SOURCE_DIR}/include) -target_link_libraries(libftlnet pthread) +add_library(ftlnet ${FTLSOURCE}) +target_include_directories(ftlnet PUBLIC ${PROJECT_SOURCE_DIR}/include) +target_link_libraries(ftlnet pthread) ADD_SUBDIRECTORY(test) diff --git a/net/include/ftl/net.hpp b/net/include/ftl/net.hpp index 8a3c8f90fdf7207abd293c88a5687dba93942984..9d1178100a4d98df4a38860f1c7b2292a3956e3b 100644 --- a/net/include/ftl/net.hpp +++ b/net/include/ftl/net.hpp @@ -31,7 +31,7 @@ std::shared_ptr<Socket> connect(const char *uri); * * @param async Use a separate thread. */ -bool run(bool async); +bool run(bool async=false); /** * Wait for a bunch of messages, but return once at least one has been diff --git a/net/include/ftl/net/handlers.hpp b/net/include/ftl/net/handlers.hpp index 3ddf3def2e381eaf9de16778efea2db17547733b..e1af54a783cc48d68fecfaa1fd7413e5716ae698 100644 --- a/net/include/ftl/net/handlers.hpp +++ b/net/include/ftl/net/handlers.hpp @@ -16,7 +16,7 @@ typedef std::function<void(int)> sockdisconnecthandler_t; typedef std::function<void(std::shared_ptr<Socket>, int, std::string&)> datahandler_t; typedef std::function<void(std::shared_ptr<Socket>, int)> errorhandler_t; -typedef std::function<void(std::shared_ptr<Socket>)> connecthandler_t; +typedef std::function<void(std::shared_ptr<Socket> &)> connecthandler_t; typedef std::function<void(std::shared_ptr<Socket>)> disconnecthandler_t; }; diff --git a/net/include/ftl/net/listener.hpp b/net/include/ftl/net/listener.hpp index 55e49b3e10f6a10cf91740dbd920b99f0be2b1e9..e6f74a8bc4c7065e49a22e7dfd4283ba5a1575c3 100644 --- a/net/include/ftl/net/listener.hpp +++ b/net/include/ftl/net/listener.hpp @@ -27,7 +27,7 @@ class Listener { void close(); int _socket() { return descriptor_; } - void connection(std::shared_ptr<Socket> s); + void connection(std::shared_ptr<Socket> &s); void onConnection(connecthandler_t h) { handler_connect_.push_back(h); }; private: diff --git a/net/include/ftl/net/socket.hpp b/net/include/ftl/net/socket.hpp index 01eb87f036b4394d4a812a0bc0e3d8b4b057ddc5..630f4821a91fdb858e68f08641fdb3e6a01fc552 100644 --- a/net/include/ftl/net/socket.hpp +++ b/net/include/ftl/net/socket.hpp @@ -1,6 +1,7 @@ #ifndef _FTL_NET_SOCKET_HPP_ #define _FTL_NET_SOCKET_HPP_ +#include <glog/logging.h> #include <ftl/net.hpp> #include <ftl/net/handlers.hpp> #include <ftl/net/dispatcher.hpp> @@ -46,7 +47,11 @@ class Socket { bool isConnected() { return m_sock != INVALID_SOCKET; }; bool isValid() { return m_valid; }; + std::string getURI() const { return m_uri; }; + /** + * Bind a function to a RPC call name. + */ template <typename F> void bind(const std::string &name, F func) { //disp_.enforce_unique_name(name); @@ -54,6 +59,11 @@ class Socket { typename ftl::internal::func_kind_info<F>::args_kind()); } + /** + * Bind a function to a raw message type. + */ + void bind(uint32_t service, std::function<void(std::shared_ptr<Socket>,const std::string&)> func); + template <typename T, typename... ARGS> T call(const std::string &name, ARGS... args) { bool hasreturned = false; @@ -82,6 +92,8 @@ class Socket { auto rpcid = rpcid__++; auto call_obj = std::make_tuple(0,rpcid,name,args_obj); + LOG(INFO) << "RPC call sent: " << name; + std::stringstream buf; msgpack::pack(buf, call_obj); @@ -101,16 +113,12 @@ class Socket { bool data(); void error(); - protected: - - char m_addr[INET6_ADDRSTRLEN]; - private: - const char *m_uri; + std::string m_uri; int m_sock; size_t m_pos; char *m_buffer; - sockdatahandler_t m_handler; + sockdatahandler_t m_handlers; bool m_valid; std::map<int, std::function<void(msgpack::object&)>> callbacks_; ftl::net::Dispatcher disp_; diff --git a/net/src/dispatcher.cpp b/net/src/dispatcher.cpp index 9a3e815ea70b7e376d7d43a1589ee68ba84b21c7..300a89ea2e4111db9338fadb9e4908cfa0bd5400 100644 --- a/net/src/dispatcher.cpp +++ b/net/src/dispatcher.cpp @@ -1,3 +1,4 @@ +#include <glog/logging.h> #include <ftl/net/dispatcher.hpp> #include <ftl/net/socket.hpp> #include <iostream> @@ -26,7 +27,7 @@ void ftl::net::Dispatcher::dispatch(const msgpack::object &msg) { case 4: dispatch_call(msg); break; default: - std::cout << "Unrecognised msgpack : " << msg.via.array.size << std::endl; + LOG(ERROR) << "Unrecognised msgpack : " << msg.via.array.size; return; } } @@ -42,6 +43,8 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) { auto &&id = std::get<1>(the_call); auto &&name = std::get<2>(the_call); auto &&args = std::get<3>(the_call); + + LOG(INFO) << "RPC call received: " << name; auto it_func = funcs_.find(name); diff --git a/net/src/listener.cpp b/net/src/listener.cpp index 4b38ff4b37904c789dba50d3f91e769c74c339fe..028e2ec55825dcefd681a46ca8d50583798912fd 100644 --- a/net/src/listener.cpp +++ b/net/src/listener.cpp @@ -1,3 +1,5 @@ +#include <glog/logging.h> + #include <ftl/uri.hpp> #include <ftl/net/listener.hpp> #include <iostream> @@ -39,6 +41,10 @@ int tcpListen(URI &uri) { if (ssock == INVALID_SOCKET) { return INVALID_SOCKET; } + + int enable = 1; + if (setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) + LOG(ERROR) << "setsockopt(SO_REUSEADDR) failed"; //Specify listen port and address sockaddr_in slocalAddr; @@ -55,6 +61,8 @@ int tcpListen(URI &uri) { closesocket(ssock); #endif ssock = INVALID_SOCKET; + + LOG(ERROR) << "Could not bind to " << uri.getBaseURI(); return INVALID_SOCKET; } @@ -68,6 +76,8 @@ int tcpListen(URI &uri) { closesocket(ssock); #endif ssock = INVALID_SOCKET; + + LOG(ERROR) << "Could not listen on " << uri.getBaseURI(); return INVALID_SOCKET; } @@ -98,7 +108,7 @@ Listener::~Listener() { close(); } -void Listener::connection(shared_ptr<Socket> s) { +void Listener::connection(shared_ptr<Socket> &s) { for (auto h : handler_connect_) h(s); } diff --git a/net/src/net.cpp b/net/src/net.cpp index 3826df6bbce9be8251f71485d8edaad96235fbf3..d84b6ecc5649055263e24bb9c9fa9365b0ad32eb 100644 --- a/net/src/net.cpp +++ b/net/src/net.cpp @@ -120,7 +120,9 @@ bool _run(bool blocking, bool nodelay) { //Some kind of error occured, it is usually possible to recover from this. if (selres <= 0) { - return false; + //std::cout << "SELECT ERROR" << std::endl; + //return false; + continue; } //If connection request is waiting @@ -139,14 +141,14 @@ bool _run(bool blocking, bool nodelay) { int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); if (csock != INVALID_SOCKET) { - shared_ptr<Socket> sock(new Socket(csock)); + auto sock = make_shared<Socket>(csock); //sockets[freeclient] = sock; - sockets.push_back(sock); - // Call connection handlers l->connection(sock); + sockets.push_back(std::move(sock)); + // TODO Save the ip address // deal with both IPv4 and IPv6: /*if (addr.ss_family == AF_INET) { diff --git a/net/src/socket.cpp b/net/src/socket.cpp index 2fd886031d093e836908a44192b526284ff82841..693a25cf4b3779d3a2afd1f7f81726a9fb6c5f51 100644 --- a/net/src/socket.cpp +++ b/net/src/socket.cpp @@ -1,3 +1,5 @@ +#include <glog/logging.h> + #include <ftl/uri.hpp> #include <ftl/net/socket.hpp> @@ -61,7 +63,7 @@ static int tcpConnect(URI &uri) { closesocket(csocket); #endif - std::cerr << "Address not found : " << uri.getHost() << std::endl; + LOG(ERROR) << "Address not found : " << uri.getHost() << std::endl; return INVALID_SOCKET; } @@ -87,7 +89,7 @@ static int tcpConnect(URI &uri) { closesocket(csocket); #endif - std::cerr << "Could not connect" << std::endl; + LOG(ERROR) << "Could not connect to " << uri.getBaseURI(); return INVALID_SOCKET; } @@ -108,9 +110,31 @@ static int wsConnect(URI &uri) { } Socket::Socket(int s) : m_sock(s), m_pos(0), disp_(this) { - // TODO Get the remote address. m_valid = true; m_buffer = new char[BUFFER_SIZE]; + + sockaddr_storage addr; + int rsize = sizeof(sockaddr_storage); + if (getpeername(s, (sockaddr*)&addr, (socklen_t*)&rsize) == 0) { + char addrbuf[INET6_ADDRSTRLEN]; + int port; + + if (addr.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&addr; + //port = ntohs(s->sin_port); + inet_ntop(AF_INET, &s->sin_addr, addrbuf, INET6_ADDRSTRLEN); + port = s->sin_port; + } else { // AF_INET6 + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; + //port = ntohs(s->sin6_port); + inet_ntop(AF_INET6, &s->sin6_addr, addrbuf, INET6_ADDRSTRLEN); + port = s->sin6_port; + } + + m_uri = std::string("tcp://")+addrbuf; + m_uri += ":"; + m_uri += std::to_string(port); + } } Socket::Socket(const char *pUri) : m_uri(pUri), m_pos(0), disp_(this) { @@ -150,7 +174,7 @@ void Socket::error() { uint32_t optlen = sizeof(err); getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &err, &optlen); - std::cerr << "GOT A SOCKET ERROR : " << err << std::endl; + LOG(ERROR) << "Socket: " << m_uri << " - error " << err; //close(); } @@ -171,7 +195,7 @@ bool Socket::data() { while (m_pos < len+4) { if (len > MAX_MESSAGE) { close(); - std::cout << "Length is too big" << std::endl; + LOG(ERROR) << "Socket: " << m_uri << " - message attack"; return false; // Prevent DoS } @@ -188,10 +212,10 @@ bool Socket::data() { } } else if (rc == EWOULDBLOCK || rc == 0) { // Data not yet available - std::cout << "No data to read" << std::endl; + //std::cout << "No data to read" << std::endl; return false; } else { - std::cout << "Socket error" << std::endl; + LOG(ERROR) << "Socket: " << m_uri << " - error " << rc; // Close socket due to error close(); return false; @@ -220,8 +244,13 @@ bool Socket::data() { //auto &&err = std::get<2>(the_result); auto &&res = std::get<3>(the_result); - if (callbacks_.count(id) > 0) callbacks_[id](res); - else std::cout << "NO CALLBACK FOUND FOR RPC RESULT" << std::endl; + if (callbacks_.count(id) > 0) { + LOG(INFO) << "Received return RPC value"; + callbacks_[id](res); + callbacks_.erase(id); + } else { + LOG(ERROR) << "Missing RPC callback for result"; + } } else { if (m_handler) m_handler(service, d); } @@ -248,7 +277,26 @@ int Socket::send(uint32_t service, const std::string &data) { return 0; } +int Socket::send2(uint32_t service, const std::string &data1, const std::string &data2) { + ftl::net::Header h; + h.size = data1.size()+4+data2.size(); + h.service = service; + + iovec vec[3]; + vec[0].iov_base = &h; + vec[0].iov_len = sizeof(h); + vec[1].iov_base = const_cast<char*>(data1.data()); + vec[1].iov_len = data1.size(); + vec[2].iov_base = const_cast<char*>(data2.data()); + vec[2].iov_len = data2.size(); + + ::writev(m_sock, &vec[0], 3); + + return 0; +} + Socket::~Socket() { + std::cerr << "DESTROYING SOCKET" << std::endl; close(); // Delete socket buffer diff --git a/p2p-rm/CMakeLists.txt b/p2p-rm/CMakeLists.txt index f13d00cef1fd2f847c8976434535ded1c5db6608..a374b86f6d68339667faf0125cf5f71a351cb46d 100644 --- a/p2p-rm/CMakeLists.txt +++ b/p2p-rm/CMakeLists.txt @@ -38,13 +38,14 @@ check_function_exists(uriParseSingleUriA HAVE_URIPARSESINGLE) find_path(FTL_NET "ftl/net.hpp") if (NOT FTL_NET) message(STATUS "FTL Net is not installed") - find_path(FTL_NET "ftl/net.hpp" PATHS ../net/include) - if (NOT FTL_NET) + find_path(FTL_NET_DEV "ftl/net.hpp" PATHS ../net/include) + if (NOT FTL_NET_DEV) message(FATAL_ERROR "FTL Net is required") endif() #TODO Ensure this is built - message(STATUS ${FTL_NET}) - include_directories(${FTL_NET}) + message(STATUS ${FTL_NET_DEV}) + include_directories(${FTL_NET_DEV}) + link_directories(../net/build) endif() target_include_directories(ftl-p2prm PUBLIC ${PROJECT_SOURCE_DIR}/include) diff --git a/p2p-rm/include/ftl/p2p-rm/blob.hpp b/p2p-rm/include/ftl/p2p-rm/blob.hpp index 8dbe537f84769fc2da19d66cacb0da472418cd13..90ef69c4b4b01f920f4bc73e9231bfca02bd2455 100644 --- a/p2p-rm/include/ftl/p2p-rm/blob.hpp +++ b/p2p-rm/include/ftl/p2p-rm/blob.hpp @@ -29,6 +29,8 @@ struct Blob { mutable std::shared_mutex mutex_; }; +void _sync(const Blob &blob, size_t offset, size_t size); + } } diff --git a/p2p-rm/include/ftl/p2p-rm/cluster.hpp b/p2p-rm/include/ftl/p2p-rm/cluster.hpp index f4e7d4860d7e493999e36e408e874757ea5e5285..9a95c05834090066f0972f063b26005d257c6ab1 100644 --- a/p2p-rm/include/ftl/p2p-rm/cluster.hpp +++ b/p2p-rm/include/ftl/p2p-rm/cluster.hpp @@ -3,13 +3,17 @@ #include "ftl/p2p-rm/mapped_ptr.hpp" #include "ftl/p2p-rm/internal.hpp" +#include <ftl/p2p-rm/protocol.hpp> #include <ftl/uri.hpp> +#include <ftl/net/socket.hpp> #include <type_traits> #include <memory> #include <vector> #include <map> +#include <tuple> +#include <msgpack.hpp> namespace ftl { namespace net { @@ -21,8 +25,6 @@ namespace rm { class Blob; -void _sync(const Blob &blob, size_t offset, size_t size); - class Cluster { public: Cluster(const ftl::URI &uri, std::shared_ptr<ftl::net::Listener> l); @@ -93,18 +95,22 @@ class Cluster { /** * Connect to a new peer node using the specified socket. */ - void addPeer(std::shared_ptr<ftl::net::Socket> s); + void addPeer(std::shared_ptr<ftl::net::Socket> &s); /** * Connect to a new peer using a URL string. */ void addPeer(const char *url); + std::string getOwner(const char *uri); + private: + static int rpcid_; std::string root_; std::shared_ptr<ftl::net::Listener> listener_; std::vector<std::shared_ptr<ftl::net::Socket>> peers_; std::map<std::string, ftl::rm::Blob*> blobs_; + std::map<int,std::vector<std::tuple<std::shared_ptr<ftl::net::Socket>,std::string>>> rpc_results_; ftl::rm::Blob *_lookup(const char *uri); Blob *_create(const char *uri, char *addr, size_t size, size_t count, diff --git a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp index 38843ea5d59ac579cba43343d5327a6198211fe1..ef3419d6650033abaa71ee3f8d8634c841d61392 100644 --- a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp +++ b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp @@ -102,7 +102,8 @@ namespace ftl { write_ref &operator=(const T &value) { //ptr_.blob->write(ptr_.offset, (char*)(&value), sizeof(T)); *((T*)&ptr_.blob->data_[ptr_.offset]) = value; - ptr_.blob->sync(ptr_.offset, sizeof(T)); + //ptr_.blob->sync(ptr_.offset, sizeof(T)); + ftl::rm::_sync(*ptr_.blob, ptr_.offset, sizeof(T)); return *this; } diff --git a/p2p-rm/include/ftl/p2p-rm/protocol.hpp b/p2p-rm/include/ftl/p2p-rm/protocol.hpp index 888106b88a7ebcf5ae61544d732d69968531f866..fda11ae670942778bbe8da76d7fc4da9a5945ba5 100644 --- a/p2p-rm/include/ftl/p2p-rm/protocol.hpp +++ b/p2p-rm/include/ftl/p2p-rm/protocol.hpp @@ -10,6 +10,7 @@ #define P2P_NOTIFYOWNERSHIP (FTL_PROTOCOL_P2P + 4) #define P2P_URISEARCH (FTL_PROTOCOL_P2P + 5) #define P2P_PEERSEARCH (FTL_PROTOCOL_P2P + 6) +#define P2P_RPC_CALL (FTL_PROTOCOL_P2P + 7) #endif // _FTL_P2P_RM_PROTOCOL_HPP_ diff --git a/p2p-rm/src/blob.cpp b/p2p-rm/src/blob.cpp new file mode 100644 index 0000000000000000000000000000000000000000..600cff69abb625c4c481008479f2cee22882357f --- /dev/null +++ b/p2p-rm/src/blob.cpp @@ -0,0 +1,31 @@ +#include <ftl/p2p-rm/blob.hpp> +#include <ftl/net/socket.hpp> +#include <ftl/p2p-rm/protocol.hpp> + +struct SyncHeader { + uint32_t blobid; + uint32_t offset; + uint32_t size; +}; + +void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) { + // Sanity check + if (offset + size > blob.size_) throw -1; + + // TODO Delay send to collate many write operations? + + if (blob.sockets_.size() > 0) { + SyncHeader header{blob.blobid_,static_cast<uint32_t>(offset),static_cast<uint32_t>(size)}; + + for (auto s : blob.sockets_) { + // Send over network + s->send2(P2P_SYNC, std::string((const char*)&header,sizeof(header)), + std::string(&blob.data_[offset],size)); + } + } +} + +void ftl::rm::Blob::finished() { + +} + diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp index 3647c62a724da0d43b45d4cea8d3d3a4cd40b6bd..5c8e4b982ee539fa9fd7f01e7a1b17dfe2afaf13 100644 --- a/p2p-rm/src/cluster.cpp +++ b/p2p-rm/src/cluster.cpp @@ -19,13 +19,15 @@ using ftl::URI; using ftl::rm::Blob; using ftl::net::Socket; +int Cluster::rpcid_ = 0; + Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : listener_(l) { - auto me = this; + //auto me = this; root_ = uri.getHost(); if (l != nullptr) { - l->onConnection([&](shared_ptr<Socket> s) { - me->addPeer(s); + l->onConnection([&](shared_ptr<Socket> &s) { + addPeer(s); }); } } @@ -41,69 +43,70 @@ void Cluster::reset() { blobs_.clear(); } -void Cluster::addPeer(shared_ptr<Socket> p) { - -} - -struct SyncHeader { - uint32_t blobid; - uint32_t offset; - uint32_t size; -}; - -void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) { - // Sanity check - if (offset + size > blob.size_) throw -1; - - // TODO Delay send to collate many write operations? - - if (blob.sockets_.size() > 0) { - SyncHeader header{blob.blobid_,static_cast<uint32_t>(offset),static_cast<uint32_t>(size)}; +void Cluster::addPeer(shared_ptr<Socket> &p) { + LOG(INFO) << "Peer added: " << p->getURI(); + //auto me = this; + peers_.push_back(p); + /*p->onMessage([&](int service, const std::string &data) { + std::cout << "MSG " << service << std::endl; + });*/ + p->bind("getowner", [](const std::string &uri) -> std::string { + std::cout << "GETOWNER" << std::endl; + return ""; + }); - for (auto s : blob.sockets_) { - // Send over network - s->send2(P2P_SYNC, std::string((const char*)&header,sizeof(header)), - std::string(&blob.data_[offset],size)); - } + // TODO Check ownership of my blobs. + for (auto b : blobs_) { + getOwner(b.first.c_str()); } } +void Cluster::addPeer(const char *url) { + auto sock = ftl::net::connect(url); + addPeer(sock); +} + Blob *Cluster::_lookup(const char *uri) { URI u(uri); if (!u.isValid()) return NULL; if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL; //if (u.getPathSegment(0) != "memory") return NULL; - if (u.getHost() != root_) { std::cerr << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; } + if (u.getHost() != root_) { LOG(ERROR) << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; } auto b = blobs_[u.getBaseURI()]; std::cout << "Blob Found for " << u.getBaseURI() << " = " << (b != nullptr) << std::endl; - if (b == nullptr) { - // Must do a p2p search for this URI... - int rpcid = rpcid_++; - - for (auto p : peers_) { - p->send(P2P_FINDOWNER, ftl::net::rpc_pack(rpcid,uri)); - } - - int limit = 10; - while (limit >= 0 && !rpc_results_[rpcid] == nullptr) { - ftl::net::wait(); - limit--; - } - - if (rpc_results[rpcid]) { - // Unpack the data - auto res = rpc_results[rpcid]; - - } else { - // No results; - } + if (!b) { + // Whoops, need to map it first! } return b; } +std::string Cluster::getOwner(const char *uri) { + std::string result; + int count = 0; + + auto f = [&](msgpack::object &r) { + count--; + std::string res = r.as<std::string>(); + if (res.size() > 0) result = res; + }; + + for (auto p : peers_) { + count++; + LOG(INFO) << "Request owner of " << uri << " from " << p->getURI(); + p->async_call("getowner", f, std::string(uri)); + } + + // TODO Limit in case of no return. + while (count > 0) { + ftl::net::wait(); + } + + return result; +} + Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count, ftl::rm::flags_t flags, const std::string &tname) { URI u(uri); @@ -120,7 +123,16 @@ Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count, b->uri_ = std::string(uri); blobs_[u.getBaseURI()] = b; - // TODO : Perhaps broadcast this new allocation? + std::string o = getOwner(uri); + if (o.size() == 0) { + // I am the owner! + std::cout << "I own " << uri << std::endl; + } else { + std::cout << "I do not own " << uri << std::endl; + } + + //std::cout << owners << std::endl; + return b; } diff --git a/p2p-rm/test/CMakeLists.txt b/p2p-rm/test/CMakeLists.txt index 82c07480af28c7fac5aa2f89d686e16eab6a5919..2ff0402ee43dc1fd17bcf775b85977d2c1eefd03 100644 --- a/p2p-rm/test/CMakeLists.txt +++ b/p2p-rm/test/CMakeLists.txt @@ -4,19 +4,30 @@ enable_testing() add_executable(mapped_ptr EXCLUDE_FROM_ALL ./tests.cpp ./mapped_ptr.cpp + ../src/blob.cpp ) add_executable(p2p_rm EXCLUDE_FROM_ALL ./tests.cpp ../src/p2prm.cpp ../src/cluster.cpp + ../src/blob.cpp + ./p2p-rm.cpp ) target_link_libraries(p2p_rm uriparser) +add_executable(peer_test EXCLUDE_FROM_ALL + ./peer.cpp + ../src/p2prm.cpp + ../src/cluster.cpp + ../src/blob.cpp +) +target_link_libraries(peer_test uriparser ftlnet gflags glog) + add_test(Mapped_ptr mapped_ptr) add_test(RM_API p2p_rm) add_custom_target(tests) -add_dependencies(tests mapped_ptr p2p_rm) +add_dependencies(tests mapped_ptr p2p_rm peer_test) diff --git a/p2p-rm/test/mapped_ptr.cpp b/p2p-rm/test/mapped_ptr.cpp index 72aaef8a49a2a4e2b149537ced84824ce867e3f8..5dfa74e6936d0271cc25725f92b19ab1b9960566 100644 --- a/p2p-rm/test/mapped_ptr.cpp +++ b/p2p-rm/test/mapped_ptr.cpp @@ -1,12 +1,11 @@ #include "catch.hpp" #include <ftl/p2p-rm/mapped_ptr.hpp> +#include <ftl/net/socket.hpp> #include <memory.h> +#include <iostream> + // Mock the BLOB -static bool is_finished = false; -void ftl::rm::Blob::finished() { - is_finished = true; -} static bool blob_sync = false; @@ -14,6 +13,11 @@ void ftl::rm::Blob::sync(size_t offset, size_t size) { blob_sync = true; } +int ftl::net::Socket::send2(uint32_t service, const std::string &data1, const std::string &data2) { + std::cout << "SEND2 (" << service << ")" << std::endl; + return 0; +} + SCENARIO( "Reading from a remote pointer", "[remote_ptr]" ) { // Make a dummy blob auto blob = new ftl::rm::Blob(); @@ -38,10 +42,8 @@ SCENARIO( "Writing to a remote pointer", "[remote_ptr]" ) { GIVEN( "a valid POD remote pointer" ) { ftl::mapped_ptr<int> pa{blob,0}; - is_finished = false; *pa = 23; REQUIRE( *pa == 23 ); - REQUIRE( is_finished ); REQUIRE( pa[0] == 23 ); pa[1] = 25; REQUIRE( pa[1] == 25 ); @@ -49,14 +51,11 @@ SCENARIO( "Writing to a remote pointer", "[remote_ptr]" ) { GIVEN( "a persistent write_ref" ) { ftl::mapped_ptr<int> pa{blob,0}; - is_finished = false; auto ra = *pa; ra = 23; REQUIRE( ra == 23 ); - REQUIRE( !is_finished ); ra.reset(); - REQUIRE( is_finished ); } } diff --git a/p2p-rm/test/p2p-rm.cpp b/p2p-rm/test/p2p-rm.cpp index 8aae7527901ecc581387f35d568b0176bdb4b5cc..bf797998585a0e5233e42f8f7a067341481f33c1 100644 --- a/p2p-rm/test/p2p-rm.cpp +++ b/p2p-rm/test/p2p-rm.cpp @@ -1,6 +1,7 @@ #include "catch.hpp" #include <ftl/p2p-rm.hpp> #include <ftl/net/socket.hpp> +#include <ftl/net.hpp> #include <vector> #include <iostream> @@ -14,6 +15,37 @@ int ftl::net::Socket::send2(uint32_t service, const std::string &data1, const st return 0; } +ftl::net::Socket::Socket(int s) : disp_(this) { + +} + +ftl::net::Socket::~Socket() { + +} + +int ftl::net::Socket::rpcid__ = 0; + +int ftl::net::Socket::send(uint32_t service, const std::string &data) { + msgs.push_back(service); + std::cout << "SEND (" << service << ")" << std::endl; + + /*if (service == P2P_RPC_CALL) { + // UNPACK + // PACK RETURN + message(P2P_RPC_RETURN, rdata); + }*/ + + return 0; +} + +bool ftl::net::wait() { + return true; +} + +std::shared_ptr<ftl::net::Socket> ftl::net::connect(const char *url) { + return nullptr; +} + // --- End Mock Socket Send SCENARIO( "Cluster::map()", "[map]" ) { @@ -70,8 +102,13 @@ SCENARIO( "Cluster::map()", "[map]" ) { SCENARIO( "Getting a read_ref", "[get]" ) { auto cluster = ftl::rm::cluster("ftl://utu.fi", nullptr); + // Add fake peer + cluster->addPeer(std::shared_ptr<ftl::net::Socket>(new ftl::net::Socket(0))); + int data = 89; + int data2 = 99; auto m = cluster->map<int>("ftl://utu.fi/memory/test1", &data); + cluster->map<int>("ftl://utu.fi/memory/remote0", &data2); REQUIRE( m.is_valid() ); GIVEN( "a valid URI to local memory" ) { @@ -84,7 +121,7 @@ SCENARIO( "Getting a read_ref", "[get]" ) { GIVEN( "a valid URI to remote memory" ) { const auto r = cluster->getReadable<int>("ftl://utu.fi/memory/remote0"); REQUIRE( r.is_valid() ); - REQUIRE( !r.pointer().is_local() ); + //REQUIRE( !r.pointer().is_local() ); REQUIRE( r == 888 ); } } diff --git a/p2p-rm/test/peer.cpp b/p2p-rm/test/peer.cpp index 3cf2321bed5c5308b117ba62bcaba4b2103ab106..07d9c69e7d045f9f5f1f38c8ab3e9f7bfd62b228 100644 --- a/p2p-rm/test/peer.cpp +++ b/p2p-rm/test/peer.cpp @@ -2,23 +2,26 @@ #include <ftl/net.hpp> #include <gflags/gflags.h> +#include <glog/logging.h> +#include <iostream> DEFINE_string(listen, "tcp://*:9000", "Listen URI"); DEFINE_string(peer, "", "Peer to connect to"); using namespace ftl; -int main(int argc, char *argv) { +int main(int argc, char **argv) { + google::InitGoogleLogging(argv[0]); gflags::ParseCommandLineFlags(&argc, &argv, true); - auto net = net::listen(FLAGS_listen); + auto net = net::listen(FLAGS_listen.c_str()); auto cluster = rm::cluster("ftl://utu.fi", net); int data = 20; auto ptr = cluster->map<int>("ftl://utu.fi/memory/test1", &data); - if (FLAGS_peer) { - cluster->addPeer(FLAGS_peer); + if (FLAGS_peer.size() > 0) { + cluster->addPeer(FLAGS_peer.c_str()); std::cout << "Value = " << *ptr << std::endl; // 25. std::cout << "Raw = " << data << std::endl; // 25. @@ -26,12 +29,13 @@ int main(int argc, char *argv) { } else { *ptr = 25; - ptr.onChange(()=> { + /*ptr.onChange(()=> { std::cout << "Value changed = " << *ptr << std::endl; // 30 - }); + });*/ } - while (net()); + //while (ftl::net::wait()); + ftl::net::run(false); return 0; }