diff --git a/CMakeLists.txt b/CMakeLists.txt index e448ccf53aa0d14c8d012b33a2272202ba16d77e..efb6f222f6593725042c08e3d74b4a00a611ca8f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -134,7 +134,7 @@ check_include_file_cxx("opencv2/cudastereo.hpp" HAVE_OPENCVCUDA) find_program(CPPCHECK_FOUND cppcheck) if (CPPCHECK_FOUND) message(STATUS "Found cppcheck: will perform source checks") - set(CMAKE_CXX_CPPCHECK "cppcheck" "--enable=warning,performance,portability,style" "--inline-suppr" "--std=c++11" "--suppress=*:*catch.hpp" "--suppress=*:*elas*" "--suppress=*:*json.hpp" "--quiet") + set(CMAKE_CXX_CPPCHECK "cppcheck" "--enable=warning,performance,portability,style" "--inline-suppr" "--std=c++11" "--suppress=*:*catch.hpp" "--suppress=*:*elas*" "--suppress=*:*nanogui*" "--suppress=*:*json.hpp" "--quiet") endif() # include_directories(${PROJECT_SOURCE_DIR}/common/cpp/include) diff --git a/applications/gui/src/main.cpp b/applications/gui/src/main.cpp index b5f911b15c014199aa3ae9fa5a50ce31a0b66e72..184e38c889e823a748dea5d041bd2e8ccf195dfc 100644 --- a/applications/gui/src/main.cpp +++ b/applications/gui/src/main.cpp @@ -59,7 +59,8 @@ int main(int argc, char **argv) { auto root = ftl::configure(argc, argv, "gui_default"); ftl::net::Universe *net = ftl::create<ftl::net::Universe>(root, "net"); - net->waitConnections(); + net->start(); + //net->waitConnections(); ftl::ctrl::Master controller(root, net); controller.onLog([](const ftl::ctrl::LogEvent &e){ diff --git a/applications/gui/src/src_window.cpp b/applications/gui/src/src_window.cpp index f3e84b511868907d6fc65ac14b040bacc537458e..a4f30d997fb80e2839b73108881f4c7aa3e2aa51 100644 --- a/applications/gui/src/src_window.cpp +++ b/applications/gui/src/src_window.cpp @@ -166,13 +166,18 @@ SourceWindow::SourceWindow(nanogui::Widget *parent, ftl::ctrl::Master *ctrl) Alignment::Middle, 0, 6)); new Label(tools, "Select source","sans-bold"); - auto available = ctrl->getNet()->findAll<string>("list_streams"); - auto select = new ComboBox(tools, available); - select->setCallback([this,available](int ix) { + available_ = ctrl->getNet()->findAll<string>("list_streams"); + auto select = new ComboBox(tools, available_); + select->setCallback([this,select](int ix) { LOG(INFO) << "Change source: " << ix; - src_->set("uri", available[ix]); + src_->set("uri", available_[ix]); }); + ctrl->getNet()->onConnect([this,select](ftl::net::Peer *p) { + available_ = ctrl_->getNet()->findAll<string>("list_streams"); + select->setItems(available_); + }); + auto depth = new Button(tools, "Depth"); depth->setFlags(Button::ToggleButton); depth->setChangeCallback([this](bool state) { diff --git a/applications/gui/src/src_window.hpp b/applications/gui/src/src_window.hpp index 347b1706949603dc523635b6fca737c4fc94bc63..bba8f9bfa9b01f3a3cbb856ce0c8e244e1d7d1d7 100644 --- a/applications/gui/src/src_window.hpp +++ b/applications/gui/src/src_window.hpp @@ -5,6 +5,8 @@ #include <ftl/master.hpp> #include <ftl/uuid.hpp> #include <ftl/net_source.hpp> +#include <vector> +#include <string> class VirtualCameraView; @@ -23,6 +25,7 @@ class SourceWindow : public nanogui::Window { ftl::ctrl::Master *ctrl_; ftl::rgbd::NetSource *src_; VirtualCameraView *image_; + std::vector<std::string> available_; }; diff --git a/applications/reconstruct/include/ftl/matrix_conversion.hpp b/applications/reconstruct/include/ftl/matrix_conversion.hpp index 48e27de718aa0fd538a192a239c1fdc0831f008d..ac0bff14da8ad6be553943db2c04f6b9ef22844f 100644 --- a/applications/reconstruct/include/ftl/matrix_conversion.hpp +++ b/applications/reconstruct/include/ftl/matrix_conversion.hpp @@ -30,7 +30,7 @@ namespace MatrixConversion return Eigen::Matrix4f(mat.ptr()).transpose(); }*/ - static Eigen::Vector4f VecH(const Eigen::Vector3f& v) + /*static Eigen::Vector4f VecH(const Eigen::Vector3f& v) { return Eigen::Vector4f(v[0], v[1], v[2], 1.0); } @@ -38,7 +38,7 @@ namespace MatrixConversion static Eigen::Vector3f VecDH(const Eigen::Vector4f& v) { return Eigen::Vector3f(v[0]/v[3], v[1]/v[3], v[2]/v[3]); - } + }*/ /*static Eigen::Vector3f VecToEig(const vec3f& v) { diff --git a/applications/reconstruct/include/ftl/registration.hpp b/applications/reconstruct/include/ftl/registration.hpp index d75d96efae5b168dd5a4065acc54d19d8d87f1ed..e5375dfe096d51b216900bb1c658076573786fc5 100644 --- a/applications/reconstruct/include/ftl/registration.hpp +++ b/applications/reconstruct/include/ftl/registration.hpp @@ -44,7 +44,7 @@ bool findChessboardCorners(cv::Mat &rgb, const cv::Mat &depth, const ftl::rgbd:: */ class Registration : public ftl::Configurable { public: - Registration(nlohmann::json &config); + explicit Registration(nlohmann::json &config); void addSource(ftl::rgbd::RGBDSource* source); size_t getSourcesCount() { return sources_.size(); } @@ -161,7 +161,7 @@ private: */ class ChessboardRegistration : public Registration { public: - ChessboardRegistration(nlohmann::json &config); + explicit ChessboardRegistration(nlohmann::json &config); /** * @brief Creates new ChessboardRegistration or ChessboardRegistrationChain * object depending on chain option in config. User of the method @@ -189,7 +189,7 @@ protected: */ class ChessboardRegistrationChain : public ChessboardRegistration { public: - ChessboardRegistrationChain(nlohmann::json &config); + explicit ChessboardRegistrationChain(nlohmann::json &config); bool findTransformations(std::vector<Eigen::Matrix4f> &data) override; diff --git a/applications/reconstruct/include/ftl/scene_rep_hash_sdf.hpp b/applications/reconstruct/include/ftl/scene_rep_hash_sdf.hpp index a9f32b4c3489ab8c91fc0f69dc02c8a87d8eb6fa..5da37d479f29aefc84e46c73432187579a56d31c 100644 --- a/applications/reconstruct/include/ftl/scene_rep_hash_sdf.hpp +++ b/applications/reconstruct/include/ftl/scene_rep_hash_sdf.hpp @@ -195,7 +195,7 @@ class SceneRep : public ftl::Configurable { std::unordered_set<unsigned int> pointersFreeHash; - std::vector<unsigned int> pointersFreeVec(m_hashParams.m_numSDFBlocks, 0); + std::vector<int> pointersFreeVec(m_hashParams.m_numSDFBlocks, 0); // CHECK Nick Changed to int from unsigned in for (unsigned int i = 0; i < heapCounterCPU; i++) { pointersFreeHash.insert(heapCPU[i]); pointersFreeVec[heapCPU[i]] = FREE_ENTRY; @@ -207,7 +207,7 @@ class SceneRep : public ftl::Configurable { unsigned int numOccupied = 0; unsigned int numMinusOne = 0; - unsigned int listOverallFound = 0; + //unsigned int listOverallFound = 0; std::list<myint3Voxel> l; //std::vector<myint3Voxel> v; diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index 88c8c23eea98972033ffd8e3dfdf5ff337b58eba..f3008095733292e7e0626753adc2abf101a015f1 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -75,9 +75,7 @@ struct Cameras { static void run(ftl::Configurable *root) { Universe *net = ftl::create<Universe>(root, "net"); - // Make sure connections are complete - // sleep_for(milliseconds(500)); - // TODO: possible to do more reliably? + net->start(); net->waitConnections(); std::vector<Cameras> inputs; diff --git a/applications/reconstruct/src/virtual_source.cpp b/applications/reconstruct/src/virtual_source.cpp index 7e7317b68ee9e4e11545b27569e94303002a68c7..ecaec6d2f8b7ad521db1116942074a95c662a93d 100644 --- a/applications/reconstruct/src/virtual_source.cpp +++ b/applications/reconstruct/src/virtual_source.cpp @@ -58,5 +58,5 @@ void VirtualSource::grab() { } bool VirtualSource::isReady() { - + return true; } diff --git a/applications/vision/CMakeLists.txt b/applications/vision/CMakeLists.txt index 5ce6c5240cd9fc81de29eb688984dccb01c7a5be..684b195b5a2a8605e9bc3e04e629d21a35dcd545 100644 --- a/applications/vision/CMakeLists.txt +++ b/applications/vision/CMakeLists.txt @@ -6,7 +6,6 @@ set(CVNODESRC src/main.cpp - src/streamer.cpp src/middlebury.cpp ) diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 4190a90d0cc5b6074e5dbf2e6bacb74597b3e06f..9a49c9c2516b4f1b901dcc985d86e7e56c28a2df 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -96,6 +96,7 @@ static void run(ftl::Configurable *root) { stream->add(source); stream->run(); + net->start(); LOG(INFO) << "Running..."; while (ftl::running && display->active()) { diff --git a/components/common/cpp/include/ftl/configuration.hpp b/components/common/cpp/include/ftl/configuration.hpp index 3938b29b772d3f660baa363d269826e8d8e4b0d5..fd21c63cd4f9aeecc9c4029974c6b5b025d6adc6 100644 --- a/components/common/cpp/include/ftl/configuration.hpp +++ b/components/common/cpp/include/ftl/configuration.hpp @@ -68,12 +68,25 @@ Configurable *find(const std::string &uri); */ void registerConfigurable(Configurable *cfg); +/** + * Create a new configurable directly from a raw object. This should not be used. + */ template <typename T, typename... ARGS> T *create(json_t &link, ARGS ...args); +/** + * Create a configurable from an attribute of a parent configurable. + */ template <typename T, typename... ARGS> T *create(ftl::Configurable *parent, const std::string &name, ARGS ...args); +/** + * Create a configurable rooted on a parent but with a specific object + * that is not directly a child of the parent. Used by RGB-D Factory. + */ +template <typename T, typename... ARGS> +T *create(ftl::Configurable *parent, json_t &obj, const std::string &name, ARGS ...args); + void set(const std::string &uri, const nlohmann::json &); } // namespace config @@ -138,12 +151,38 @@ T *ftl::config::create(ftl::Configurable *parent, const std::string &name, ARGS id_str = id_str + std::string("#") + name; } parent->getConfig()[name] = { + // cppcheck-suppress constStatement {"$id", id_str} }; nlohmann::json &entity2 = parent->getConfig()[name]; return create<T>(entity2, args...); } + + LOG(ERROR) << "Unable to create Configurable entity '" << name << "'"; + return nullptr; +} + +template <typename T, typename... ARGS> +T *ftl::config::create(ftl::Configurable *parent, json_t &obj, const std::string &name, ARGS ...args) { + //nlohmann::json &entity = ftl::config::resolve(parent->getConfig()[name]); + nlohmann::json &entity = obj; + + if (entity.is_object()) { + if (!entity["$id"].is_string()) { + std::string id_str = *parent->get<std::string>("$id"); + if (id_str.find('#') != std::string::npos) { + entity["$id"] = id_str + std::string("/") + name; + } else { + entity["$id"] = id_str + std::string("#") + name; + } + } + + return create<T>(entity, args...); + } else if (entity.is_null()) { + LOG(FATAL) << "Invalid raw object in Configurable construction"; + return nullptr; + } } #endif // _FTL_COMMON_CONFIGURATION_HPP_ diff --git a/components/common/cpp/src/opencv_to_pcl.cpp b/components/common/cpp/src/opencv_to_pcl.cpp index 7fb1eabe0e762677b46df8a91e029f7867e8dd1a..c56196b31b92ebf51bb6df54cd6029471dd9d5d7 100644 --- a/components/common/cpp/src/opencv_to_pcl.cpp +++ b/components/common/cpp/src/opencv_to_pcl.cpp @@ -24,6 +24,7 @@ pcl::PointCloud<pcl::PointXYZRGB>::Ptr ftl::utility::matToPointXYZ(const cv::Mat // when color needs to be added: uint32_t rgb = (static_cast<uint32_t>(prgb.z) << 16 | static_cast<uint32_t>(prgb.y) << 8 | static_cast<uint32_t>(prgb.x)); + // cppcheck-suppress invalidPointerCast point.rgb = *reinterpret_cast<float*>(&rgb); point_cloud_ptr -> points.push_back(point); diff --git a/components/common/cpp/test/configurable_unit.cpp b/components/common/cpp/test/configurable_unit.cpp index ba3557d253371d0b8ef11841706cb1cade03d5db..428208f7465a85421eb1c7864f384a9dcc3f7a98 100644 --- a/components/common/cpp/test/configurable_unit.cpp +++ b/components/common/cpp/test/configurable_unit.cpp @@ -8,6 +8,7 @@ using std::string; SCENARIO( "Configurable::get()" ) { GIVEN( "a non-existent property" ) { + // cppcheck-suppress constStatement nlohmann::json json = {{"test",5}}; Configurable cfg(json); @@ -16,6 +17,7 @@ SCENARIO( "Configurable::get()" ) { } GIVEN( "a valid property" ) { + // cppcheck-suppress constStatement nlohmann::json json = {{"test",5}}; Configurable cfg(json); @@ -27,6 +29,7 @@ SCENARIO( "Configurable::get()" ) { SCENARIO( "Configurable::on()" ) { GIVEN( "a changed property with no handlers" ) { + // cppcheck-suppress constStatement nlohmann::json json = {{"test",5}}; Configurable cfg(json); @@ -36,6 +39,7 @@ SCENARIO( "Configurable::on()" ) { } GIVEN( "a changed property one handler" ) { + // cppcheck-suppress constStatement nlohmann::json json = {{"test",5}}; Configurable cfg(json); bool trig = false; @@ -50,6 +54,7 @@ SCENARIO( "Configurable::on()" ) { } GIVEN( "a changed property two handlers" ) { + // cppcheck-suppress constStatement nlohmann::json json = {{"test",5}}; Configurable cfg(json); bool trig1 = false; diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index 3e51b5361eca58d52f4ca180e0813f5a3173ac44..b167cb05c4840104058555aea90451b36b20aa85 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -68,8 +68,8 @@ class Peer { }; public: - explicit Peer(const char *uri, ftl::net::Dispatcher *d=nullptr); - explicit Peer(SOCKET s, ftl::net::Dispatcher *d=nullptr); + explicit Peer(const char *uri, ftl::net::Universe *, ftl::net::Dispatcher *d=nullptr); + explicit Peer(SOCKET s, ftl::net::Universe *, ftl::net::Dispatcher *d=nullptr); ~Peer(); /** @@ -93,6 +93,11 @@ class Peer { * @return True if all connections were successful, false if timeout or error. */ bool waitConnection(); + + /** + * Make a reconnect attempt. Called internally by Universe object. + */ + bool reconnect(); /** * Test if the connection is valid. This returns true in all conditions @@ -176,8 +181,8 @@ class Peer { void bind(const std::string &name, F func); // void onError(std::function<void(Peer &, int err, const char *msg)> &f) {} - void onConnect(const std::function<void(Peer &)> &f); - void onDisconnect(std::function<void(Peer &)> &f) {} + //void onConnect(const std::function<void(Peer &)> &f); + //void onDisconnect(std::function<void(Peer &)> &f) {} bool isWaiting() const { return is_waiting_; } @@ -226,11 +231,13 @@ class Peer { } } - private: // Data - Status status_; - SOCKET sock_; - ftl::URI::scheme_t scheme_; - uint32_t version_; + private: + Status status_; // Connected, errored, reconnecting... + SOCKET sock_; // Raw OS socket + ftl::URI::scheme_t scheme_; // TCP / WS / UDP... + uint32_t version_; // Received protocol version in handshake + bool can_reconnect_; // Client connections can retry + ftl::net::Universe *universe_; // Origin net universe // Receive buffers bool is_waiting_; @@ -241,16 +248,16 @@ class Peer { msgpack::vrefbuffer send_buf_; std::mutex send_mtx_; - std::string uri_; - ftl::UUID peerid_; + std::string uri_; // Original connection URI, or assumed URI + ftl::UUID peerid_; // Received in handshake or allocated - ftl::net::Dispatcher *disp_; - std::vector<std::function<void(Peer &)>> open_handlers_; + ftl::net::Dispatcher *disp_; // For RPC call dispatch + //std::vector<std::function<void(Peer &)>> open_handlers_; //std::vector<std::function<void(const ftl::net::Error &)>> error_handlers_ - std::vector<std::function<void(Peer &)>> close_handlers_; + //std::vector<std::function<void(Peer &)>> close_handlers_; std::map<int, std::unique_ptr<virtual_caller>> callbacks_; - static int rpcid__; + static int rpcid__; // Return ID for RPC calls }; // --- Inline Template Implementations ----------------------------------------- diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 5f8207c92715fdc85c892a8781395627f925d5e8..1b233519a4f7d17cba790c6cb8c7ec5fdcf87077 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -12,9 +12,10 @@ #include <ftl/uuid.hpp> #include <nlohmann/json.hpp> #include <vector> +#include <list> #include <string> #include <thread> -#include <mutex> +#include <shared_mutex> #include <map> namespace ftl { @@ -24,6 +25,14 @@ struct Error { int errno; }; +struct ReconnectInfo { + int tries; + float delay; + Peer *peer; +}; + +typedef unsigned int callback_t; + /** * Represents a group of network peers and their resources, managing the * searching of and sharing of resources across peers. Each universe can @@ -34,6 +43,9 @@ struct Error { * their actions. */ class Universe : public ftl::Configurable { + public: + friend class Peer; + public: Universe(); /** @@ -48,6 +60,8 @@ class Universe : public ftl::Configurable { * The destructor will terminate the network thread before completing. */ ~Universe(); + + void start(); /** * Open a new listening port on a given interfaces. @@ -82,10 +96,10 @@ class Universe : public ftl::Configurable { Peer *getPeer(const ftl::UUID &pid) const; - int numberOfSubscribers(const std::string &res) const; + //int numberOfSubscribers(const std::string &res) const; - bool hasSubscribers(const std::string &res) const; - bool hasSubscribers(const ftl::URI &res) const; + //bool hasSubscribers(const std::string &res) const; + //bool hasSubscribers(const ftl::URI &res) const; /** * Bind a function to an RPC or service call name. This will implicitely @@ -101,16 +115,18 @@ class Universe : public ftl::Configurable { * triggered whenever that resource is published to. It is akin to * RPC broadcast (no return value) to a subgroup of peers. */ - template <typename F> - bool subscribe(const std::string &res, F func); + //template <typename F> + //[[deprecated("Pub sub no longer to be used")]] + //bool subscribe(const std::string &res, F func); /** * Subscribe a function to a resource. The subscribed function is * triggered whenever that resource is published to. It is akin to * RPC broadcast (no return value) to a subgroup of peers. */ - template <typename F> - bool subscribe(const ftl::URI &res, F func); + //template <typename F> + //[[deprecated("Pub sub no longer to be used")]] + //bool subscribe(const ftl::URI &res, F func); /** * Send a non-blocking RPC call with no return value to all connected @@ -136,75 +152,78 @@ class Universe : public ftl::Configurable { * of a resource. There may be no subscribers. Note that query parameter * order in the URI string is not important. */ + //template <typename... ARGS> //[[deprecated("Pub sub no longer to be used")]] - template <typename... ARGS> - void publish(const std::string &res, ARGS... args); + //void publish(const std::string &res, ARGS... args); /** * Send a non-blocking RPC call with no return value to all subscribers * of a resource. There may be no subscribers. This overload accepts a * URI object directly to enable more efficient modification of parameters. */ + //template <typename... ARGS> //[[deprecated("Pub sub no longer to be used")]] - template <typename... ARGS> - void publish(const ftl::URI &res, ARGS... args); + //void publish(const ftl::URI &res, ARGS... args); /** * Register your ownership of a new resource. This must be called before * publishing to this resource and before any peers attempt to subscribe. */ //[[deprecated("Pub sub no longer to be used")]] - bool createResource(const std::string &uri); + //bool createResource(const std::string &uri); //[[deprecated("Pub sub no longer to be used")]] - std::optional<ftl::UUID> findOwner(const std::string &res); + //std::optional<ftl::UUID> findOwner(const std::string &res); void setLocalID(const ftl::UUID &u) { this_peer = u; }; const ftl::UUID &id() const { return this_peer; } // --- Event Handlers ------------------------------------------------------ - void onConnect(const std::string &, std::function<void(ftl::net::Peer*)>); - void onDisconnect(const std::string &, std::function<void(ftl::net::Peer*)>); - void onError(const std::string &, std::function<void(ftl::net::Peer*, const ftl::net::Error &)>); + ftl::net::callback_t onConnect(const std::function<void(ftl::net::Peer*)>&); + ftl::net::callback_t onDisconnect(const std::function<void(ftl::net::Peer*)>&); + ftl::net::callback_t onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)>&); - void removeCallbacks(const std::string &); + void removeCallback(ftl::net::callback_t cbid); private: void _run(); int _setDescriptors(); void _installBindings(); void _installBindings(Peer *); - bool _subscribe(const std::string &res); + //bool _subscribe(const std::string &res); void _cleanupPeers(); void _notifyConnect(Peer *); void _notifyDisconnect(Peer *); void _notifyError(Peer *, const ftl::net::Error &); + void _periodic(); static void __start(Universe *u); private: bool active_; ftl::UUID this_peer; - std::mutex net_mutex_; + std::shared_mutex net_mutex_; + std::shared_mutex handler_mutex_; fd_set sfderror_; fd_set sfdread_; std::vector<ftl::net::Listener*> listeners_; std::vector<ftl::net::Peer*> peers_; - std::map<std::string, std::vector<ftl::UUID>> subscribers_; - std::unordered_set<std::string> owned_; + //std::map<std::string, std::vector<ftl::UUID>> subscribers_; + //std::unordered_set<std::string> owned_; std::map<ftl::UUID, ftl::net::Peer*> peer_ids_; ftl::UUID id_; ftl::net::Dispatcher disp_; std::thread thread_; + std::list<ReconnectInfo> reconnects_; struct ConnHandler { - std::string name; + callback_t id; std::function<void(ftl::net::Peer*)> h; }; struct ErrHandler { - std::string name; + callback_t id; std::function<void(ftl::net::Peer*, const ftl::net::Error &)> h; }; @@ -213,6 +232,8 @@ class Universe : public ftl::Configurable { std::list<ConnHandler> on_disconnect_; std::list<ErrHandler> on_error_; + static callback_t cbid__; + // std::map<std::string, std::vector<ftl::net::Peer*>> subscriptions_; }; @@ -220,13 +241,13 @@ class Universe : public ftl::Configurable { template <typename F> void Universe::bind(const std::string &name, F func) { - // CHECK Need mutex? + std::unique_lock<std::shared_mutex> lk(net_mutex_); disp_.bind(name, func, typename ftl::internal::func_kind_info<F>::result_kind(), typename ftl::internal::func_kind_info<F>::args_kind()); } -template <typename F> +/*template <typename F> bool Universe::subscribe(const std::string &res, F func) { return subscribe(ftl::URI(res), func); } @@ -235,12 +256,13 @@ template <typename F> bool Universe::subscribe(const ftl::URI &res, F func) { bind(res.to_string(), func); return _subscribe(res.to_string()); -} +}*/ template <typename... ARGS> void Universe::broadcast(const std::string &name, ARGS... args) { + std::shared_lock<std::shared_mutex> lk(net_mutex_); for (auto p : peers_) { - p->send(name, args...); + if (p->isConnected()) p->send(name, args...); } } @@ -262,15 +284,18 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { }; std::map<Peer*, int> record; + std::shared_lock<std::shared_mutex> lk(net_mutex_); for (auto p : peers_) { - record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); + if (p->isConnected()) record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); } + lk.unlock(); { // Block thread until async callback notifies us - std::unique_lock<std::mutex> lk(m); - cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); + std::unique_lock<std::mutex> llk(m); + cv.wait_for(llk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); // Cancel any further results + lk.lock(); for (auto p : peers_) { auto m = record.find(p); if (m != record.end()) { @@ -300,16 +325,20 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { }; std::map<Peer*, int> record; + std::shared_lock<std::shared_mutex> lk(net_mutex_); for (auto p : peers_) { + if (!p->isConnected()) continue; sentcount++; record[p] = p->asyncCall<std::vector<R>>(name, handler, args...); } + lk.unlock(); { // Block thread until async callback notifies us - std::unique_lock<std::mutex> lk(m); - cv.wait_for(lk, std::chrono::seconds(1), [&returncount,&sentcount]{return returncount == sentcount;}); + std::unique_lock<std::mutex> llk(m); + cv.wait_for(llk, std::chrono::seconds(1), [&returncount,&sentcount]{return returncount == sentcount;}); // Cancel any further results + lk.lock(); for (auto p : peers_) { auto m = record.find(p); if (m != record.end()) { @@ -324,7 +353,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { template <typename R, typename... ARGS> R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { Peer *p = getPeer(pid); - if (p == nullptr) { + if (p == nullptr || !p->isConnected()) { DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); throw -1; } @@ -336,12 +365,12 @@ bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) Peer *p = getPeer(pid); if (p == nullptr) { DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); - throw -1; + return false; } - return p->send(name, args...) > 0; + return p->isConnected() && p->send(name, args...) > 0; } -template <typename... ARGS> +/*template <typename... ARGS> void Universe::publish(const std::string &res, ARGS... args) { ftl::URI uri(res); publish(uri, args...); @@ -349,7 +378,7 @@ void Universe::publish(const std::string &res, ARGS... args) { template <typename... ARGS> void Universe::publish(const ftl::URI &res, ARGS... args) { - std::unique_lock<std::mutex> lk(net_mutex_); + std::unique_lock<std::shared_mutex> lk(net_mutex_); auto subs = subscribers_[res.getBaseURI()]; lk.unlock(); for (auto p : subs) { @@ -358,7 +387,7 @@ void Universe::publish(const ftl::URI &res, ARGS... args) { peer->send(res.getBaseURI(), args...); } } -} +}*/ }; // namespace net }; // namespace ftl diff --git a/components/net/cpp/src/dispatcher.cpp b/components/net/cpp/src/dispatcher.cpp index 719ae2b65fe9b0cdd682c95cf03f33ca688c1768..cd5cb51f39d25ca8edff30657029f56b149d1692 100644 --- a/components/net/cpp/src/dispatcher.cpp +++ b/components/net/cpp/src/dispatcher.cpp @@ -140,7 +140,7 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const throw &e; } } else { - LOG(ERROR) << "Missing handler for incoming message"; + LOG(ERROR) << "Missing handler for incoming message (" << name << ")"; } } diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index ef9c223d5179d6ea119c9ffcebae5f6cb20da94f..af5051129032100917b46528eb5740b7e171a255 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -23,6 +23,7 @@ #include <ftl/net/ws_internal.hpp> #include <ftl/config.h> #include "net_internal.hpp" +#include <ftl/net/universe.hpp> #include <iostream> #include <memory> @@ -37,6 +38,10 @@ using ftl::URI; using ftl::net::ws_connect; using ftl::net::Dispatcher; using std::chrono::seconds; +using ftl::net::Universe; +using ftl::net::callback_t; +using std::mutex; +using std::unique_lock; /*static std::string hexStr(const std::string &s) { @@ -127,7 +132,7 @@ static SOCKET tcpConnect(URI &uri) { return csocket; } -Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { +Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(false), universe_(u) { status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting; _updateURI(); @@ -149,7 +154,10 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { peerid_ = pid; if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!"; - _trigger(open_handlers_); + // Ensure handlers called later or in new thread + pool.push([this](int id) { + universe_->_notifyConnect(this); + }); } }); @@ -158,11 +166,15 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { LOG(INFO) << "Peer elected to disconnect: " << id().to_string(); }); + bind("__ping__", [this](unsigned long long timestamp) { + return timestamp; + }); + send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); } } -Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { +Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), universe_(u), uri_(pUri) { URI uri(pUri); status_ = kInvalid; @@ -170,6 +182,9 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { disp_ = new Dispatcher(d); + // Must to to prevent receiving message before handlers are installed + unique_lock<mutex> lk(recv_mtx_); + scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { sock_ = tcpConnect(uri); @@ -194,7 +209,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { is_waiting_ = true; - if (status_ == kConnecting) { + if (status_ == kConnecting || status_ == kReconnecting) { // Install return handshake handler. bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { LOG(INFO) << "Handshake 1 received"; @@ -208,10 +223,44 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!"; send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); - _trigger(open_handlers_); + // Ensure handlers called later or in new thread + pool.push([this](int id) { + universe_->_notifyConnect(this); + }); } }); + + bind("__disconnect__", [this]() { + _badClose(false); + LOG(INFO) << "Peer elected to disconnect: " << id().to_string(); + }); + + bind("__ping__", [this](unsigned long long timestamp) { + return timestamp; + }); + } +} + +bool Peer::reconnect() { + if (status_ != kReconnecting || !can_reconnect_) return false; + + URI uri(uri_); + + LOG(INFO) << "Reconnecting to " << uri_ << " ..."; + + if (scheme_ == URI::SCHEME_TCP) { + sock_ = tcpConnect(uri); + if (sock_ != INVALID_SOCKET) { + status_ = kConnecting; + is_waiting_ = true; + return true; + } else { + return false; + } } + + // TODO(Nick) allow for other protocols in reconnect + return false; } void Peer::_updateURI() { @@ -264,11 +313,13 @@ void Peer::_badClose(bool retry) { //auto i = find(sockets.begin(),sockets.end(),this); //sockets.erase(i); - - _trigger(close_handlers_); + + universe_->_notifyDisconnect(this); // Attempt auto reconnect? - if (retry) LOG(INFO) << "Should attempt reconnect..."; + if (retry && can_reconnect_) { + status_ = kReconnecting; + } } } @@ -324,7 +375,7 @@ bool Peer::_data() { if (get<1>(hs) != "__handshake__") { _badClose(false); - LOG(ERROR) << "Missing handshake"; + LOG(ERROR) << "Missing handshake - got '" << get<1>(hs) << "'"; return false; } } catch(...) { @@ -368,24 +419,28 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { bool Peer::waitConnection() { if (status_ == kConnected) return true; - std::unique_lock<std::mutex> lk(send_mtx_); + std::mutex m; + std::unique_lock<std::mutex> lk(m); std::condition_variable cv; - onConnect([&](Peer &p) { - cv.notify_all(); + callback_t h = universe_->onConnect([this,&cv](Peer *p) { + if (p == this) { + cv.notify_one(); + } }); cv.wait_for(lk, seconds(5)); + universe_->removeCallback(h); return status_ == kConnected; } -void Peer::onConnect(const std::function<void(Peer&)> &f) { +/*void Peer::onConnect(const std::function<void(Peer&)> &f) { if (status_ == kConnected) { f(*this); } else { open_handlers_.push_back(f); } -} +}*/ void Peer::_connected() { status_ = kConnected; diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 53600416b27f419a5a1bacd221ddfc947488de9e..b0a72d1f395ff5f222862f3164e642935f96fad8 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -20,8 +20,13 @@ using nlohmann::json; using ftl::UUID; using std::optional; using std::unique_lock; +using std::shared_lock; using std::mutex; +using std::shared_mutex; using ftl::config::json_t; +using ftl::net::callback_t; + +callback_t ftl::net::Universe::cbid__ = 0; Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this) { _installBindings(); @@ -30,6 +35,14 @@ Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_p Universe::Universe(nlohmann::json &config) : Configurable(config), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this) { + _installBindings(); +} + +Universe::~Universe() { + shutdown(); +} + +void Universe::start() { auto l = get<json_t>("listen"); if (l && (*l).is_array()) { @@ -46,12 +59,6 @@ Universe::Universe(nlohmann::json &config) : connect(pp); } } - - _installBindings(); -} - -Universe::~Universe() { - shutdown(); } void Universe::shutdown() { @@ -77,32 +84,26 @@ void Universe::shutdown() { bool Universe::listen(const string &addr) { auto l = new Listener(addr.c_str()); if (!l) return false; - unique_lock<mutex> lk(net_mutex_); + unique_lock<shared_mutex> lk(net_mutex_); listeners_.push_back(l); return l->isListening(); } Peer *Universe::connect(const string &addr) { - auto p = new Peer(addr.c_str(), &disp_); + auto p = new Peer(addr.c_str(), this, &disp_); if (!p) return nullptr; if (p->status() != Peer::kInvalid) { - unique_lock<mutex> lk(net_mutex_); + unique_lock<shared_mutex> lk(net_mutex_); peers_.push_back(p); } _installBindings(p); - - p->onConnect([this](Peer &p) { - peer_ids_[p.id()] = &p; - _notifyConnect(&p); - }); - return p; } void Universe::unbind(const std::string &name) { - unique_lock<mutex> lk(net_mutex_); + unique_lock<shared_mutex> lk(net_mutex_); disp_.unbind(name); } @@ -121,7 +122,7 @@ int Universe::_setDescriptors() { SOCKET n = 0; - unique_lock<mutex> lk(net_mutex_); + unique_lock<shared_mutex> lk(net_mutex_); //Set file descriptor for the listening sockets. for (auto l : listeners_) { @@ -156,18 +157,17 @@ void Universe::_installBindings(Peer *p) { } void Universe::_installBindings() { - bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool { + /*bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool { LOG(INFO) << "Subscription to " << uri << " by " << id.to_string(); - unique_lock<mutex> lk(net_mutex_); + unique_lock<shared_mutex> lk(net_mutex_); subscribers_[ftl::URI(uri).to_string()].push_back(id); return true; }); bind("__owner__", [this](const std::string &res) -> optional<UUID> { - LOG(INFO) << "SOMEONE ASKS FOR " << res; if (owned_.count(res) > 0) return this_peer; else return {}; - }); + });*/ } // Note: should be called inside a net lock @@ -178,13 +178,18 @@ void Universe::_cleanupPeers() { if (!(*i)->isValid()) { Peer *p = *i; LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); - _notifyDisconnect(p); + //_notifyDisconnect(p); auto ix = peer_ids_.find(p->id()); if (ix != peer_ids_.end()) peer_ids_.erase(ix); - delete p; i = peers_.erase(i); + + if (p->status() == ftl::net::Peer::kReconnecting) { + reconnects_.push_back({50, 1.0f, p}); + } else { + delete p; + } } else { i++; } @@ -197,7 +202,7 @@ Peer *Universe::getPeer(const UUID &id) const { else return ix->second; } -optional<UUID> Universe::findOwner(const string &res) { +/*optional<UUID> Universe::findOwner(const string &res) { // TODO(nick) cache this information return findOne<UUID>("__owner__", res); } @@ -238,6 +243,24 @@ bool Universe::_subscribe(const std::string &res) { LOG(WARNING) << "Subscribe to unknown resource: " << res; return false; } +}*/ + +void Universe::_periodic() { + auto i = reconnects_.begin(); + while (i != reconnects_.end()) { + if ((*i).peer->reconnect()) { + unique_lock<shared_mutex> lk(net_mutex_); + peers_.push_back((*i).peer); + i = reconnects_.erase(i); + } else if ((*i).tries > 0) { + (*i).tries--; + i++; + } else { + delete (*i).peer; + i = reconnects_.erase(i); + LOG(WARNING) << "Reconnection to peer failed"; + } + } } void Universe::__start(Universe * u) { @@ -259,10 +282,20 @@ void Universe::_run() { } #endif + auto start = std::chrono::high_resolution_clock::now(); + while (active_) { int n = _setDescriptors(); int selres = 1; + // Do periodics + auto now = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = now - start; + if (elapsed.count() >= 1.0) { + start = now; + _periodic(); + } + // It is an error to use "select" with no sockets ... so just sleep if (n == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(300)); @@ -287,7 +320,7 @@ void Universe::_run() { continue; } - unique_lock<mutex> lk(net_mutex_); + unique_lock<shared_mutex> lk(net_mutex_); //If connection request is waiting for (auto l : listeners_) { @@ -300,20 +333,16 @@ void Universe::_run() { SOCKET csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); if (csock != INVALID_SOCKET) { - auto p = new Peer(csock, &disp_); + auto p = new Peer(csock, this, &disp_); peers_.push_back(p); _installBindings(p); - p->onConnect([this](Peer &p) { - peer_ids_[p.id()] = &p; - // Note, called in another thread so above lock - // does not apply. - _notifyConnect(&p); - }); } } } } + // TODO(Nick) Might switch to shared lock here? + //Also check each clients socket to see if any messages or errors are waiting for (auto s : peers_) { if (s != NULL && s->isValid()) { @@ -332,27 +361,33 @@ void Universe::_run() { } } -void Universe::onConnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) { - unique_lock<mutex> lk(net_mutex_); - on_connect_.push_back({name, cb}); +callback_t Universe::onConnect(const std::function<void(ftl::net::Peer*)> &cb) { + unique_lock<shared_mutex> lk(handler_mutex_); + callback_t id = cbid__++; + on_connect_.push_back({id, cb}); + return id; } -void Universe::onDisconnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) { - unique_lock<mutex> lk(net_mutex_); - on_disconnect_.push_back({name, cb}); +callback_t Universe::onDisconnect(const std::function<void(ftl::net::Peer*)> &cb) { + unique_lock<shared_mutex> lk(handler_mutex_); + callback_t id = cbid__++; + on_disconnect_.push_back({id, cb}); + return id; } -void Universe::onError(const std::string &name, std::function<void(ftl::net::Peer*, const ftl::net::Error &)> cb) { - unique_lock<mutex> lk(net_mutex_); - on_error_.push_back({name, cb}); +callback_t Universe::onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)> &cb) { + unique_lock<shared_mutex> lk(handler_mutex_); + callback_t id = cbid__++; + on_error_.push_back({id, cb}); + return id; } -void Universe::removeCallbacks(const std::string &name) { - unique_lock<mutex> lk(net_mutex_); +void Universe::removeCallback(callback_t cbid) { + unique_lock<shared_mutex> lk(handler_mutex_); { auto i = on_connect_.begin(); while (i != on_connect_.end()) { - if ((*i).name == name) { + if ((*i).id == cbid) { i = on_connect_.erase(i); } else { i++; @@ -363,7 +398,7 @@ void Universe::removeCallbacks(const std::string &name) { { auto i = on_disconnect_.begin(); while (i != on_disconnect_.end()) { - if ((*i).name == name) { + if ((*i).id == cbid) { i = on_disconnect_.erase(i); } else { i++; @@ -374,7 +409,7 @@ void Universe::removeCallbacks(const std::string &name) { { auto i = on_error_.begin(); while (i != on_error_.end()) { - if ((*i).name == name) { + if ((*i).id == cbid) { i = on_error_.erase(i); } else { i++; @@ -384,12 +419,14 @@ void Universe::removeCallbacks(const std::string &name) { } void Universe::_notifyConnect(Peer *p) { - unique_lock<mutex> lk(net_mutex_); + shared_lock<shared_mutex> lk(handler_mutex_); + peer_ids_[p->id()] = p; + for (auto &i : on_connect_) { try { i.h(p); } catch(...) { - LOG(ERROR) << "Exception inside OnConnect hander: " << i.name; + LOG(ERROR) << "Exception inside OnConnect hander: " << i.id; } } } @@ -397,11 +434,12 @@ void Universe::_notifyConnect(Peer *p) { void Universe::_notifyDisconnect(Peer *p) { // In all cases, should already be locked outside this function call //unique_lock<mutex> lk(net_mutex_); + shared_lock<shared_mutex> lk(handler_mutex_); for (auto &i : on_disconnect_) { try { i.h(p); } catch(...) { - LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.name; + LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.id; } } } diff --git a/components/net/cpp/test/net_integration.cpp b/components/net/cpp/test/net_integration.cpp index fbfa7c22788cf5518720dfda61a6771ae9ccb8b0..7cca78a481a33f57e037765b206d39bb77745867 100644 --- a/components/net/cpp/test/net_integration.cpp +++ b/components/net/cpp/test/net_integration.cpp @@ -18,7 +18,6 @@ TEST_CASE("Universe::connect()", "[net]") { Universe b; a.listen("tcp://localhost:7077"); - //sleep_for(milliseconds(100)); SECTION("valid tcp connection using ipv4") { auto p = b.connect("tcp://127.0.0.1:7077"); @@ -86,7 +85,7 @@ TEST_CASE("Universe::onConnect()", "[net]") { SECTION("single valid remote init connection") { bool done = false; - a.onConnect("test", [&done](Peer *p) { + a.onConnect([&done](Peer *p) { done = true; }); @@ -98,7 +97,7 @@ TEST_CASE("Universe::onConnect()", "[net]") { SECTION("single valid init connection") { bool done = false; - b.onConnect("test", [&done](Peer *p) { + b.onConnect([&done](Peer *p) { done = true; }); @@ -111,13 +110,13 @@ TEST_CASE("Universe::onConnect()", "[net]") { TEST_CASE("Universe::onDisconnect()", "[net]") { Universe a; Universe b; - + a.listen("tcp://localhost:7077"); SECTION("single valid remote close") { bool done = false; - a.onDisconnect("test", [&done](Peer *p) { + a.onDisconnect([&done](Peer *p) { done = true; }); @@ -132,7 +131,7 @@ TEST_CASE("Universe::onDisconnect()", "[net]") { SECTION("single valid close") { bool done = false; - b.onDisconnect("test", [&done](Peer *p) { + b.onDisconnect([&done](Peer *p) { done = true; }); @@ -208,6 +207,9 @@ TEST_CASE("Universe::broadcast()", "[net]") { c.bind("hello", [&done2](int v) { done2 = v; }); + + REQUIRE( a.numberOfPeers() == 2 ); + sleep_for(milliseconds(100)); // NOTE: Binding might not be ready a.broadcast("hello", 676); @@ -222,6 +224,7 @@ TEST_CASE("Universe::findAll()", "") { Universe a; Universe b; Universe c; + a.listen("tcp://localhost:7077"); b.connect("tcp://localhost:7077")->waitConnection(); c.connect("tcp://localhost:7077")->waitConnection(); @@ -248,107 +251,10 @@ TEST_CASE("Universe::findAll()", "") { return {6,7,8}; }); + sleep_for(milliseconds(100)); // NOTE: Binding might not be ready + auto res = a.findAll<int>("test_all"); REQUIRE( (res.size() == 6) ); REQUIRE( (res[0] == 3 || res[0] == 6) ); } } - -TEST_CASE("Universe::findOwner()", "") { - Universe a; - Universe b; - a.listen("tcp://localhost:7077"); - b.connect("tcp://localhost:7077")->waitConnection(); - - SECTION("no owners exist") { - REQUIRE( !b.findOwner("ftl://test") ); - } - - SECTION("one owner exists") { - a.createResource("ftl://test"); - REQUIRE( *(b.findOwner("ftl://test")) == ftl::net::this_peer ); - } - - SECTION("three peers and one owner") { - Universe c; - c.connect("tcp://localhost:7077")->waitConnection(); - b.setLocalID(ftl::UUID(7)); - - b.createResource("ftl://test"); - REQUIRE( *(a.findOwner("ftl://test")) == ftl::UUID(7) ); - } - - SECTION("three peers and one owner (2)") { - Universe c; - c.connect("tcp://localhost:7077")->waitConnection(); - c.setLocalID(ftl::UUID(7)); - - c.createResource("ftl://test"); - auto r = a.findOwner("ftl://test"); - REQUIRE( r ); - REQUIRE( *r == ftl::UUID(7) ); - } -} - -TEST_CASE("Universe::subscribe()", "") { - Universe a; - Universe b; - a.listen("tcp://localhost:7077"); - b.connect("tcp://localhost:7077")->waitConnection(); - - SECTION("no resource exists") { - REQUIRE( !b.subscribe("ftl://test", []() {}) ); - } - - SECTION("one resource exists") { - a.createResource("ftl://test"); - REQUIRE( b.subscribe("ftl://test", []() {}) ); - sleep_for(milliseconds(50)); - REQUIRE( a.numberOfSubscribers("ftl://test") == 1); - } -} - -TEST_CASE("Universe::publish()", "") { - Universe a; - Universe b; - a.listen("tcp://localhost:7077"); - ftl::net::Peer *p = b.connect("tcp://localhost:7077"); - p->waitConnection(); - - SECTION("no subscribers") { - a.createResource("ftl://test"); - a.publish("ftl://test", 55); - } - - SECTION("one subscriber") { - int done = 0; - a.createResource("ftl://test"); - REQUIRE( b.subscribe("ftl://test", [&done](int a) { - done = a; - }) ); - sleep_for(milliseconds(50)); - - a.publish("ftl://test", 56); - sleep_for(milliseconds(50)); - - REQUIRE( done == 56 ); - } - - SECTION("publish to disconnected subscriber") { - int done = 0; - a.createResource("ftl://test2"); - REQUIRE( b.subscribe("ftl://test2", [&done](int a) { - done = a; - }) ); - sleep_for(milliseconds(50)); - - p->close(); - sleep_for(milliseconds(100)); - - a.publish("ftl://test2", 56); - sleep_for(milliseconds(50)); - - REQUIRE( done == 0 ); - } -} - diff --git a/components/net/cpp/test/peer_unit.cpp b/components/net/cpp/test/peer_unit.cpp index d72d96791349818949b65eb2957f7cf3c41fbf70..09eb4bef9e70171b84e57f1e954e833aa9768566 100644 --- a/components/net/cpp/test/peer_unit.cpp +++ b/components/net/cpp/test/peer_unit.cpp @@ -6,6 +6,7 @@ #include <tuple> #include <thread> #include <chrono> +#include <functional> #include <ftl/net/common.hpp> #include <ftl/net/peer.hpp> @@ -18,6 +19,7 @@ /* Allow socket functions to be mocked */ #define TEST_MOCKS +#define _FTL_NET_UNIVERSE_HPP_ #include "../src/net_internal.hpp" using std::tuple; @@ -33,9 +35,28 @@ using std::chrono::milliseconds; // --- Mock -------------------------------------------------------------------- +namespace ftl { +namespace net { + +typedef unsigned int callback_t; + +class Universe { + public: + Universe() {}; + + void _notifyConnect(Peer*) {} + void _notifyDisconnect(Peer*) {} + void removeCallback(callback_t id) {} + + callback_t onConnect(const std::function<void(Peer*)> &f) { return 0; } + callback_t onDisconnect(const std::function<void(Peer*)> &f) { return 0; } +}; +} +} + class MockPeer : public Peer { public: - MockPeer() : Peer((SOCKET)0) {} + MockPeer() : Peer((SOCKET)0, new ftl::net::Universe()) {} void mock_data() { data(); } }; diff --git a/components/renderers/cpp/include/ftl/display.hpp b/components/renderers/cpp/include/ftl/display.hpp index 2c2e8b8d5b80a1f3355e27462798d0fe6f698f12..bf4eb51f5687eb7f905c70cab0a7b451cc7fce57 100644 --- a/components/renderers/cpp/include/ftl/display.hpp +++ b/components/renderers/cpp/include/ftl/display.hpp @@ -46,7 +46,7 @@ class Display : public ftl::Configurable { void wait(int ms); - void onKey(std::function<void(int)> h) { key_handlers_.push_back(h); } + void onKey(const std::function<void(int)> &h) { key_handlers_.push_back(h); } private: #if defined HAVE_VIZ diff --git a/components/renderers/cpp/include/ftl/rgbd_display.hpp b/components/renderers/cpp/include/ftl/rgbd_display.hpp index fead71e645cefaccfbed28fd289541920fb547c0..0467030b0beef75e183aea34302476418196be15 100644 --- a/components/renderers/cpp/include/ftl/rgbd_display.hpp +++ b/components/renderers/cpp/include/ftl/rgbd_display.hpp @@ -20,7 +20,7 @@ class Display : public ftl::Configurable { bool active() const { return active_; } - void onKey(std::function<void(int)> h) { key_handlers_.push_back(h); } + void onKey(const std::function<void(int)> &h) { key_handlers_.push_back(h); } void wait(int ms); diff --git a/components/renderers/cpp/src/display.cpp b/components/renderers/cpp/src/display.cpp index 2911c9ea6e38610e4fd123a1208c5cd61783c2d9..712590d80fbad551fdfb8c4585a187899b753999 100644 --- a/components/renderers/cpp/src/display.cpp +++ b/components/renderers/cpp/src/display.cpp @@ -225,7 +225,7 @@ bool Display::render(pcl::PointCloud<pcl::PointXYZRGB>::ConstPtr pc) { bool Display::render(const cv::Mat &img, style_t s) { if (s == STYLE_NORMAL) { cv::imshow("Image", img); - } else if (s = STYLE_DISPARITY) { + } else if (s == STYLE_DISPARITY) { Mat idepth; if (value("flip_vert", false)) { diff --git a/components/renderers/cpp/src/rgbd_display.cpp b/components/renderers/cpp/src/rgbd_display.cpp index e5ae0ed765a05a34152f32cd8f5a6c049f352900..d8f588ad1c3ccec27590176d75d336aae42ce3c4 100644 --- a/components/renderers/cpp/src/rgbd_display.cpp +++ b/components/renderers/cpp/src/rgbd_display.cpp @@ -119,7 +119,7 @@ void Display::update() { centre_ += (lookPoint_ - centre_) * (lerpSpeed_ * 0.1f); Eigen::Matrix4f viewPose = lookAt<float>(eye_,centre_,up_).inverse(); - //source_->setPose(viewPose); + source_->setPose(viewPose); Mat rgb, depth; //source_->grab(); diff --git a/components/rgbd-sources/include/ftl/net_source.hpp b/components/rgbd-sources/include/ftl/net_source.hpp index 2693128d275e3e4f7ad317e8d93b68dd5f096aa1..4199d0e801d4122f18a16edd468b426aced4371c 100644 --- a/components/rgbd-sources/include/ftl/net_source.hpp +++ b/components/rgbd-sources/include/ftl/net_source.hpp @@ -36,6 +36,7 @@ class NetSource : public RGBDSource { int N_; bool active_; std::string uri_; + ftl::net::callback_t h_; bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::CameraParameters &p); void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); diff --git a/components/rgbd-sources/include/ftl/snapshot.hpp b/components/rgbd-sources/include/ftl/snapshot.hpp index 3f0f6d1a403e625e42686a576a68828d51a95c1d..b146c49d8b1a370f154e16b806bcdd935eed987b 100644 --- a/components/rgbd-sources/include/ftl/snapshot.hpp +++ b/components/rgbd-sources/include/ftl/snapshot.hpp @@ -21,7 +21,7 @@ namespace rgbd { class SnapshotWriter { public: - SnapshotWriter(const std::string &filename); + explicit SnapshotWriter(const std::string &filename); ~SnapshotWriter(); bool addCameraRGBD(const std::string &name, const cv::Mat &rgb, const cv::Mat &depth, const Eigen::Matrix4f &pose, const ftl::rgbd::CameraParameters ¶ms); @@ -46,7 +46,7 @@ struct SnapshotEntry { class SnapshotReader { public: - SnapshotReader(const std::string &filename); + explicit SnapshotReader(const std::string &filename); ~SnapshotReader(); bool getCameraRGBD(const std::string &id, cv::Mat &rgb, cv::Mat &depth, Eigen::Matrix4f &pose, ftl::rgbd::CameraParameters ¶ms); diff --git a/components/rgbd-sources/src/calibrate.cpp b/components/rgbd-sources/src/calibrate.cpp index 05364c507c8072c0c4da2f060ec7d8a992945c61..315fc8f35d15f03957fd6d8716244850278d8ddd 100644 --- a/components/rgbd-sources/src/calibrate.cpp +++ b/components/rgbd-sources/src/calibrate.cpp @@ -384,7 +384,7 @@ bool Calibrate::recalibrate() { bool Calibrate::_recalibrate(vector<vector<Point2f>> *imagePoints, Mat *cameraMatrix, Mat *distCoeffs, Size *imageSize) { - int winSize = 11; // parser.get<int>("winSize"); + //int winSize = 11; // parser.get<int>("winSize"); float grid_width = settings_.squareSize * (settings_.boardSize.width - 1); bool release_object = false; diff --git a/components/rgbd-sources/src/net_source.cpp b/components/rgbd-sources/src/net_source.cpp index 8c0187f039e2a60d2a0b5eb041d3efd1464a99f5..ae8bd9f9eb9644775a81ed521b2881f761f475c9 100644 --- a/components/rgbd-sources/src/net_source.cpp +++ b/components/rgbd-sources/src/net_source.cpp @@ -52,12 +52,19 @@ NetSource::NetSource(nlohmann::json &config, ftl::net::Universe *net) }); _updateURI(); + + h_ = net->onConnect([this](ftl::net::Peer *p) { + LOG(INFO) << "NetSource restart..."; + _updateURI(); + }); } NetSource::~NetSource() { if (uri_.size() > 0) { net_->unbind(uri_); } + + net_->removeCallback(h_); } void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned char> &d) { @@ -69,7 +76,9 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch N_--; if (N_ == 0) { N_ += 10; - net_->send(peer_, "get_stream", *get<string>("uri"), 10, 0, net_->id(), *get<string>("uri")); + if (!net_->send(peer_, "get_stream", *get<string>("uri"), 10, 0, net_->id(), *get<string>("uri"))) { + active_ = false; + } } } @@ -78,7 +87,9 @@ void NetSource::setPose(const Eigen::Matrix4f &pose) { vector<unsigned char> vec((unsigned char*)pose.data(), (unsigned char*)(pose.data()+(pose.size()))); try { - net_->send(peer_, "set_pose", *get<string>("uri"), vec); + if (!net_->send(peer_, "set_pose", *get<string>("uri"), vec)) { + active_ = false; + } } catch (...) { } @@ -86,6 +97,7 @@ void NetSource::setPose(const Eigen::Matrix4f &pose) { } void NetSource::_updateURI() { + unique_lock<mutex> lk(mutex_); active_ = false; auto uri = get<string>("uri");