Skip to content
Snippets Groups Projects
Commit 21dee176 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Implements #3 net auto reconnect

parent 3ce5684d
Branches
Tags
No related merge requests found
Showing
with 328 additions and 241 deletions
...@@ -134,7 +134,7 @@ check_include_file_cxx("opencv2/cudastereo.hpp" HAVE_OPENCVCUDA) ...@@ -134,7 +134,7 @@ check_include_file_cxx("opencv2/cudastereo.hpp" HAVE_OPENCVCUDA)
find_program(CPPCHECK_FOUND cppcheck) find_program(CPPCHECK_FOUND cppcheck)
if (CPPCHECK_FOUND) if (CPPCHECK_FOUND)
message(STATUS "Found cppcheck: will perform source checks") message(STATUS "Found cppcheck: will perform source checks")
set(CMAKE_CXX_CPPCHECK "cppcheck" "--enable=warning,performance,portability,style" "--inline-suppr" "--std=c++11" "--suppress=*:*catch.hpp" "--suppress=*:*elas*" "--suppress=*:*json.hpp" "--quiet") set(CMAKE_CXX_CPPCHECK "cppcheck" "--enable=warning,performance,portability,style" "--inline-suppr" "--std=c++11" "--suppress=*:*catch.hpp" "--suppress=*:*elas*" "--suppress=*:*nanogui*" "--suppress=*:*json.hpp" "--quiet")
endif() endif()
# include_directories(${PROJECT_SOURCE_DIR}/common/cpp/include) # include_directories(${PROJECT_SOURCE_DIR}/common/cpp/include)
......
...@@ -59,7 +59,8 @@ int main(int argc, char **argv) { ...@@ -59,7 +59,8 @@ int main(int argc, char **argv) {
auto root = ftl::configure(argc, argv, "gui_default"); auto root = ftl::configure(argc, argv, "gui_default");
ftl::net::Universe *net = ftl::create<ftl::net::Universe>(root, "net"); ftl::net::Universe *net = ftl::create<ftl::net::Universe>(root, "net");
net->waitConnections(); net->start();
//net->waitConnections();
ftl::ctrl::Master controller(root, net); ftl::ctrl::Master controller(root, net);
controller.onLog([](const ftl::ctrl::LogEvent &e){ controller.onLog([](const ftl::ctrl::LogEvent &e){
......
...@@ -166,11 +166,16 @@ SourceWindow::SourceWindow(nanogui::Widget *parent, ftl::ctrl::Master *ctrl) ...@@ -166,11 +166,16 @@ SourceWindow::SourceWindow(nanogui::Widget *parent, ftl::ctrl::Master *ctrl)
Alignment::Middle, 0, 6)); Alignment::Middle, 0, 6));
new Label(tools, "Select source","sans-bold"); new Label(tools, "Select source","sans-bold");
auto available = ctrl->getNet()->findAll<string>("list_streams"); available_ = ctrl->getNet()->findAll<string>("list_streams");
auto select = new ComboBox(tools, available); auto select = new ComboBox(tools, available_);
select->setCallback([this,available](int ix) { select->setCallback([this,select](int ix) {
LOG(INFO) << "Change source: " << ix; LOG(INFO) << "Change source: " << ix;
src_->set("uri", available[ix]); src_->set("uri", available_[ix]);
});
ctrl->getNet()->onConnect([this,select](ftl::net::Peer *p) {
available_ = ctrl_->getNet()->findAll<string>("list_streams");
select->setItems(available_);
}); });
auto depth = new Button(tools, "Depth"); auto depth = new Button(tools, "Depth");
......
...@@ -5,6 +5,8 @@ ...@@ -5,6 +5,8 @@
#include <ftl/master.hpp> #include <ftl/master.hpp>
#include <ftl/uuid.hpp> #include <ftl/uuid.hpp>
#include <ftl/net_source.hpp> #include <ftl/net_source.hpp>
#include <vector>
#include <string>
class VirtualCameraView; class VirtualCameraView;
...@@ -23,6 +25,7 @@ class SourceWindow : public nanogui::Window { ...@@ -23,6 +25,7 @@ class SourceWindow : public nanogui::Window {
ftl::ctrl::Master *ctrl_; ftl::ctrl::Master *ctrl_;
ftl::rgbd::NetSource *src_; ftl::rgbd::NetSource *src_;
VirtualCameraView *image_; VirtualCameraView *image_;
std::vector<std::string> available_;
}; };
......
...@@ -30,7 +30,7 @@ namespace MatrixConversion ...@@ -30,7 +30,7 @@ namespace MatrixConversion
return Eigen::Matrix4f(mat.ptr()).transpose(); return Eigen::Matrix4f(mat.ptr()).transpose();
}*/ }*/
static Eigen::Vector4f VecH(const Eigen::Vector3f& v) /*static Eigen::Vector4f VecH(const Eigen::Vector3f& v)
{ {
return Eigen::Vector4f(v[0], v[1], v[2], 1.0); return Eigen::Vector4f(v[0], v[1], v[2], 1.0);
} }
...@@ -38,7 +38,7 @@ namespace MatrixConversion ...@@ -38,7 +38,7 @@ namespace MatrixConversion
static Eigen::Vector3f VecDH(const Eigen::Vector4f& v) static Eigen::Vector3f VecDH(const Eigen::Vector4f& v)
{ {
return Eigen::Vector3f(v[0]/v[3], v[1]/v[3], v[2]/v[3]); return Eigen::Vector3f(v[0]/v[3], v[1]/v[3], v[2]/v[3]);
} }*/
/*static Eigen::Vector3f VecToEig(const vec3f& v) /*static Eigen::Vector3f VecToEig(const vec3f& v)
{ {
......
...@@ -44,7 +44,7 @@ bool findChessboardCorners(cv::Mat &rgb, const cv::Mat &depth, const ftl::rgbd:: ...@@ -44,7 +44,7 @@ bool findChessboardCorners(cv::Mat &rgb, const cv::Mat &depth, const ftl::rgbd::
*/ */
class Registration : public ftl::Configurable { class Registration : public ftl::Configurable {
public: public:
Registration(nlohmann::json &config); explicit Registration(nlohmann::json &config);
void addSource(ftl::rgbd::RGBDSource* source); void addSource(ftl::rgbd::RGBDSource* source);
size_t getSourcesCount() { return sources_.size(); } size_t getSourcesCount() { return sources_.size(); }
...@@ -161,7 +161,7 @@ private: ...@@ -161,7 +161,7 @@ private:
*/ */
class ChessboardRegistration : public Registration { class ChessboardRegistration : public Registration {
public: public:
ChessboardRegistration(nlohmann::json &config); explicit ChessboardRegistration(nlohmann::json &config);
/** /**
* @brief Creates new ChessboardRegistration or ChessboardRegistrationChain * @brief Creates new ChessboardRegistration or ChessboardRegistrationChain
* object depending on chain option in config. User of the method * object depending on chain option in config. User of the method
...@@ -189,7 +189,7 @@ protected: ...@@ -189,7 +189,7 @@ protected:
*/ */
class ChessboardRegistrationChain : public ChessboardRegistration { class ChessboardRegistrationChain : public ChessboardRegistration {
public: public:
ChessboardRegistrationChain(nlohmann::json &config); explicit ChessboardRegistrationChain(nlohmann::json &config);
bool findTransformations(std::vector<Eigen::Matrix4f> &data) override; bool findTransformations(std::vector<Eigen::Matrix4f> &data) override;
......
...@@ -195,7 +195,7 @@ class SceneRep : public ftl::Configurable { ...@@ -195,7 +195,7 @@ class SceneRep : public ftl::Configurable {
std::unordered_set<unsigned int> pointersFreeHash; std::unordered_set<unsigned int> pointersFreeHash;
std::vector<unsigned int> pointersFreeVec(m_hashParams.m_numSDFBlocks, 0); std::vector<int> pointersFreeVec(m_hashParams.m_numSDFBlocks, 0); // CHECK Nick Changed to int from unsigned in
for (unsigned int i = 0; i < heapCounterCPU; i++) { for (unsigned int i = 0; i < heapCounterCPU; i++) {
pointersFreeHash.insert(heapCPU[i]); pointersFreeHash.insert(heapCPU[i]);
pointersFreeVec[heapCPU[i]] = FREE_ENTRY; pointersFreeVec[heapCPU[i]] = FREE_ENTRY;
...@@ -207,7 +207,7 @@ class SceneRep : public ftl::Configurable { ...@@ -207,7 +207,7 @@ class SceneRep : public ftl::Configurable {
unsigned int numOccupied = 0; unsigned int numOccupied = 0;
unsigned int numMinusOne = 0; unsigned int numMinusOne = 0;
unsigned int listOverallFound = 0; //unsigned int listOverallFound = 0;
std::list<myint3Voxel> l; std::list<myint3Voxel> l;
//std::vector<myint3Voxel> v; //std::vector<myint3Voxel> v;
......
...@@ -75,9 +75,7 @@ struct Cameras { ...@@ -75,9 +75,7 @@ struct Cameras {
static void run(ftl::Configurable *root) { static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net"); Universe *net = ftl::create<Universe>(root, "net");
// Make sure connections are complete net->start();
// sleep_for(milliseconds(500));
// TODO: possible to do more reliably?
net->waitConnections(); net->waitConnections();
std::vector<Cameras> inputs; std::vector<Cameras> inputs;
......
...@@ -58,5 +58,5 @@ void VirtualSource::grab() { ...@@ -58,5 +58,5 @@ void VirtualSource::grab() {
} }
bool VirtualSource::isReady() { bool VirtualSource::isReady() {
return true;
} }
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
set(CVNODESRC set(CVNODESRC
src/main.cpp src/main.cpp
src/streamer.cpp
src/middlebury.cpp src/middlebury.cpp
) )
......
...@@ -96,6 +96,7 @@ static void run(ftl::Configurable *root) { ...@@ -96,6 +96,7 @@ static void run(ftl::Configurable *root) {
stream->add(source); stream->add(source);
stream->run(); stream->run();
net->start();
LOG(INFO) << "Running..."; LOG(INFO) << "Running...";
while (ftl::running && display->active()) { while (ftl::running && display->active()) {
......
...@@ -68,12 +68,25 @@ Configurable *find(const std::string &uri); ...@@ -68,12 +68,25 @@ Configurable *find(const std::string &uri);
*/ */
void registerConfigurable(Configurable *cfg); void registerConfigurable(Configurable *cfg);
/**
* Create a new configurable directly from a raw object. This should not be used.
*/
template <typename T, typename... ARGS> template <typename T, typename... ARGS>
T *create(json_t &link, ARGS ...args); T *create(json_t &link, ARGS ...args);
/**
* Create a configurable from an attribute of a parent configurable.
*/
template <typename T, typename... ARGS> template <typename T, typename... ARGS>
T *create(ftl::Configurable *parent, const std::string &name, ARGS ...args); T *create(ftl::Configurable *parent, const std::string &name, ARGS ...args);
/**
* Create a configurable rooted on a parent but with a specific object
* that is not directly a child of the parent. Used by RGB-D Factory.
*/
template <typename T, typename... ARGS>
T *create(ftl::Configurable *parent, json_t &obj, const std::string &name, ARGS ...args);
void set(const std::string &uri, const nlohmann::json &); void set(const std::string &uri, const nlohmann::json &);
} // namespace config } // namespace config
...@@ -138,12 +151,38 @@ T *ftl::config::create(ftl::Configurable *parent, const std::string &name, ARGS ...@@ -138,12 +151,38 @@ T *ftl::config::create(ftl::Configurable *parent, const std::string &name, ARGS
id_str = id_str + std::string("#") + name; id_str = id_str + std::string("#") + name;
} }
parent->getConfig()[name] = { parent->getConfig()[name] = {
// cppcheck-suppress constStatement
{"$id", id_str} {"$id", id_str}
}; };
nlohmann::json &entity2 = parent->getConfig()[name]; nlohmann::json &entity2 = parent->getConfig()[name];
return create<T>(entity2, args...); return create<T>(entity2, args...);
} }
LOG(ERROR) << "Unable to create Configurable entity '" << name << "'";
return nullptr;
}
template <typename T, typename... ARGS>
T *ftl::config::create(ftl::Configurable *parent, json_t &obj, const std::string &name, ARGS ...args) {
//nlohmann::json &entity = ftl::config::resolve(parent->getConfig()[name]);
nlohmann::json &entity = obj;
if (entity.is_object()) {
if (!entity["$id"].is_string()) {
std::string id_str = *parent->get<std::string>("$id");
if (id_str.find('#') != std::string::npos) {
entity["$id"] = id_str + std::string("/") + name;
} else {
entity["$id"] = id_str + std::string("#") + name;
}
}
return create<T>(entity, args...);
} else if (entity.is_null()) {
LOG(FATAL) << "Invalid raw object in Configurable construction";
return nullptr;
}
} }
#endif // _FTL_COMMON_CONFIGURATION_HPP_ #endif // _FTL_COMMON_CONFIGURATION_HPP_
......
...@@ -24,6 +24,7 @@ pcl::PointCloud<pcl::PointXYZRGB>::Ptr ftl::utility::matToPointXYZ(const cv::Mat ...@@ -24,6 +24,7 @@ pcl::PointCloud<pcl::PointXYZRGB>::Ptr ftl::utility::matToPointXYZ(const cv::Mat
// when color needs to be added: // when color needs to be added:
uint32_t rgb = (static_cast<uint32_t>(prgb.z) << 16 | static_cast<uint32_t>(prgb.y) << 8 | static_cast<uint32_t>(prgb.x)); uint32_t rgb = (static_cast<uint32_t>(prgb.z) << 16 | static_cast<uint32_t>(prgb.y) << 8 | static_cast<uint32_t>(prgb.x));
// cppcheck-suppress invalidPointerCast
point.rgb = *reinterpret_cast<float*>(&rgb); point.rgb = *reinterpret_cast<float*>(&rgb);
point_cloud_ptr -> points.push_back(point); point_cloud_ptr -> points.push_back(point);
......
...@@ -8,6 +8,7 @@ using std::string; ...@@ -8,6 +8,7 @@ using std::string;
SCENARIO( "Configurable::get()" ) { SCENARIO( "Configurable::get()" ) {
GIVEN( "a non-existent property" ) { GIVEN( "a non-existent property" ) {
// cppcheck-suppress constStatement
nlohmann::json json = {{"test",5}}; nlohmann::json json = {{"test",5}};
Configurable cfg(json); Configurable cfg(json);
...@@ -16,6 +17,7 @@ SCENARIO( "Configurable::get()" ) { ...@@ -16,6 +17,7 @@ SCENARIO( "Configurable::get()" ) {
} }
GIVEN( "a valid property" ) { GIVEN( "a valid property" ) {
// cppcheck-suppress constStatement
nlohmann::json json = {{"test",5}}; nlohmann::json json = {{"test",5}};
Configurable cfg(json); Configurable cfg(json);
...@@ -27,6 +29,7 @@ SCENARIO( "Configurable::get()" ) { ...@@ -27,6 +29,7 @@ SCENARIO( "Configurable::get()" ) {
SCENARIO( "Configurable::on()" ) { SCENARIO( "Configurable::on()" ) {
GIVEN( "a changed property with no handlers" ) { GIVEN( "a changed property with no handlers" ) {
// cppcheck-suppress constStatement
nlohmann::json json = {{"test",5}}; nlohmann::json json = {{"test",5}};
Configurable cfg(json); Configurable cfg(json);
...@@ -36,6 +39,7 @@ SCENARIO( "Configurable::on()" ) { ...@@ -36,6 +39,7 @@ SCENARIO( "Configurable::on()" ) {
} }
GIVEN( "a changed property one handler" ) { GIVEN( "a changed property one handler" ) {
// cppcheck-suppress constStatement
nlohmann::json json = {{"test",5}}; nlohmann::json json = {{"test",5}};
Configurable cfg(json); Configurable cfg(json);
bool trig = false; bool trig = false;
...@@ -50,6 +54,7 @@ SCENARIO( "Configurable::on()" ) { ...@@ -50,6 +54,7 @@ SCENARIO( "Configurable::on()" ) {
} }
GIVEN( "a changed property two handlers" ) { GIVEN( "a changed property two handlers" ) {
// cppcheck-suppress constStatement
nlohmann::json json = {{"test",5}}; nlohmann::json json = {{"test",5}};
Configurable cfg(json); Configurable cfg(json);
bool trig1 = false; bool trig1 = false;
......
...@@ -68,8 +68,8 @@ class Peer { ...@@ -68,8 +68,8 @@ class Peer {
}; };
public: public:
explicit Peer(const char *uri, ftl::net::Dispatcher *d=nullptr); explicit Peer(const char *uri, ftl::net::Universe *, ftl::net::Dispatcher *d=nullptr);
explicit Peer(SOCKET s, ftl::net::Dispatcher *d=nullptr); explicit Peer(SOCKET s, ftl::net::Universe *, ftl::net::Dispatcher *d=nullptr);
~Peer(); ~Peer();
/** /**
...@@ -94,6 +94,11 @@ class Peer { ...@@ -94,6 +94,11 @@ class Peer {
*/ */
bool waitConnection(); bool waitConnection();
/**
* Make a reconnect attempt. Called internally by Universe object.
*/
bool reconnect();
/** /**
* Test if the connection is valid. This returns true in all conditions * Test if the connection is valid. This returns true in all conditions
* except where the socket has been disconnected permenantly or was never * except where the socket has been disconnected permenantly or was never
...@@ -176,8 +181,8 @@ class Peer { ...@@ -176,8 +181,8 @@ class Peer {
void bind(const std::string &name, F func); void bind(const std::string &name, F func);
// void onError(std::function<void(Peer &, int err, const char *msg)> &f) {} // void onError(std::function<void(Peer &, int err, const char *msg)> &f) {}
void onConnect(const std::function<void(Peer &)> &f); //void onConnect(const std::function<void(Peer &)> &f);
void onDisconnect(std::function<void(Peer &)> &f) {} //void onDisconnect(std::function<void(Peer &)> &f) {}
bool isWaiting() const { return is_waiting_; } bool isWaiting() const { return is_waiting_; }
...@@ -226,11 +231,13 @@ class Peer { ...@@ -226,11 +231,13 @@ class Peer {
} }
} }
private: // Data private:
Status status_; Status status_; // Connected, errored, reconnecting...
SOCKET sock_; SOCKET sock_; // Raw OS socket
ftl::URI::scheme_t scheme_; ftl::URI::scheme_t scheme_; // TCP / WS / UDP...
uint32_t version_; uint32_t version_; // Received protocol version in handshake
bool can_reconnect_; // Client connections can retry
ftl::net::Universe *universe_; // Origin net universe
// Receive buffers // Receive buffers
bool is_waiting_; bool is_waiting_;
...@@ -241,16 +248,16 @@ class Peer { ...@@ -241,16 +248,16 @@ class Peer {
msgpack::vrefbuffer send_buf_; msgpack::vrefbuffer send_buf_;
std::mutex send_mtx_; std::mutex send_mtx_;
std::string uri_; std::string uri_; // Original connection URI, or assumed URI
ftl::UUID peerid_; ftl::UUID peerid_; // Received in handshake or allocated
ftl::net::Dispatcher *disp_; ftl::net::Dispatcher *disp_; // For RPC call dispatch
std::vector<std::function<void(Peer &)>> open_handlers_; //std::vector<std::function<void(Peer &)>> open_handlers_;
//std::vector<std::function<void(const ftl::net::Error &)>> error_handlers_ //std::vector<std::function<void(const ftl::net::Error &)>> error_handlers_
std::vector<std::function<void(Peer &)>> close_handlers_; //std::vector<std::function<void(Peer &)>> close_handlers_;
std::map<int, std::unique_ptr<virtual_caller>> callbacks_; std::map<int, std::unique_ptr<virtual_caller>> callbacks_;
static int rpcid__; static int rpcid__; // Return ID for RPC calls
}; };
// --- Inline Template Implementations ----------------------------------------- // --- Inline Template Implementations -----------------------------------------
......
...@@ -12,9 +12,10 @@ ...@@ -12,9 +12,10 @@
#include <ftl/uuid.hpp> #include <ftl/uuid.hpp>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <vector> #include <vector>
#include <list>
#include <string> #include <string>
#include <thread> #include <thread>
#include <mutex> #include <shared_mutex>
#include <map> #include <map>
namespace ftl { namespace ftl {
...@@ -24,6 +25,14 @@ struct Error { ...@@ -24,6 +25,14 @@ struct Error {
int errno; int errno;
}; };
struct ReconnectInfo {
int tries;
float delay;
Peer *peer;
};
typedef unsigned int callback_t;
/** /**
* Represents a group of network peers and their resources, managing the * Represents a group of network peers and their resources, managing the
* searching of and sharing of resources across peers. Each universe can * searching of and sharing of resources across peers. Each universe can
...@@ -34,6 +43,9 @@ struct Error { ...@@ -34,6 +43,9 @@ struct Error {
* their actions. * their actions.
*/ */
class Universe : public ftl::Configurable { class Universe : public ftl::Configurable {
public:
friend class Peer;
public: public:
Universe(); Universe();
/** /**
...@@ -49,6 +61,8 @@ class Universe : public ftl::Configurable { ...@@ -49,6 +61,8 @@ class Universe : public ftl::Configurable {
*/ */
~Universe(); ~Universe();
void start();
/** /**
* Open a new listening port on a given interfaces. * Open a new listening port on a given interfaces.
* eg. "tcp://localhost:9000" * eg. "tcp://localhost:9000"
...@@ -82,10 +96,10 @@ class Universe : public ftl::Configurable { ...@@ -82,10 +96,10 @@ class Universe : public ftl::Configurable {
Peer *getPeer(const ftl::UUID &pid) const; Peer *getPeer(const ftl::UUID &pid) const;
int numberOfSubscribers(const std::string &res) const; //int numberOfSubscribers(const std::string &res) const;
bool hasSubscribers(const std::string &res) const; //bool hasSubscribers(const std::string &res) const;
bool hasSubscribers(const ftl::URI &res) const; //bool hasSubscribers(const ftl::URI &res) const;
/** /**
* Bind a function to an RPC or service call name. This will implicitely * Bind a function to an RPC or service call name. This will implicitely
...@@ -101,16 +115,18 @@ class Universe : public ftl::Configurable { ...@@ -101,16 +115,18 @@ class Universe : public ftl::Configurable {
* triggered whenever that resource is published to. It is akin to * triggered whenever that resource is published to. It is akin to
* RPC broadcast (no return value) to a subgroup of peers. * RPC broadcast (no return value) to a subgroup of peers.
*/ */
template <typename F> //template <typename F>
bool subscribe(const std::string &res, F func); //[[deprecated("Pub sub no longer to be used")]]
//bool subscribe(const std::string &res, F func);
/** /**
* Subscribe a function to a resource. The subscribed function is * Subscribe a function to a resource. The subscribed function is
* triggered whenever that resource is published to. It is akin to * triggered whenever that resource is published to. It is akin to
* RPC broadcast (no return value) to a subgroup of peers. * RPC broadcast (no return value) to a subgroup of peers.
*/ */
template <typename F> //template <typename F>
bool subscribe(const ftl::URI &res, F func); //[[deprecated("Pub sub no longer to be used")]]
//bool subscribe(const ftl::URI &res, F func);
/** /**
* Send a non-blocking RPC call with no return value to all connected * Send a non-blocking RPC call with no return value to all connected
...@@ -136,75 +152,78 @@ class Universe : public ftl::Configurable { ...@@ -136,75 +152,78 @@ class Universe : public ftl::Configurable {
* of a resource. There may be no subscribers. Note that query parameter * of a resource. There may be no subscribers. Note that query parameter
* order in the URI string is not important. * order in the URI string is not important.
*/ */
//template <typename... ARGS>
//[[deprecated("Pub sub no longer to be used")]] //[[deprecated("Pub sub no longer to be used")]]
template <typename... ARGS> //void publish(const std::string &res, ARGS... args);
void publish(const std::string &res, ARGS... args);
/** /**
* Send a non-blocking RPC call with no return value to all subscribers * Send a non-blocking RPC call with no return value to all subscribers
* of a resource. There may be no subscribers. This overload accepts a * of a resource. There may be no subscribers. This overload accepts a
* URI object directly to enable more efficient modification of parameters. * URI object directly to enable more efficient modification of parameters.
*/ */
//template <typename... ARGS>
//[[deprecated("Pub sub no longer to be used")]] //[[deprecated("Pub sub no longer to be used")]]
template <typename... ARGS> //void publish(const ftl::URI &res, ARGS... args);
void publish(const ftl::URI &res, ARGS... args);
/** /**
* Register your ownership of a new resource. This must be called before * Register your ownership of a new resource. This must be called before
* publishing to this resource and before any peers attempt to subscribe. * publishing to this resource and before any peers attempt to subscribe.
*/ */
//[[deprecated("Pub sub no longer to be used")]] //[[deprecated("Pub sub no longer to be used")]]
bool createResource(const std::string &uri); //bool createResource(const std::string &uri);
//[[deprecated("Pub sub no longer to be used")]] //[[deprecated("Pub sub no longer to be used")]]
std::optional<ftl::UUID> findOwner(const std::string &res); //std::optional<ftl::UUID> findOwner(const std::string &res);
void setLocalID(const ftl::UUID &u) { this_peer = u; }; void setLocalID(const ftl::UUID &u) { this_peer = u; };
const ftl::UUID &id() const { return this_peer; } const ftl::UUID &id() const { return this_peer; }
// --- Event Handlers ------------------------------------------------------ // --- Event Handlers ------------------------------------------------------
void onConnect(const std::string &, std::function<void(ftl::net::Peer*)>); ftl::net::callback_t onConnect(const std::function<void(ftl::net::Peer*)>&);
void onDisconnect(const std::string &, std::function<void(ftl::net::Peer*)>); ftl::net::callback_t onDisconnect(const std::function<void(ftl::net::Peer*)>&);
void onError(const std::string &, std::function<void(ftl::net::Peer*, const ftl::net::Error &)>); ftl::net::callback_t onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)>&);
void removeCallbacks(const std::string &); void removeCallback(ftl::net::callback_t cbid);
private: private:
void _run(); void _run();
int _setDescriptors(); int _setDescriptors();
void _installBindings(); void _installBindings();
void _installBindings(Peer *); void _installBindings(Peer *);
bool _subscribe(const std::string &res); //bool _subscribe(const std::string &res);
void _cleanupPeers(); void _cleanupPeers();
void _notifyConnect(Peer *); void _notifyConnect(Peer *);
void _notifyDisconnect(Peer *); void _notifyDisconnect(Peer *);
void _notifyError(Peer *, const ftl::net::Error &); void _notifyError(Peer *, const ftl::net::Error &);
void _periodic();
static void __start(Universe *u); static void __start(Universe *u);
private: private:
bool active_; bool active_;
ftl::UUID this_peer; ftl::UUID this_peer;
std::mutex net_mutex_; std::shared_mutex net_mutex_;
std::shared_mutex handler_mutex_;
fd_set sfderror_; fd_set sfderror_;
fd_set sfdread_; fd_set sfdread_;
std::vector<ftl::net::Listener*> listeners_; std::vector<ftl::net::Listener*> listeners_;
std::vector<ftl::net::Peer*> peers_; std::vector<ftl::net::Peer*> peers_;
std::map<std::string, std::vector<ftl::UUID>> subscribers_; //std::map<std::string, std::vector<ftl::UUID>> subscribers_;
std::unordered_set<std::string> owned_; //std::unordered_set<std::string> owned_;
std::map<ftl::UUID, ftl::net::Peer*> peer_ids_; std::map<ftl::UUID, ftl::net::Peer*> peer_ids_;
ftl::UUID id_; ftl::UUID id_;
ftl::net::Dispatcher disp_; ftl::net::Dispatcher disp_;
std::thread thread_; std::thread thread_;
std::list<ReconnectInfo> reconnects_;
struct ConnHandler { struct ConnHandler {
std::string name; callback_t id;
std::function<void(ftl::net::Peer*)> h; std::function<void(ftl::net::Peer*)> h;
}; };
struct ErrHandler { struct ErrHandler {
std::string name; callback_t id;
std::function<void(ftl::net::Peer*, const ftl::net::Error &)> h; std::function<void(ftl::net::Peer*, const ftl::net::Error &)> h;
}; };
...@@ -213,6 +232,8 @@ class Universe : public ftl::Configurable { ...@@ -213,6 +232,8 @@ class Universe : public ftl::Configurable {
std::list<ConnHandler> on_disconnect_; std::list<ConnHandler> on_disconnect_;
std::list<ErrHandler> on_error_; std::list<ErrHandler> on_error_;
static callback_t cbid__;
// std::map<std::string, std::vector<ftl::net::Peer*>> subscriptions_; // std::map<std::string, std::vector<ftl::net::Peer*>> subscriptions_;
}; };
...@@ -220,13 +241,13 @@ class Universe : public ftl::Configurable { ...@@ -220,13 +241,13 @@ class Universe : public ftl::Configurable {
template <typename F> template <typename F>
void Universe::bind(const std::string &name, F func) { void Universe::bind(const std::string &name, F func) {
// CHECK Need mutex? std::unique_lock<std::shared_mutex> lk(net_mutex_);
disp_.bind(name, func, disp_.bind(name, func,
typename ftl::internal::func_kind_info<F>::result_kind(), typename ftl::internal::func_kind_info<F>::result_kind(),
typename ftl::internal::func_kind_info<F>::args_kind()); typename ftl::internal::func_kind_info<F>::args_kind());
} }
template <typename F> /*template <typename F>
bool Universe::subscribe(const std::string &res, F func) { bool Universe::subscribe(const std::string &res, F func) {
return subscribe(ftl::URI(res), func); return subscribe(ftl::URI(res), func);
} }
...@@ -235,12 +256,13 @@ template <typename F> ...@@ -235,12 +256,13 @@ template <typename F>
bool Universe::subscribe(const ftl::URI &res, F func) { bool Universe::subscribe(const ftl::URI &res, F func) {
bind(res.to_string(), func); bind(res.to_string(), func);
return _subscribe(res.to_string()); return _subscribe(res.to_string());
} }*/
template <typename... ARGS> template <typename... ARGS>
void Universe::broadcast(const std::string &name, ARGS... args) { void Universe::broadcast(const std::string &name, ARGS... args) {
std::shared_lock<std::shared_mutex> lk(net_mutex_);
for (auto p : peers_) { for (auto p : peers_) {
p->send(name, args...); if (p->isConnected()) p->send(name, args...);
} }
} }
...@@ -262,15 +284,18 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { ...@@ -262,15 +284,18 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
}; };
std::map<Peer*, int> record; std::map<Peer*, int> record;
std::shared_lock<std::shared_mutex> lk(net_mutex_);
for (auto p : peers_) { for (auto p : peers_) {
record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); if (p->isConnected()) record[p] = p->asyncCall<std::optional<R>>(name, handler, args...);
} }
lk.unlock();
{ // Block thread until async callback notifies us { // Block thread until async callback notifies us
std::unique_lock<std::mutex> lk(m); std::unique_lock<std::mutex> llk(m);
cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); cv.wait_for(llk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;});
// Cancel any further results // Cancel any further results
lk.lock();
for (auto p : peers_) { for (auto p : peers_) {
auto m = record.find(p); auto m = record.find(p);
if (m != record.end()) { if (m != record.end()) {
...@@ -300,16 +325,20 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { ...@@ -300,16 +325,20 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
}; };
std::map<Peer*, int> record; std::map<Peer*, int> record;
std::shared_lock<std::shared_mutex> lk(net_mutex_);
for (auto p : peers_) { for (auto p : peers_) {
if (!p->isConnected()) continue;
sentcount++; sentcount++;
record[p] = p->asyncCall<std::vector<R>>(name, handler, args...); record[p] = p->asyncCall<std::vector<R>>(name, handler, args...);
} }
lk.unlock();
{ // Block thread until async callback notifies us { // Block thread until async callback notifies us
std::unique_lock<std::mutex> lk(m); std::unique_lock<std::mutex> llk(m);
cv.wait_for(lk, std::chrono::seconds(1), [&returncount,&sentcount]{return returncount == sentcount;}); cv.wait_for(llk, std::chrono::seconds(1), [&returncount,&sentcount]{return returncount == sentcount;});
// Cancel any further results // Cancel any further results
lk.lock();
for (auto p : peers_) { for (auto p : peers_) {
auto m = record.find(p); auto m = record.find(p);
if (m != record.end()) { if (m != record.end()) {
...@@ -324,7 +353,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { ...@@ -324,7 +353,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
template <typename R, typename... ARGS> template <typename R, typename... ARGS>
R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
Peer *p = getPeer(pid); Peer *p = getPeer(pid);
if (p == nullptr) { if (p == nullptr || !p->isConnected()) {
DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string();
throw -1; throw -1;
} }
...@@ -336,12 +365,12 @@ bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) ...@@ -336,12 +365,12 @@ bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args)
Peer *p = getPeer(pid); Peer *p = getPeer(pid);
if (p == nullptr) { if (p == nullptr) {
DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string();
throw -1; return false;
} }
return p->send(name, args...) > 0; return p->isConnected() && p->send(name, args...) > 0;
} }
template <typename... ARGS> /*template <typename... ARGS>
void Universe::publish(const std::string &res, ARGS... args) { void Universe::publish(const std::string &res, ARGS... args) {
ftl::URI uri(res); ftl::URI uri(res);
publish(uri, args...); publish(uri, args...);
...@@ -349,7 +378,7 @@ void Universe::publish(const std::string &res, ARGS... args) { ...@@ -349,7 +378,7 @@ void Universe::publish(const std::string &res, ARGS... args) {
template <typename... ARGS> template <typename... ARGS>
void Universe::publish(const ftl::URI &res, ARGS... args) { void Universe::publish(const ftl::URI &res, ARGS... args) {
std::unique_lock<std::mutex> lk(net_mutex_); std::unique_lock<std::shared_mutex> lk(net_mutex_);
auto subs = subscribers_[res.getBaseURI()]; auto subs = subscribers_[res.getBaseURI()];
lk.unlock(); lk.unlock();
for (auto p : subs) { for (auto p : subs) {
...@@ -358,7 +387,7 @@ void Universe::publish(const ftl::URI &res, ARGS... args) { ...@@ -358,7 +387,7 @@ void Universe::publish(const ftl::URI &res, ARGS... args) {
peer->send(res.getBaseURI(), args...); peer->send(res.getBaseURI(), args...);
} }
} }
} }*/
}; // namespace net }; // namespace net
}; // namespace ftl }; // namespace ftl
......
...@@ -140,7 +140,7 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const ...@@ -140,7 +140,7 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const
throw &e; throw &e;
} }
} else { } else {
LOG(ERROR) << "Missing handler for incoming message"; LOG(ERROR) << "Missing handler for incoming message (" << name << ")";
} }
} }
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <ftl/net/ws_internal.hpp> #include <ftl/net/ws_internal.hpp>
#include <ftl/config.h> #include <ftl/config.h>
#include "net_internal.hpp" #include "net_internal.hpp"
#include <ftl/net/universe.hpp>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
...@@ -37,6 +38,10 @@ using ftl::URI; ...@@ -37,6 +38,10 @@ using ftl::URI;
using ftl::net::ws_connect; using ftl::net::ws_connect;
using ftl::net::Dispatcher; using ftl::net::Dispatcher;
using std::chrono::seconds; using std::chrono::seconds;
using ftl::net::Universe;
using ftl::net::callback_t;
using std::mutex;
using std::unique_lock;
/*static std::string hexStr(const std::string &s) /*static std::string hexStr(const std::string &s)
{ {
...@@ -127,7 +132,7 @@ static SOCKET tcpConnect(URI &uri) { ...@@ -127,7 +132,7 @@ static SOCKET tcpConnect(URI &uri) {
return csocket; return csocket;
} }
Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(false), universe_(u) {
status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting; status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting;
_updateURI(); _updateURI();
...@@ -149,7 +154,10 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { ...@@ -149,7 +154,10 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) {
peerid_ = pid; peerid_ = pid;
if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!"; if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!";
_trigger(open_handlers_); // Ensure handlers called later or in new thread
pool.push([this](int id) {
universe_->_notifyConnect(this);
});
} }
}); });
...@@ -158,11 +166,15 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { ...@@ -158,11 +166,15 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) {
LOG(INFO) << "Peer elected to disconnect: " << id().to_string(); LOG(INFO) << "Peer elected to disconnect: " << id().to_string();
}); });
bind("__ping__", [this](unsigned long long timestamp) {
return timestamp;
});
send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
} }
} }
Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), universe_(u), uri_(pUri) {
URI uri(pUri); URI uri(pUri);
status_ = kInvalid; status_ = kInvalid;
...@@ -170,6 +182,9 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { ...@@ -170,6 +182,9 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
disp_ = new Dispatcher(d); disp_ = new Dispatcher(d);
// Must to to prevent receiving message before handlers are installed
unique_lock<mutex> lk(recv_mtx_);
scheme_ = uri.getProtocol(); scheme_ = uri.getProtocol();
if (uri.getProtocol() == URI::SCHEME_TCP) { if (uri.getProtocol() == URI::SCHEME_TCP) {
sock_ = tcpConnect(uri); sock_ = tcpConnect(uri);
...@@ -194,7 +209,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { ...@@ -194,7 +209,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
is_waiting_ = true; is_waiting_ = true;
if (status_ == kConnecting) { if (status_ == kConnecting || status_ == kReconnecting) {
// Install return handshake handler. // Install return handshake handler.
bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) {
LOG(INFO) << "Handshake 1 received"; LOG(INFO) << "Handshake 1 received";
...@@ -208,12 +223,46 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { ...@@ -208,12 +223,46 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!"; if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!";
send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
_trigger(open_handlers_); // Ensure handlers called later or in new thread
pool.push([this](int id) {
universe_->_notifyConnect(this);
});
} }
}); });
bind("__disconnect__", [this]() {
_badClose(false);
LOG(INFO) << "Peer elected to disconnect: " << id().to_string();
});
bind("__ping__", [this](unsigned long long timestamp) {
return timestamp;
});
} }
} }
bool Peer::reconnect() {
if (status_ != kReconnecting || !can_reconnect_) return false;
URI uri(uri_);
LOG(INFO) << "Reconnecting to " << uri_ << " ...";
if (scheme_ == URI::SCHEME_TCP) {
sock_ = tcpConnect(uri);
if (sock_ != INVALID_SOCKET) {
status_ = kConnecting;
is_waiting_ = true;
return true;
} else {
return false;
}
}
// TODO(Nick) allow for other protocols in reconnect
return false;
}
void Peer::_updateURI() { void Peer::_updateURI() {
sockaddr_storage addr; sockaddr_storage addr;
int rsize = sizeof(sockaddr_storage); int rsize = sizeof(sockaddr_storage);
...@@ -265,10 +314,12 @@ void Peer::_badClose(bool retry) { ...@@ -265,10 +314,12 @@ void Peer::_badClose(bool retry) {
//auto i = find(sockets.begin(),sockets.end(),this); //auto i = find(sockets.begin(),sockets.end(),this);
//sockets.erase(i); //sockets.erase(i);
_trigger(close_handlers_); universe_->_notifyDisconnect(this);
// Attempt auto reconnect? // Attempt auto reconnect?
if (retry) LOG(INFO) << "Should attempt reconnect..."; if (retry && can_reconnect_) {
status_ = kReconnecting;
}
} }
} }
...@@ -324,7 +375,7 @@ bool Peer::_data() { ...@@ -324,7 +375,7 @@ bool Peer::_data() {
if (get<1>(hs) != "__handshake__") { if (get<1>(hs) != "__handshake__") {
_badClose(false); _badClose(false);
LOG(ERROR) << "Missing handshake"; LOG(ERROR) << "Missing handshake - got '" << get<1>(hs) << "'";
return false; return false;
} }
} catch(...) { } catch(...) {
...@@ -368,24 +419,28 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { ...@@ -368,24 +419,28 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
bool Peer::waitConnection() { bool Peer::waitConnection() {
if (status_ == kConnected) return true; if (status_ == kConnected) return true;
std::unique_lock<std::mutex> lk(send_mtx_); std::mutex m;
std::unique_lock<std::mutex> lk(m);
std::condition_variable cv; std::condition_variable cv;
onConnect([&](Peer &p) { callback_t h = universe_->onConnect([this,&cv](Peer *p) {
cv.notify_all(); if (p == this) {
cv.notify_one();
}
}); });
cv.wait_for(lk, seconds(5)); cv.wait_for(lk, seconds(5));
universe_->removeCallback(h);
return status_ == kConnected; return status_ == kConnected;
} }
void Peer::onConnect(const std::function<void(Peer&)> &f) { /*void Peer::onConnect(const std::function<void(Peer&)> &f) {
if (status_ == kConnected) { if (status_ == kConnected) {
f(*this); f(*this);
} else { } else {
open_handlers_.push_back(f); open_handlers_.push_back(f);
} }
} }*/
void Peer::_connected() { void Peer::_connected() {
status_ = kConnected; status_ = kConnected;
......
...@@ -20,8 +20,13 @@ using nlohmann::json; ...@@ -20,8 +20,13 @@ using nlohmann::json;
using ftl::UUID; using ftl::UUID;
using std::optional; using std::optional;
using std::unique_lock; using std::unique_lock;
using std::shared_lock;
using std::mutex; using std::mutex;
using std::shared_mutex;
using ftl::config::json_t; using ftl::config::json_t;
using ftl::net::callback_t;
callback_t ftl::net::Universe::cbid__ = 0;
Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this) { Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this) {
_installBindings(); _installBindings();
...@@ -30,6 +35,14 @@ Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_p ...@@ -30,6 +35,14 @@ Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_p
Universe::Universe(nlohmann::json &config) : Universe::Universe(nlohmann::json &config) :
Configurable(config), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this) { Configurable(config), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this) {
_installBindings();
}
Universe::~Universe() {
shutdown();
}
void Universe::start() {
auto l = get<json_t>("listen"); auto l = get<json_t>("listen");
if (l && (*l).is_array()) { if (l && (*l).is_array()) {
...@@ -46,12 +59,6 @@ Universe::Universe(nlohmann::json &config) : ...@@ -46,12 +59,6 @@ Universe::Universe(nlohmann::json &config) :
connect(pp); connect(pp);
} }
} }
_installBindings();
}
Universe::~Universe() {
shutdown();
} }
void Universe::shutdown() { void Universe::shutdown() {
...@@ -77,32 +84,26 @@ void Universe::shutdown() { ...@@ -77,32 +84,26 @@ void Universe::shutdown() {
bool Universe::listen(const string &addr) { bool Universe::listen(const string &addr) {
auto l = new Listener(addr.c_str()); auto l = new Listener(addr.c_str());
if (!l) return false; if (!l) return false;
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(net_mutex_);
listeners_.push_back(l); listeners_.push_back(l);
return l->isListening(); return l->isListening();
} }
Peer *Universe::connect(const string &addr) { Peer *Universe::connect(const string &addr) {
auto p = new Peer(addr.c_str(), &disp_); auto p = new Peer(addr.c_str(), this, &disp_);
if (!p) return nullptr; if (!p) return nullptr;
if (p->status() != Peer::kInvalid) { if (p->status() != Peer::kInvalid) {
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(net_mutex_);
peers_.push_back(p); peers_.push_back(p);
} }
_installBindings(p); _installBindings(p);
p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p;
_notifyConnect(&p);
});
return p; return p;
} }
void Universe::unbind(const std::string &name) { void Universe::unbind(const std::string &name) {
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(net_mutex_);
disp_.unbind(name); disp_.unbind(name);
} }
...@@ -121,7 +122,7 @@ int Universe::_setDescriptors() { ...@@ -121,7 +122,7 @@ int Universe::_setDescriptors() {
SOCKET n = 0; SOCKET n = 0;
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(net_mutex_);
//Set file descriptor for the listening sockets. //Set file descriptor for the listening sockets.
for (auto l : listeners_) { for (auto l : listeners_) {
...@@ -156,18 +157,17 @@ void Universe::_installBindings(Peer *p) { ...@@ -156,18 +157,17 @@ void Universe::_installBindings(Peer *p) {
} }
void Universe::_installBindings() { void Universe::_installBindings() {
bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool { /*bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
LOG(INFO) << "Subscription to " << uri << " by " << id.to_string(); LOG(INFO) << "Subscription to " << uri << " by " << id.to_string();
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(net_mutex_);
subscribers_[ftl::URI(uri).to_string()].push_back(id); subscribers_[ftl::URI(uri).to_string()].push_back(id);
return true; return true;
}); });
bind("__owner__", [this](const std::string &res) -> optional<UUID> { bind("__owner__", [this](const std::string &res) -> optional<UUID> {
LOG(INFO) << "SOMEONE ASKS FOR " << res;
if (owned_.count(res) > 0) return this_peer; if (owned_.count(res) > 0) return this_peer;
else return {}; else return {};
}); });*/
} }
// Note: should be called inside a net lock // Note: should be called inside a net lock
...@@ -178,13 +178,18 @@ void Universe::_cleanupPeers() { ...@@ -178,13 +178,18 @@ void Universe::_cleanupPeers() {
if (!(*i)->isValid()) { if (!(*i)->isValid()) {
Peer *p = *i; Peer *p = *i;
LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); LOG(INFO) << "Removing disconnected peer: " << p->id().to_string();
_notifyDisconnect(p); //_notifyDisconnect(p);
auto ix = peer_ids_.find(p->id()); auto ix = peer_ids_.find(p->id());
if (ix != peer_ids_.end()) peer_ids_.erase(ix); if (ix != peer_ids_.end()) peer_ids_.erase(ix);
delete p;
i = peers_.erase(i); i = peers_.erase(i);
if (p->status() == ftl::net::Peer::kReconnecting) {
reconnects_.push_back({50, 1.0f, p});
} else {
delete p;
}
} else { } else {
i++; i++;
} }
...@@ -197,7 +202,7 @@ Peer *Universe::getPeer(const UUID &id) const { ...@@ -197,7 +202,7 @@ Peer *Universe::getPeer(const UUID &id) const {
else return ix->second; else return ix->second;
} }
optional<UUID> Universe::findOwner(const string &res) { /*optional<UUID> Universe::findOwner(const string &res) {
// TODO(nick) cache this information // TODO(nick) cache this information
return findOne<UUID>("__owner__", res); return findOne<UUID>("__owner__", res);
} }
...@@ -238,6 +243,24 @@ bool Universe::_subscribe(const std::string &res) { ...@@ -238,6 +243,24 @@ bool Universe::_subscribe(const std::string &res) {
LOG(WARNING) << "Subscribe to unknown resource: " << res; LOG(WARNING) << "Subscribe to unknown resource: " << res;
return false; return false;
} }
}*/
void Universe::_periodic() {
auto i = reconnects_.begin();
while (i != reconnects_.end()) {
if ((*i).peer->reconnect()) {
unique_lock<shared_mutex> lk(net_mutex_);
peers_.push_back((*i).peer);
i = reconnects_.erase(i);
} else if ((*i).tries > 0) {
(*i).tries--;
i++;
} else {
delete (*i).peer;
i = reconnects_.erase(i);
LOG(WARNING) << "Reconnection to peer failed";
}
}
} }
void Universe::__start(Universe * u) { void Universe::__start(Universe * u) {
...@@ -259,10 +282,20 @@ void Universe::_run() { ...@@ -259,10 +282,20 @@ void Universe::_run() {
} }
#endif #endif
auto start = std::chrono::high_resolution_clock::now();
while (active_) { while (active_) {
int n = _setDescriptors(); int n = _setDescriptors();
int selres = 1; int selres = 1;
// Do periodics
auto now = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = now - start;
if (elapsed.count() >= 1.0) {
start = now;
_periodic();
}
// It is an error to use "select" with no sockets ... so just sleep // It is an error to use "select" with no sockets ... so just sleep
if (n == 0) { if (n == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
...@@ -287,7 +320,7 @@ void Universe::_run() { ...@@ -287,7 +320,7 @@ void Universe::_run() {
continue; continue;
} }
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(net_mutex_);
//If connection request is waiting //If connection request is waiting
for (auto l : listeners_) { for (auto l : listeners_) {
...@@ -300,20 +333,16 @@ void Universe::_run() { ...@@ -300,20 +333,16 @@ void Universe::_run() {
SOCKET csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); SOCKET csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) { if (csock != INVALID_SOCKET) {
auto p = new Peer(csock, &disp_); auto p = new Peer(csock, this, &disp_);
peers_.push_back(p); peers_.push_back(p);
_installBindings(p); _installBindings(p);
p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p;
// Note, called in another thread so above lock
// does not apply.
_notifyConnect(&p);
});
} }
} }
} }
} }
// TODO(Nick) Might switch to shared lock here?
//Also check each clients socket to see if any messages or errors are waiting //Also check each clients socket to see if any messages or errors are waiting
for (auto s : peers_) { for (auto s : peers_) {
if (s != NULL && s->isValid()) { if (s != NULL && s->isValid()) {
...@@ -332,27 +361,33 @@ void Universe::_run() { ...@@ -332,27 +361,33 @@ void Universe::_run() {
} }
} }
void Universe::onConnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) { callback_t Universe::onConnect(const std::function<void(ftl::net::Peer*)> &cb) {
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(handler_mutex_);
on_connect_.push_back({name, cb}); callback_t id = cbid__++;
on_connect_.push_back({id, cb});
return id;
} }
void Universe::onDisconnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) { callback_t Universe::onDisconnect(const std::function<void(ftl::net::Peer*)> &cb) {
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(handler_mutex_);
on_disconnect_.push_back({name, cb}); callback_t id = cbid__++;
on_disconnect_.push_back({id, cb});
return id;
} }
void Universe::onError(const std::string &name, std::function<void(ftl::net::Peer*, const ftl::net::Error &)> cb) { callback_t Universe::onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)> &cb) {
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(handler_mutex_);
on_error_.push_back({name, cb}); callback_t id = cbid__++;
on_error_.push_back({id, cb});
return id;
} }
void Universe::removeCallbacks(const std::string &name) { void Universe::removeCallback(callback_t cbid) {
unique_lock<mutex> lk(net_mutex_); unique_lock<shared_mutex> lk(handler_mutex_);
{ {
auto i = on_connect_.begin(); auto i = on_connect_.begin();
while (i != on_connect_.end()) { while (i != on_connect_.end()) {
if ((*i).name == name) { if ((*i).id == cbid) {
i = on_connect_.erase(i); i = on_connect_.erase(i);
} else { } else {
i++; i++;
...@@ -363,7 +398,7 @@ void Universe::removeCallbacks(const std::string &name) { ...@@ -363,7 +398,7 @@ void Universe::removeCallbacks(const std::string &name) {
{ {
auto i = on_disconnect_.begin(); auto i = on_disconnect_.begin();
while (i != on_disconnect_.end()) { while (i != on_disconnect_.end()) {
if ((*i).name == name) { if ((*i).id == cbid) {
i = on_disconnect_.erase(i); i = on_disconnect_.erase(i);
} else { } else {
i++; i++;
...@@ -374,7 +409,7 @@ void Universe::removeCallbacks(const std::string &name) { ...@@ -374,7 +409,7 @@ void Universe::removeCallbacks(const std::string &name) {
{ {
auto i = on_error_.begin(); auto i = on_error_.begin();
while (i != on_error_.end()) { while (i != on_error_.end()) {
if ((*i).name == name) { if ((*i).id == cbid) {
i = on_error_.erase(i); i = on_error_.erase(i);
} else { } else {
i++; i++;
...@@ -384,12 +419,14 @@ void Universe::removeCallbacks(const std::string &name) { ...@@ -384,12 +419,14 @@ void Universe::removeCallbacks(const std::string &name) {
} }
void Universe::_notifyConnect(Peer *p) { void Universe::_notifyConnect(Peer *p) {
unique_lock<mutex> lk(net_mutex_); shared_lock<shared_mutex> lk(handler_mutex_);
peer_ids_[p->id()] = p;
for (auto &i : on_connect_) { for (auto &i : on_connect_) {
try { try {
i.h(p); i.h(p);
} catch(...) { } catch(...) {
LOG(ERROR) << "Exception inside OnConnect hander: " << i.name; LOG(ERROR) << "Exception inside OnConnect hander: " << i.id;
} }
} }
} }
...@@ -397,11 +434,12 @@ void Universe::_notifyConnect(Peer *p) { ...@@ -397,11 +434,12 @@ void Universe::_notifyConnect(Peer *p) {
void Universe::_notifyDisconnect(Peer *p) { void Universe::_notifyDisconnect(Peer *p) {
// In all cases, should already be locked outside this function call // In all cases, should already be locked outside this function call
//unique_lock<mutex> lk(net_mutex_); //unique_lock<mutex> lk(net_mutex_);
shared_lock<shared_mutex> lk(handler_mutex_);
for (auto &i : on_disconnect_) { for (auto &i : on_disconnect_) {
try { try {
i.h(p); i.h(p);
} catch(...) { } catch(...) {
LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.name; LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.id;
} }
} }
} }
......
...@@ -18,7 +18,6 @@ TEST_CASE("Universe::connect()", "[net]") { ...@@ -18,7 +18,6 @@ TEST_CASE("Universe::connect()", "[net]") {
Universe b; Universe b;
a.listen("tcp://localhost:7077"); a.listen("tcp://localhost:7077");
//sleep_for(milliseconds(100));
SECTION("valid tcp connection using ipv4") { SECTION("valid tcp connection using ipv4") {
auto p = b.connect("tcp://127.0.0.1:7077"); auto p = b.connect("tcp://127.0.0.1:7077");
...@@ -86,7 +85,7 @@ TEST_CASE("Universe::onConnect()", "[net]") { ...@@ -86,7 +85,7 @@ TEST_CASE("Universe::onConnect()", "[net]") {
SECTION("single valid remote init connection") { SECTION("single valid remote init connection") {
bool done = false; bool done = false;
a.onConnect("test", [&done](Peer *p) { a.onConnect([&done](Peer *p) {
done = true; done = true;
}); });
...@@ -98,7 +97,7 @@ TEST_CASE("Universe::onConnect()", "[net]") { ...@@ -98,7 +97,7 @@ TEST_CASE("Universe::onConnect()", "[net]") {
SECTION("single valid init connection") { SECTION("single valid init connection") {
bool done = false; bool done = false;
b.onConnect("test", [&done](Peer *p) { b.onConnect([&done](Peer *p) {
done = true; done = true;
}); });
...@@ -117,7 +116,7 @@ TEST_CASE("Universe::onDisconnect()", "[net]") { ...@@ -117,7 +116,7 @@ TEST_CASE("Universe::onDisconnect()", "[net]") {
SECTION("single valid remote close") { SECTION("single valid remote close") {
bool done = false; bool done = false;
a.onDisconnect("test", [&done](Peer *p) { a.onDisconnect([&done](Peer *p) {
done = true; done = true;
}); });
...@@ -132,7 +131,7 @@ TEST_CASE("Universe::onDisconnect()", "[net]") { ...@@ -132,7 +131,7 @@ TEST_CASE("Universe::onDisconnect()", "[net]") {
SECTION("single valid close") { SECTION("single valid close") {
bool done = false; bool done = false;
b.onDisconnect("test", [&done](Peer *p) { b.onDisconnect([&done](Peer *p) {
done = true; done = true;
}); });
...@@ -209,6 +208,9 @@ TEST_CASE("Universe::broadcast()", "[net]") { ...@@ -209,6 +208,9 @@ TEST_CASE("Universe::broadcast()", "[net]") {
done2 = v; done2 = v;
}); });
REQUIRE( a.numberOfPeers() == 2 );
sleep_for(milliseconds(100)); // NOTE: Binding might not be ready
a.broadcast("hello", 676); a.broadcast("hello", 676);
sleep_for(milliseconds(100)); sleep_for(milliseconds(100));
...@@ -222,6 +224,7 @@ TEST_CASE("Universe::findAll()", "") { ...@@ -222,6 +224,7 @@ TEST_CASE("Universe::findAll()", "") {
Universe a; Universe a;
Universe b; Universe b;
Universe c; Universe c;
a.listen("tcp://localhost:7077"); a.listen("tcp://localhost:7077");
b.connect("tcp://localhost:7077")->waitConnection(); b.connect("tcp://localhost:7077")->waitConnection();
c.connect("tcp://localhost:7077")->waitConnection(); c.connect("tcp://localhost:7077")->waitConnection();
...@@ -248,107 +251,10 @@ TEST_CASE("Universe::findAll()", "") { ...@@ -248,107 +251,10 @@ TEST_CASE("Universe::findAll()", "") {
return {6,7,8}; return {6,7,8};
}); });
sleep_for(milliseconds(100)); // NOTE: Binding might not be ready
auto res = a.findAll<int>("test_all"); auto res = a.findAll<int>("test_all");
REQUIRE( (res.size() == 6) ); REQUIRE( (res.size() == 6) );
REQUIRE( (res[0] == 3 || res[0] == 6) ); REQUIRE( (res[0] == 3 || res[0] == 6) );
} }
} }
TEST_CASE("Universe::findOwner()", "") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
b.connect("tcp://localhost:7077")->waitConnection();
SECTION("no owners exist") {
REQUIRE( !b.findOwner("ftl://test") );
}
SECTION("one owner exists") {
a.createResource("ftl://test");
REQUIRE( *(b.findOwner("ftl://test")) == ftl::net::this_peer );
}
SECTION("three peers and one owner") {
Universe c;
c.connect("tcp://localhost:7077")->waitConnection();
b.setLocalID(ftl::UUID(7));
b.createResource("ftl://test");
REQUIRE( *(a.findOwner("ftl://test")) == ftl::UUID(7) );
}
SECTION("three peers and one owner (2)") {
Universe c;
c.connect("tcp://localhost:7077")->waitConnection();
c.setLocalID(ftl::UUID(7));
c.createResource("ftl://test");
auto r = a.findOwner("ftl://test");
REQUIRE( r );
REQUIRE( *r == ftl::UUID(7) );
}
}
TEST_CASE("Universe::subscribe()", "") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
b.connect("tcp://localhost:7077")->waitConnection();
SECTION("no resource exists") {
REQUIRE( !b.subscribe("ftl://test", []() {}) );
}
SECTION("one resource exists") {
a.createResource("ftl://test");
REQUIRE( b.subscribe("ftl://test", []() {}) );
sleep_for(milliseconds(50));
REQUIRE( a.numberOfSubscribers("ftl://test") == 1);
}
}
TEST_CASE("Universe::publish()", "") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
ftl::net::Peer *p = b.connect("tcp://localhost:7077");
p->waitConnection();
SECTION("no subscribers") {
a.createResource("ftl://test");
a.publish("ftl://test", 55);
}
SECTION("one subscriber") {
int done = 0;
a.createResource("ftl://test");
REQUIRE( b.subscribe("ftl://test", [&done](int a) {
done = a;
}) );
sleep_for(milliseconds(50));
a.publish("ftl://test", 56);
sleep_for(milliseconds(50));
REQUIRE( done == 56 );
}
SECTION("publish to disconnected subscriber") {
int done = 0;
a.createResource("ftl://test2");
REQUIRE( b.subscribe("ftl://test2", [&done](int a) {
done = a;
}) );
sleep_for(milliseconds(50));
p->close();
sleep_for(milliseconds(100));
a.publish("ftl://test2", 56);
sleep_for(milliseconds(50));
REQUIRE( done == 0 );
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment