Skip to content
Snippets Groups Projects
Commit 7e4570f1 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'master' of https://gitlab.utu.fi/nicolas.pope/ftl

parents 63bff2eb a19584e6
No related branches found
No related tags found
No related merge requests found
Pipeline #9711 failed
...@@ -17,7 +17,7 @@ target_include_directories(ftlnet PUBLIC ...@@ -17,7 +17,7 @@ target_include_directories(ftlnet PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
PRIVATE src) PRIVATE src)
target_link_libraries(ftlnet Threads::Threads glog::glog) target_link_libraries(ftlnet Threads::Threads glog::glog ${UUID_LIBRARIES} ${URIPARSER_LIBRARIES})
install(TARGETS ftlnet EXPORT ftlnet-config install(TARGETS ftlnet EXPORT ftlnet-config
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
......
/*********************************************************
*
* Copyright (C) 2014 by Vitaliy Vitsentiy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*********************************************************/
#ifndef __ctpl_stl_thread_pool_H__
#define __ctpl_stl_thread_pool_H__
#include <functional>
#include <thread>
#include <atomic>
#include <vector>
#include <memory>
#include <exception>
#include <future>
#include <mutex>
#include <queue>
// thread pool to run user's functors with signature
// ret func(int id, other_params)
// where id is the index of the thread that runs the functor
// ret is some return type
namespace ctpl {
namespace detail {
template <typename T>
class Queue {
public:
bool push(T const & value) {
std::unique_lock<std::mutex> lock(this->mutex);
this->q.push(value);
return true;
}
// deletes the retrieved element, do not use for non integral types
bool pop(T & v) {
std::unique_lock<std::mutex> lock(this->mutex);
if (this->q.empty())
return false;
v = this->q.front();
this->q.pop();
return true;
}
bool empty() {
std::unique_lock<std::mutex> lock(this->mutex);
return this->q.empty();
}
private:
std::queue<T> q;
std::mutex mutex;
};
}
class thread_pool {
public:
thread_pool() { this->init(); }
thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
// the destructor waits for all the functions in the queue to be finished
~thread_pool() {
this->stop(true);
}
// get the number of running threads in the pool
int size() { return static_cast<int>(this->threads.size()); }
// number of idle threads
int n_idle() { return this->nWaiting; }
std::thread & get_thread(int i) { return *this->threads[i]; }
// change the number of threads in the pool
// should be called from one thread, otherwise be careful to not interleave, also with this->stop()
// nThreads must be >= 0
void resize(int nThreads) {
if (!this->isStop && !this->isDone) {
int oldNThreads = static_cast<int>(this->threads.size());
if (oldNThreads <= nThreads) { // if the number of threads is increased
this->threads.resize(nThreads);
this->flags.resize(nThreads);
for (int i = oldNThreads; i < nThreads; ++i) {
this->flags[i] = std::make_shared<std::atomic<bool>>(false);
this->set_thread(i);
}
}
else { // the number of threads is decreased
for (int i = oldNThreads - 1; i >= nThreads; --i) {
*this->flags[i] = true; // this thread will finish
this->threads[i]->detach();
}
{
// stop the detached threads that were waiting
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_all();
}
this->threads.resize(nThreads); // safe to delete because the threads are detached
this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
}
}
}
// empty the queue
void clear_queue() {
std::function<void(int id)> * _f;
while (this->q.pop(_f))
delete _f; // empty the queue
}
// pops a functional wrapper to the original function
std::function<void(int)> pop() {
std::function<void(int id)> * _f = nullptr;
this->q.pop(_f);
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
std::function<void(int)> f;
if (_f)
f = *_f;
return f;
}
// wait for all computing threads to finish and stop all threads
// may be called asynchronously to not pause the calling thread while waiting
// if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
void stop(bool isWait = false) {
if (!isWait) {
if (this->isStop)
return;
this->isStop = true;
for (int i = 0, n = this->size(); i < n; ++i) {
*this->flags[i] = true; // command the threads to stop
}
this->clear_queue(); // empty the queue
}
else {
if (this->isDone || this->isStop)
return;
this->isDone = true; // give the waiting threads a command to finish
}
{
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_all(); // stop all waiting threads
}
for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
if (this->threads[i]->joinable())
this->threads[i]->join();
}
// if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
// therefore delete them here
this->clear_queue();
this->threads.clear();
this->flags.clear();
}
template<typename F, typename... Rest>
auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
);
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
return pck->get_future();
}
// run the user's function that excepts argument int - id of the running thread. returned value is templatized
// operator returns std::future, where the user can get the result and rethrow the catched exceptins
template<typename F>
auto push(F && f) ->std::future<decltype(f(0))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
return pck->get_future();
}
private:
// deleted
thread_pool(const thread_pool &);// = delete;
thread_pool(thread_pool &&);// = delete;
thread_pool & operator=(const thread_pool &);// = delete;
thread_pool & operator=(thread_pool &&);// = delete;
void set_thread(int i) {
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
std::atomic<bool> & _flag = *flag;
std::function<void(int id)> * _f;
bool isPop = this->q.pop(_f);
while (true) {
while (isPop) { // if there is anything in the queue
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
(*_f)(i);
if (_flag)
return; // the thread is wanted to stop, return even if the queue is not empty yet
else
isPop = this->q.pop(_f);
}
// the queue is empty here, wait for the next command
std::unique_lock<std::mutex> lock(this->mutex);
++this->nWaiting;
this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
--this->nWaiting;
if (!isPop)
return; // if the queue is empty and this->isDone == true or *flag then return
}
};
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
}
void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
std::vector<std::unique_ptr<std::thread>> threads;
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
detail::Queue<std::function<void(int id)> *> q;
std::atomic<bool> isDone;
std::atomic<bool> isStop;
std::atomic<int> nWaiting; // how many threads are waiting
std::mutex mutex;
std::condition_variable cv;
};
}
#endif // __ctpl_stl_thread_pool_H__
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <vector> #include <vector>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <optional>
namespace ftl { namespace ftl {
...@@ -46,7 +47,7 @@ class Peer; ...@@ -46,7 +47,7 @@ class Peer;
class Dispatcher { class Dispatcher {
public: public:
Dispatcher() {} explicit Dispatcher(Dispatcher *parent=nullptr) : parent_(parent) {}
//void dispatch(Peer &, const std::string &msg); //void dispatch(Peer &, const std::string &msg);
void dispatch(Peer &, const msgpack::object &msg); void dispatch(Peer &, const msgpack::object &msg);
...@@ -134,8 +135,11 @@ class Dispatcher { ...@@ -134,8 +135,11 @@ class Dispatcher {
std::tuple<uint32_t, uint32_t, msgpack::object, msgpack::object>; std::tuple<uint32_t, uint32_t, msgpack::object, msgpack::object>;
private: private:
Dispatcher *parent_;
std::unordered_map<std::string, adaptor_type> funcs_; std::unordered_map<std::string, adaptor_type> funcs_;
std::optional<adaptor_type> _locateHandler(const std::string &name) const;
static void enforce_arg_count(std::string const &func, std::size_t found, static void enforce_arg_count(std::string const &func, std::size_t found,
std::size_t expected); std::size_t expected);
......
...@@ -20,6 +20,10 @@ ...@@ -20,6 +20,10 @@
#include <tuple> #include <tuple>
#include <vector> #include <vector>
#include <type_traits> #include <type_traits>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
# define ENABLE_IF(...) \ # define ENABLE_IF(...) \
typename std::enable_if<(__VA_ARGS__), bool>::type = true typename std::enable_if<(__VA_ARGS__), bool>::type = true
...@@ -56,6 +60,7 @@ struct decrypt{};*/ ...@@ -56,6 +60,7 @@ struct decrypt{};*/
class Peer { class Peer {
public: public:
friend class Universe; friend class Universe;
friend class Dispatcher;
enum Status { enum Status {
kInvalid, kConnecting, kConnected, kDisconnected, kReconnecting kInvalid, kConnecting, kConnected, kDisconnected, kReconnecting
...@@ -135,14 +140,21 @@ class Peer { ...@@ -135,14 +140,21 @@ class Peer {
void onConnect(std::function<void()> &f); void onConnect(std::function<void()> &f);
void onDisconnect(std::function<void()> &f) {} void onDisconnect(std::function<void()> &f) {}
bool isWaiting() const { return is_waiting_; }
public: public:
static const int kMaxMessage = 10*1024*1024; // 10Mb currently static const int kMaxMessage = 10*1024*1024; // 10Mb currently
protected: protected:
bool data(); // Process one message from socket void data(); // Process one message from socket
void socketError(); // Process one error from socket void socketError(); // Process one error from socket
void error(int e); void error(int e);
bool _data();
void _dispatchResponse(uint32_t id, msgpack::object &obj);
void _sendResponse(uint32_t id, const msgpack::object &obj);
/** /**
* Get the internal OS dependent socket. * Get the internal OS dependent socket.
* TODO(nick) Work out if this should be private. * TODO(nick) Work out if this should be private.
...@@ -161,7 +173,6 @@ class Peer { ...@@ -161,7 +173,6 @@ class Peer {
private: // Functions private: // Functions
void _connected(); void _connected();
void _updateURI(); void _updateURI();
void _dispatchReturn(const std::string &d);
int _send(); int _send();
...@@ -186,13 +197,15 @@ class Peer { ...@@ -186,13 +197,15 @@ class Peer {
int sock_; int sock_;
ftl::URI::scheme_t scheme_; ftl::URI::scheme_t scheme_;
uint32_t version_; uint32_t version_;
bool destroy_disp_;
// Receive buffers // Receive buffers
bool is_waiting_;
msgpack::unpacker recv_buf_; msgpack::unpacker recv_buf_;
std::mutex recv_mtx_;
// Send buffers // Send buffers
msgpack::vrefbuffer send_buf_; msgpack::vrefbuffer send_buf_;
std::mutex send_mtx_;
std::string uri_; std::string uri_;
ftl::UUID peerid_; ftl::UUID peerid_;
...@@ -210,6 +223,7 @@ class Peer { ...@@ -210,6 +223,7 @@ class Peer {
template <typename... ARGS> template <typename... ARGS>
int Peer::send(const std::string &s, ARGS... args) { int Peer::send(const std::string &s, ARGS... args) {
std::unique_lock<std::mutex> lk(send_mtx_);
// Leave a blank entry for websocket header // Leave a blank entry for websocket header
if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
auto args_obj = std::make_tuple(args...); auto args_obj = std::make_tuple(args...);
...@@ -228,17 +242,26 @@ void Peer::bind(const std::string &name, F func) { ...@@ -228,17 +242,26 @@ void Peer::bind(const std::string &name, F func) {
template <typename R, typename... ARGS> template <typename R, typename... ARGS>
R Peer::call(const std::string &name, ARGS... args) { R Peer::call(const std::string &name, ARGS... args) {
bool hasreturned = false; bool hasreturned = false;
std::mutex m;
std::condition_variable cv;
R result; R result;
asyncCall<R>(name, [&result,&hasreturned](const R &r) { asyncCall<R>(name, [&](const R &r) {
std::unique_lock<std::mutex> lk(m);
hasreturned = true; hasreturned = true;
result = r; result = r;
lk.unlock();
cv.notify_one();
}, std::forward<ARGS>(args)...); }, std::forward<ARGS>(args)...);
// Loop the network { // Block thread until async callback notifies us
int limit = 10; std::unique_lock<std::mutex> lk(m);
while (limit > 0 && !hasreturned) { cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;});
limit--; }
// TODO REPLACE ftl::net::wait();
if (!hasreturned) {
// TODO(nick) remove callback
throw 1;
} }
return result; return result;
...@@ -255,13 +278,12 @@ void Peer::asyncCall( ...@@ -255,13 +278,12 @@ void Peer::asyncCall(
LOG(INFO) << "RPC " << name << "() -> " << uri_; LOG(INFO) << "RPC " << name << "() -> " << uri_;
std::stringstream buf; msgpack::pack(send_buf_, call_obj);
msgpack::pack(buf, call_obj);
// Register the CB // Register the CB
callbacks_[rpcid] = std::make_unique<caller<T>>(cb); callbacks_[rpcid] = std::make_unique<caller<T>>(cb);
send("__rpc__", buf.str()); _send();
} }
}; };
......
...@@ -51,6 +51,8 @@ class Universe { ...@@ -51,6 +51,8 @@ class Universe {
*/ */
bool connect(const std::string &addr); bool connect(const std::string &addr);
int numberOfPeers() const { return peers_.size(); }
/** /**
* Bind a function to an RPC or service call name. This will implicitely * Bind a function to an RPC or service call name. This will implicitely
* be called by any peer making the request. * be called by any peer making the request.
......
...@@ -8,6 +8,7 @@ using ftl::net::Peer; ...@@ -8,6 +8,7 @@ using ftl::net::Peer;
using ftl::net::Dispatcher; using ftl::net::Dispatcher;
using std::vector; using std::vector;
using std::string; using std::string;
using std::optional;
/*static std::string hexStr(const std::string &s) /*static std::string hexStr(const std::string &s)
{ {
...@@ -48,43 +49,69 @@ void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) { ...@@ -48,43 +49,69 @@ void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) {
void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
call_t the_call; call_t the_call;
msg.convert(the_call);
try {
msg.convert(the_call);
} catch(...) {
LOG(ERROR) << "Bad message format";
return;
}
// TODO: proper validation of protocol (and responding to it) // TODO: proper validation of protocol (and responding to it)
// auto &&type = std::get<0>(the_call); auto &&type = std::get<0>(the_call);
// assert(type == 0);
auto &&id = std::get<1>(the_call); auto &&id = std::get<1>(the_call);
auto &&name = std::get<2>(the_call); auto &&name = std::get<2>(the_call);
auto &&args = std::get<3>(the_call); auto &&args = std::get<3>(the_call);
// assert(type == 0);
LOG(INFO) << "RPC " << name << "() <- " << s.getURI(); if (type == 1) {
LOG(INFO) << "RPC return for " << id;
auto it_func = funcs_.find(name); s._dispatchResponse(id, args);
} else if (type == 0) {
LOG(INFO) << "RPC " << name << "() <- " << s.getURI();
auto it_func = funcs_.find(name);
if (it_func != end(funcs_)) {
try {
auto result = (it_func->second)(args); //->get();
s._sendResponse(id, result->get());
/*response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get());
std::stringstream buf;
msgpack::pack(buf, res_obj);
s.send("__return__", buf.str());*/
} catch (const std::exception &e) {
//throw;
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
/*response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object());
std::stringstream buf;
msgpack::pack(buf, res_obj);
s.send("__return__", buf.str());*/
} catch (int e) {
//throw;
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
/*response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object());
std::stringstream buf;
msgpack::pack(buf, res_obj);
s.send("__return__", buf.str());*/
}
}
} else {
// TODO(nick) Some error
}
}
if (it_func != end(funcs_)) { optional<Dispatcher::adaptor_type> ftl::net::Dispatcher::_locateHandler(const std::string &name) const {
try { auto it_func = funcs_.find(name);
auto result = (it_func->second)(args); //->get(); if (it_func == end(funcs_)) {
response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get()); if (parent_ != nullptr) {
std::stringstream buf; return parent_->_locateHandler(name);
msgpack::pack(buf, res_obj); } else {
s.send("__return__", buf.str()); return {};
} catch (const std::exception &e) {
//throw;
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object());
std::stringstream buf;
msgpack::pack(buf, res_obj);
s.send("__return__", buf.str());
} catch (int e) {
//throw;
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object());
std::stringstream buf;
msgpack::pack(buf, res_obj);
s.send("__return__", buf.str());
} }
} } else {
return it_func->second;
}
} }
void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) { void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) {
...@@ -100,11 +127,11 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const ...@@ -100,11 +127,11 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const
LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI();
auto it_func = funcs_.find(name); auto binding = _locateHandler(name);
if (it_func != end(funcs_)) { if (binding) {
try { try {
auto result = (it_func->second)(args); auto result = (*binding)(args);
} catch (int e) { } catch (int e) {
throw e; throw e;
} }
...@@ -124,6 +151,7 @@ void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_ ...@@ -124,6 +151,7 @@ void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_
void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) { void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) {
auto pos = funcs_.find(func); auto pos = funcs_.find(func);
if (pos != end(funcs_)) { if (pos != end(funcs_)) {
LOG(ERROR) << "RPC non unique binding for " << func;
throw -1; throw -1;
} }
} }
......
#define GLOG_NO_ABBREVIATED_SEVERITIES #define GLOG_NO_ABBREVIATED_SEVERITIES
#include <glog/logging.h> #include <glog/logging.h>
#include <ctpl_stl.h>
#include <fcntl.h> #include <fcntl.h>
#ifdef WIN32 #ifdef WIN32
...@@ -50,6 +51,8 @@ using ftl::net::Dispatcher; ...@@ -50,6 +51,8 @@ using ftl::net::Dispatcher;
int Peer::rpcid__ = 0; int Peer::rpcid__ = 0;
static ctpl::thread_pool pool(5);
// TODO(nick) Move to tcp_internal.cpp // TODO(nick) Move to tcp_internal.cpp
static int tcpConnect(URI &uri) { static int tcpConnect(URI &uri) {
int rc; int rc;
...@@ -127,18 +130,15 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) { ...@@ -127,18 +130,15 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) {
status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting; status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting;
_updateURI(); _updateURI();
if (d != nullptr) { disp_ = new Dispatcher(d);
disp_ = d;
destroy_disp_ = false; is_waiting_ = true;
} else {
disp_ = new Dispatcher();
destroy_disp_ = true;
}
// Send the initiating handshake if valid // Send the initiating handshake if valid
if (status_ == kConnecting) { if (status_ == kConnecting) {
// Install return handshake handler. // Install return handshake handler.
bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) {
LOG(INFO) << "Handshake 2 received";
if (magic != ftl::net::kMagic) { if (magic != ftl::net::kMagic) {
close(); close();
LOG(ERROR) << "Invalid magic during handshake"; LOG(ERROR) << "Invalid magic during handshake";
...@@ -160,13 +160,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { ...@@ -160,13 +160,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
status_ = kInvalid; status_ = kInvalid;
sock_ = INVALID_SOCKET; sock_ = INVALID_SOCKET;
if (d != nullptr) { disp_ = new Dispatcher(d);
disp_ = d;
destroy_disp_ = false;
} else {
disp_ = new Dispatcher();
destroy_disp_ = true;
}
scheme_ = uri.getProtocol(); scheme_ = uri.getProtocol();
if (uri.getProtocol() == URI::SCHEME_TCP) { if (uri.getProtocol() == URI::SCHEME_TCP) {
...@@ -204,9 +198,12 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { ...@@ -204,9 +198,12 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
LOG(ERROR) << "Unrecognised connection protocol: " << pUri; LOG(ERROR) << "Unrecognised connection protocol: " << pUri;
} }
is_waiting_ = true;
if (status_ == kConnecting) { if (status_ == kConnecting) {
// Install return handshake handler. // Install return handshake handler.
bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) {
LOG(INFO) << "Handshake 1 received";
if (magic != ftl::net::kMagic) { if (magic != ftl::net::kMagic) {
close(); close();
LOG(ERROR) << "Invalid magic during handshake"; LOG(ERROR) << "Invalid magic during handshake";
...@@ -298,9 +295,25 @@ void Peer::error(int e) { ...@@ -298,9 +295,25 @@ void Peer::error(int e) {
} }
bool Peer::data() { void Peer::data() {
//if (!is_waiting_) return;
is_waiting_ = false;
pool.push([](int id, Peer *p) {
p->_data();
p->is_waiting_ = true;
}, this);
}
bool Peer::_data() {
//std::unique_lock<std::mutex> lk(recv_mtx_);
recv_buf_.reserve_buffer(kMaxMessage); recv_buf_.reserve_buffer(kMaxMessage);
size_t rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
if (rc < 0) {
return false;
}
recv_buf_.buffer_consumed(rc); recv_buf_.buffer_consumed(rc);
msgpack::object_handle msg; msgpack::object_handle msg;
...@@ -308,12 +321,18 @@ bool Peer::data() { ...@@ -308,12 +321,18 @@ bool Peer::data() {
msgpack::object obj = msg.get(); msgpack::object obj = msg.get();
if (status_ != kConnected) { if (status_ != kConnected) {
// First message must be a handshake // First message must be a handshake
tuple<uint32_t, std::string, msgpack::object> hs; try {
obj.convert(hs); tuple<uint32_t, std::string, msgpack::object> hs;
obj.convert(hs);
if (get<1>(hs) != "__handshake__") {
if (get<1>(hs) != "__handshake__") {
close();
LOG(ERROR) << "Missing handshake";
return false;
}
} catch(...) {
close(); close();
LOG(ERROR) << "Missing handshake"; LOG(ERROR) << "Bad first message format";
return false; return false;
} }
} }
...@@ -432,21 +451,7 @@ void Socket::handshake2() { ...@@ -432,21 +451,7 @@ void Socket::handshake2() {
_connected(); _connected();
}*/ }*/
void Peer::_dispatchReturn(const std::string &d) { void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {
auto unpacked = msgpack::unpack(d.data(), d.size());
Dispatcher::response_t the_result;
unpacked.get().convert(the_result);
// Msgpack stipulates that 1 means return message
if (std::get<0>(the_result) != 1) {
LOG(ERROR) << "Bad RPC return message";
return;
}
auto &&id = std::get<1>(the_result);
//auto &&err = std::get<2>(the_result);
auto &&res = std::get<3>(the_result);
// TODO Handle error reporting... // TODO Handle error reporting...
if (callbacks_.count(id) > 0) { if (callbacks_.count(id) > 0) {
...@@ -460,6 +465,12 @@ void Peer::_dispatchReturn(const std::string &d) { ...@@ -460,6 +465,12 @@ void Peer::_dispatchReturn(const std::string &d) {
} }
} }
void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
Dispatcher::response_t res_obj = std::make_tuple(1,id,msgpack::object(),res);
msgpack::pack(send_buf_, res_obj);
_send();
}
void Peer::onConnect(std::function<void()> &f) { void Peer::onConnect(std::function<void()> &f) {
if (status_ == kConnected) { if (status_ == kConnected) {
f(); f();
...@@ -516,9 +527,7 @@ int Peer::_send() { ...@@ -516,9 +527,7 @@ int Peer::_send() {
Peer::~Peer() { Peer::~Peer() {
close(); close();
if (destroy_disp_) { delete disp_;
delete disp_;
}
} }
#define GLOG_NO_ABBREVIATED_SEVERITIES
#include <glog/logging.h>
#include <ftl/net/socket.hpp>
#include <ftl/net/protocol.hpp>
#include <functional>
#include <iostream>
using ftl::net::Socket;
using ftl::net::Protocol;
std::map<std::string,Protocol*> Protocol::protocols__;
Protocol *Protocol::find(const std::string &id) {
if (protocols__.count(id) > 0) return protocols__[id];
else return NULL;
}
Protocol::Protocol(const std::string &id) : id_(id) {
protocols__[id] = this;
}
Protocol::~Protocol() {
protocols__.erase(id_);
// TODO Make sure all dependent sockets are closed!
}
void Protocol::bind(int service, std::function<void(uint32_t,Socket&)> func) {
if (handlers_.count(service) == 0) {
handlers_[service] = func;
} else {
LOG(ERROR) << "Message service " << service << " already bound";
}
}
void Protocol::dispatchRPC(Socket &s, const std::string &d) {
disp_.dispatch(s,d);
}
void Protocol::dispatchRaw(uint32_t service, Socket &s) {
// Lookup raw message handler
if (handlers_.count(service) > 0) {
handlers_[service](service, s);
} else {
LOG(ERROR) << "Unrecognised service request (" << service << ") from " << s.getURI();
}
}
...@@ -77,7 +77,9 @@ int Universe::_setDescriptors() { ...@@ -77,7 +77,9 @@ int Universe::_setDescriptors() {
n = s->_socket(); n = s->_socket();
} }
FD_SET(s->_socket(), &sfdread_); if (s->isWaiting()) {
FD_SET(s->_socket(), &sfdread_);
}
FD_SET(s->_socket(), &sfderror_); FD_SET(s->_socket(), &sfderror_);
} }
} }
...@@ -105,8 +107,8 @@ void Universe::_run() { ...@@ -105,8 +107,8 @@ void Universe::_run() {
int selres = 1; int selres = 1;
//Wait for a network event or timeout in 3 seconds //Wait for a network event or timeout in 3 seconds
block.tv_sec = 3; block.tv_sec = 0;
block.tv_usec = 0; block.tv_usec = 10000;
selres = select(n+1, &sfdread_, 0, &sfderror_, &block); selres = select(n+1, &sfdread_, 0, &sfderror_, &block);
//Some kind of error occured, it is usually possible to recover from this. //Some kind of error occured, it is usually possible to recover from this.
...@@ -151,7 +153,12 @@ void Universe::_run() { ...@@ -151,7 +153,12 @@ void Universe::_run() {
if (s != NULL && s->isValid()) { if (s != NULL && s->isValid()) {
//If message received from this client then deal with it //If message received from this client then deal with it
if (FD_ISSET(s->_socket(), &sfdread_)) { if (FD_ISSET(s->_socket(), &sfdread_)) {
s->data(); //s->data();
//std::cout << "QUEUE DATA PROC" << std::endl;
//p.push([](int id, Peer *s) {
// std::cout << "Processing in thread " << std::to_string(id) << std::endl;
s->data();
//}, s);
} }
if (FD_ISSET(s->_socket(), &sfderror_)) { if (FD_ISSET(s->_socket(), &sfderror_)) {
s->socketError(); s->socketError();
......
...@@ -9,6 +9,7 @@ add_executable(peer_unit ...@@ -9,6 +9,7 @@ add_executable(peer_unit
target_link_libraries(peer_unit target_link_libraries(peer_unit
${URIPARSER_LIBRARIES} ${URIPARSER_LIBRARIES}
glog::glog glog::glog
Threads::Threads
${UUID_LIBRARIES}) ${UUID_LIBRARIES})
### URI ######################################################################## ### URI ########################################################################
...@@ -32,14 +33,17 @@ target_link_libraries(uri_unit ...@@ -32,14 +33,17 @@ target_link_libraries(uri_unit
# ${UUID_LIBRARIES}) # ${UUID_LIBRARIES})
### Net Integration ############################################################ ### Net Integration ############################################################
#add_executable(net_integration add_executable(net_integration
# ./tests.cpp ./tests.cpp
# ./net_integration.cpp) ../../../common/cpp/src/config.cpp
#add_dependencies(net_integration ftlnet) ./net_integration.cpp)
#target_link_libraries(net_integration add_dependencies(net_integration ftlnet)
# ftlnet target_link_libraries(net_integration
# ${URIPARSER_LIBRARIES} ftlnet
# glog::glog) ${URIPARSER_LIBRARIES}
glog::glog
Threads::Threads
${UUID_LIBRARIES})
...@@ -48,7 +52,7 @@ target_link_libraries(uri_unit ...@@ -48,7 +52,7 @@ target_link_libraries(uri_unit
add_test(URIUnitTest uri_unit) add_test(URIUnitTest uri_unit)
#add_test(ProtocolUnitTest protocol_unit) #add_test(ProtocolUnitTest protocol_unit)
add_test(PeerUnitTest peer_unit) add_test(PeerUnitTest peer_unit)
#add_test(NetIntegrationTest net_integration) add_test(NetIntegrationTest net_integration)
add_custom_target(tests) add_custom_target(tests)
add_dependencies(tests peer_unit uri_unit) add_dependencies(tests peer_unit uri_unit)
......
#include "catch.hpp" #include "catch.hpp"
#include <ftl/net.hpp> #include <ftl/net.hpp>
#include <ftl/net/socket.hpp>
#include <ftl/net/listener.hpp>
#include <memory> #include <thread>
#include <iostream> #include <chrono>
using ftl::net::Socket; using ftl::net::Universe;
using ftl::net::Protocol; using std::this_thread::sleep_for;
using std::shared_ptr; using std::chrono::milliseconds;
// --- Support ----------------------------------------------------------------- // --- Support -----------------------------------------------------------------
#ifndef WIN32
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#endif
#ifdef WIN32
#include <winsock2.h>
#include <Ws2tcpip.h>
#include <windows.h>
#pragma comment(lib, "Ws2_32.lib")
#endif
static int ssock = INVALID_SOCKET;
static sockaddr_in slocalAddr;
void fin_server() {
//int t = 1;
//setsockopt(ssock,SOL_SOCKET,SO_REUSEADDR,&t,sizeof(int));
#ifndef WIN32
if (ssock != INVALID_SOCKET) close(ssock);
#else
if (ssock != INVALID_SOCKET) closesocket(ssock);
#endif
ssock = INVALID_SOCKET;
}
void init_server() {
//fin_server();
int port = 7077;
#ifdef WIN32
WSAData wsaData;
//If Win32 then load winsock
if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) {
std::cerr << "Socket error\n";
return;
}
#endif
ssock = socket(AF_INET, SOCK_STREAM, 0);
if (ssock == INVALID_SOCKET) {
std::cerr << "Socket error 1\n";
return;
}
int enable = 1;
if (setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, (char*)&enable, sizeof(int)) < 0)
std::cerr << "setsockopt(SO_REUSEADDR) failed" << std::endl;
//Specify listen port and address
//memset(&s_localAddr, 0, sizeof(s_localAddr));
slocalAddr.sin_family = AF_INET;
slocalAddr.sin_addr.s_addr = htonl(INADDR_ANY);
slocalAddr.sin_port = htons(port);
int rc = bind(ssock, (struct sockaddr*)&slocalAddr, sizeof(slocalAddr));
if (rc == SOCKET_ERROR) {
std::cerr << "Socket error 2\n";
#ifndef WIN32
close(ssock);
#else
closesocket(ssock);
#endif
ssock = INVALID_SOCKET;
return;
}
//Attempt to start listening for connection requests.
rc = ::listen(ssock, 1);
if (rc == SOCKET_ERROR) {
std::cerr << "Socket error 3\n";
#ifndef WIN32
close(ssock);
#else
closesocket(ssock);
#endif
ssock = INVALID_SOCKET;
return;
}
}
// --- Tests ------------------------------------------------------------------- // --- Tests -------------------------------------------------------------------
TEST_CASE("net::connect()", "[net]") { TEST_CASE("Universe::connect()", "[net]") {
init_server(); Universe a("ftl://utu.fi");
shared_ptr<Socket> sock = nullptr; Universe b("ftl://utu.fi");
a.listen("tcp://localhost:7077");
SECTION("valid tcp connection using ipv4") { SECTION("valid tcp connection using ipv4") {
sock = ftl::net::connect("tcp://127.0.0.1:7077"); REQUIRE( b.connect("tcp://127.0.0.1:7077") );
REQUIRE(sock != nullptr);
REQUIRE(sock->isValid()); sleep_for(milliseconds(100));
REQUIRE( a.numberOfPeers() == 1 );
REQUIRE( b.numberOfPeers() == 1 );
} }
SECTION("valid tcp connection using hostname") { SECTION("valid tcp connection using hostname") {
sock = ftl::net::connect("tcp://localhost:7077"); REQUIRE( b.connect("tcp://localhost:7077") );
REQUIRE(sock->isValid());
sleep_for(milliseconds(100));
REQUIRE( a.numberOfPeers() == 1 );
REQUIRE( b.numberOfPeers() == 1 );
} }
SECTION("invalid protocol") { SECTION("invalid protocol") {
sock = ftl::net::connect("http://127.0.0.1:7077"); REQUIRE( !b.connect("http://127.0.0.1:7077") );
REQUIRE(!sock->isValid());
sleep_for(milliseconds(100));
REQUIRE( a.numberOfPeers() == 0 );
REQUIRE( b.numberOfPeers() == 0 );
} }
SECTION("empty uri") { /*SECTION("empty uri") {
sock = ftl::net::connect(""); sock = ftl::net::connect("");
REQUIRE(!sock->isValid()); REQUIRE(!sock->isValid());
} }
...@@ -136,7 +53,7 @@ TEST_CASE("net::connect()", "[net]") { ...@@ -136,7 +53,7 @@ TEST_CASE("net::connect()", "[net]") {
SECTION("null uri") { SECTION("null uri") {
sock = ftl::net::connect(NULL); sock = ftl::net::connect(NULL);
REQUIRE(!sock->isValid()); REQUIRE(!sock->isValid());
} }*/
// Disabled due to long timeout // Disabled due to long timeout
/*SECTION("incorrect ipv4 address") { /*SECTION("incorrect ipv4 address") {
...@@ -152,10 +69,85 @@ TEST_CASE("net::connect()", "[net]") { ...@@ -152,10 +69,85 @@ TEST_CASE("net::connect()", "[net]") {
REQUIRE(!sock->isValid()); REQUIRE(!sock->isValid());
}*/ }*/
fin_server(); //fin_server();
}
TEST_CASE("Universe::broadcast()", "[net]") {
Universe a("ftl://utu.fi");
Universe b("ftl://utu.fi");
a.listen("tcp://localhost:7077");
SECTION("no arguments to no peers") {
bool done = false;
a.bind("hello", [&done]() {
done = true;
});
b.broadcast("done");
sleep_for(milliseconds(100));
}
SECTION("no arguments to one peer") {
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
bool done = false;
a.bind("hello", [&done]() {
done = true;
});
b.broadcast("hello");
sleep_for(milliseconds(100));
REQUIRE( done );
}
SECTION("one argument to one peer") {
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
int done = 0;
a.bind("hello", [&done](int v) {
done = v;
});
b.broadcast("hello", 676);
sleep_for(milliseconds(100));
REQUIRE( done == 676 );
}
SECTION("one argument to two peers") {
Universe c("ftl://utu.fi");
b.connect("tcp://localhost:7077");
c.connect("tcp://localhost:7077");
while (a.numberOfPeers() < 2) sleep_for(milliseconds(20));
int done1 = 0;
b.bind("hello", [&done1](int v) {
done1 = v;
});
int done2 = 0;
c.bind("hello", [&done2](int v) {
done2 = v;
});
a.broadcast("hello", 676);
sleep_for(milliseconds(100));
REQUIRE( done1 == 676 );
REQUIRE( done2 == 676 );
}
} }
TEST_CASE("net::listen()", "[net]") { /*TEST_CASE("net::listen()", "[net]") {
SECTION("tcp any interface") { SECTION("tcp any interface") {
REQUIRE( ftl::net::listen("tcp://localhost:9001")->isListening() ); REQUIRE( ftl::net::listen("tcp://localhost:9001")->isListening() );
...@@ -229,5 +221,5 @@ TEST_CASE("Net Integration", "[integrate]") { ...@@ -229,5 +221,5 @@ TEST_CASE("Net Integration", "[integrate]") {
// TODO s2->wait(100); // TODO s2->wait(100);
REQUIRE( data == "hello world" ); REQUIRE( data == "hello world" );
} }*/
...@@ -116,6 +116,14 @@ tuple<std::string, T> readResponse(int s) { ...@@ -116,6 +116,14 @@ tuple<std::string, T> readResponse(int s) {
return std::make_tuple(get<1>(req), get<2>(req)); return std::make_tuple(get<1>(req), get<2>(req));
} }
template <typename T>
tuple<uint32_t, T> readRPC(int s) {
msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size());
tuple<uint8_t, uint32_t, std::string, T> req;
msg.get().convert(req);
return std::make_tuple(get<1>(req), get<3>(req));
}
// --- Files to test ----------------------------------------------------------- // --- Files to test -----------------------------------------------------------
#include "../src/peer.cpp" #include "../src/peer.cpp"
...@@ -171,52 +179,64 @@ TEST_CASE("Peer(int)", "[]") { ...@@ -171,52 +179,64 @@ TEST_CASE("Peer(int)", "[]") {
} }
} }
/*TEST_CASE("Peer::call()", "[rpc]") { TEST_CASE("Peer::call()", "[rpc]") {
MockPeer s; MockPeer s;
send_handshake(s);
s.mock_data();
SECTION("no argument call") { SECTION("one argument call") {
waithandler = [&]() { REQUIRE( s.isConnected() );
// Read fakedata sent
// TODO Validate data fakedata[0] = "";
// Thread to provide response to otherwise blocking call
std::thread thr([&s]() {
while (fakedata[0].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20));
// Do a fake send auto [id,value] = readRPC<tuple<int>>(0);
auto res_obj = std::make_tuple(1,0,msgpack::object(),66); auto res_obj = std::make_tuple(1,id,"__return__",get<0>(value)+22);
std::stringstream buf; std::stringstream buf;
msgpack::pack(buf, res_obj); msgpack::pack(buf, res_obj);
fakedata[0] = buf.str();
fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str());
s.mock_data(); s.mock_data();
}; });
int res = s.call<int>("test1"); int res = s.call<int>("test1", 44);
thr.join();
REQUIRE( (res == 66) ); REQUIRE( (res == 66) );
} }
SECTION("one argument call") { SECTION("no argument call") {
waithandler = [&]() { REQUIRE( s.isConnected() );
// Read fakedata sent
// TODO Validate data fakedata[0] = "";
// Thread to provide response to otherwise blocking call
std::thread thr([&s]() {
while (fakedata[0].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20));
// Do a fake send auto [id,value] = readRPC<tuple<>>(0);
auto res_obj = std::make_tuple(1,1,msgpack::object(),43); auto res_obj = std::make_tuple(1,id,"__return__",77);
std::stringstream buf; std::stringstream buf;
msgpack::pack(buf, res_obj); msgpack::pack(buf, res_obj);
fakedata[0] = buf.str();
fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str());
s.mock_data(); s.mock_data();
}; });
int res = s.call<int>("test1", 78); int res = s.call<int>("test1");
REQUIRE( (res == 43) ); thr.join();
REQUIRE( (res == 77) );
} }
}
waithandler = nullptr;
}*/
TEST_CASE("Peer::bind()", "[rpc]") { TEST_CASE("Peer::bind()", "[rpc]") {
MockPeer s; MockPeer s;
send_handshake(s);
s.mock_data();
SECTION("no argument call") { SECTION("no argument call") {
bool done = false; bool done = false;
...@@ -225,8 +245,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { ...@@ -225,8 +245,6 @@ TEST_CASE("Peer::bind()", "[rpc]") {
done = true; done = true;
}); });
send_handshake(s);
s.mock_data();
s.send("hello"); s.send("hello");
s.mock_data(); // Force it to read the fake send... s.mock_data(); // Force it to read the fake send...
...@@ -240,8 +258,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { ...@@ -240,8 +258,6 @@ TEST_CASE("Peer::bind()", "[rpc]") {
done = a; done = a;
}); });
send_handshake(s);
s.mock_data();
s.send("hello", 55); s.send("hello", 55);
s.mock_data(); // Force it to read the fake send... s.mock_data(); // Force it to read the fake send...
...@@ -254,9 +270,7 @@ TEST_CASE("Peer::bind()", "[rpc]") { ...@@ -254,9 +270,7 @@ TEST_CASE("Peer::bind()", "[rpc]") {
s.bind("hello", [&](int a, std::string b) { s.bind("hello", [&](int a, std::string b) {
done = b; done = b;
}); });
send_handshake(s);
s.mock_data();
s.send("hello", 55, "world"); s.send("hello", 55, "world");
s.mock_data(); // Force it to read the fake send... s.mock_data(); // Force it to read the fake send...
...@@ -264,26 +278,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { ...@@ -264,26 +278,6 @@ TEST_CASE("Peer::bind()", "[rpc]") {
} }
} }
/*TEST_CASE("Socket::operator>>()", "[io]") {
MockPeer s;
SECTION("stream ints") {
int i[2];
i[0] = 99;
i[1] = 101;
fake_send(0, 100, std::string((char*)&i,2*sizeof(int)));
i[0] = 0;
i[1] = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == 2*sizeof(int)) );
s >> i;
REQUIRE( (i[0] == 99) );
REQUIRE( (i[1] == 101) );
}
}*/
TEST_CASE("Socket::send()", "[io]") { TEST_CASE("Socket::send()", "[io]") {
MockPeer s; MockPeer s;
...@@ -297,133 +291,53 @@ TEST_CASE("Socket::send()", "[io]") { ...@@ -297,133 +291,53 @@ TEST_CASE("Socket::send()", "[io]") {
REQUIRE( (get<0>(value) == 607) ); REQUIRE( (get<0>(value) == 607) );
} }
/*SECTION("send a string") { SECTION("send a string") {
std::string str("hello world"); std::string str("hello world");
s.send(100,str); s.send("dummy",str);
REQUIRE( (get_service(0) == 100) ); auto [name, value] = readResponse<tuple<std::string>>(0);
REQUIRE( (get_size(0) == str.size()) );
REQUIRE( (get_value<std::string>(0) == "hello world") ); REQUIRE( (name == "dummy") );
REQUIRE( (get<0>(value) == "hello world") );
} }
SECTION("send const char* string") { SECTION("send const char* string") {
s.send(100,"hello world"); s.send("dummy","hello world");
REQUIRE( (get_service(0) == 100) ); auto [name, value] = readResponse<tuple<std::string>>(0);
REQUIRE( (get_size(0) == 11) );
REQUIRE( (get_value<std::string>(0) == "hello world") ); REQUIRE( (name == "dummy") );
REQUIRE( (get<0>(value) == "hello world") );
} }
SECTION("send const char* array") { /*SECTION("send const char* array") {
s.send(100,ftl::net::array{"hello world",10}); s.send(100,ftl::net::array{"hello world",10});
REQUIRE( (get_service(0) == 100) ); REQUIRE( (get_service(0) == 100) );
REQUIRE( (get_size(0) == 10) ); REQUIRE( (get_size(0) == 10) );
REQUIRE( (get_value<std::string>(0) == "hello worl") ); REQUIRE( (get_value<std::string>(0) == "hello worl") );
} }*/
SECTION("send a tuple") { SECTION("send a tuple") {
auto tup = std::make_tuple(55,66,true,6.7); auto tup = std::make_tuple(55,66,true,6.7);
s.send(100,tup); s.send("dummy",tup);
REQUIRE( (get_service(0) == 100) ); auto [name, value] = readResponse<tuple<decltype(tup)>>(0);
REQUIRE( (get_size(0) == sizeof(tup)) );
REQUIRE( (get_value<decltype(tup)>(0) == tup) ); REQUIRE( (name == "dummy") );
REQUIRE( (get<1>(get<0>(value)) == 66) );
} }
SECTION("send multiple strings") { SECTION("send multiple strings") {
std::string str("hello "); std::string str("hello ");
std::string str2("world"); std::string str2("world");
s.send(100,str,str2); s.send("dummy2",str,str2);
REQUIRE( (get_service(0) == 100) );
REQUIRE( (get_size(0) == str.size()+str2.size()) );
REQUIRE( (get_value<std::string>(0) == "hello world") );
}*/
}
/*TEST_CASE("Socket::read()", "[io]") {
MockSocket s;
SECTION("read an int") {
int i = 99;
fake_send(0, 100, std::string((char*)&i,4));
i = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == sizeof(int)) );
REQUIRE( (s.read(i) == sizeof(int)) );
REQUIRE( (i == 99) );
}
SECTION("read two ints") {
int i[2];
i[0] = 99;
i[1] = 101;
fake_send(0, 100, std::string((char*)&i,2*sizeof(int)));
i[0] = 0;
i[1] = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == 2*sizeof(int)) );
REQUIRE( (s.read(&i,2) == 2*sizeof(int)) );
REQUIRE( (i[0] == 99) );
REQUIRE( (i[1] == 101) );
}
SECTION("multiple reads") {
int i[2];
i[0] = 99;
i[1] = 101;
fake_send(0, 100, std::string((char*)&i,2*sizeof(int)));
i[0] = 0;
i[1] = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.read(&i[0],1) == sizeof(int)) );
REQUIRE( (i[0] == 99) );
REQUIRE( (s.read(&i[1],1) == sizeof(int)) );
REQUIRE( (i[1] == 101) );
}
SECTION("read a string") {
std::string str;
fake_send(0, 100, std::string("hello world"));
s.mock_data(); // Force a message read, but no protocol... auto [name, value] = readResponse<tuple<std::string,std::string>>(0);
REQUIRE( (s.size() == 11) ); REQUIRE( (name == "dummy2") );
REQUIRE( (s.read(str) == 11) ); REQUIRE( (get<0>(value) == "hello ") );
REQUIRE( (str == "hello world") ); REQUIRE( (get<1>(value) == "world") );
} }
}
SECTION("read into existing string") {
std::string str;
str.reserve(11);
void *ptr = str.data();
fake_send(0, 100, std::string("hello world"));
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == 11) );
REQUIRE( (s.read(str) == 11) );
REQUIRE( (str == "hello world") );
REQUIRE( (str.data() == ptr) );
}
SECTION("read too much data") {
int i = 99;
fake_send(0, 100, std::string((char*)&i,4));
i = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == sizeof(int)) );
REQUIRE( (s.read(&i,2) == sizeof(int)) );
REQUIRE( (i == 99) );
}
}*/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment