diff --git a/CMakeLists.txt b/CMakeLists.txt index 54cae954b93e7e7211925b648da7169be5fae8e3..e448ccf53aa0d14c8d012b33a2272202ba16d77e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -66,6 +66,15 @@ else() endif() endif() +find_library( NANOGUI_LIBRARY NAMES nanogui libnanogui) +if (NANOGUI_LIBRARY) + set(HAVE_NANOGUI TRUE) + add_library(nanogui UNKNOWN IMPORTED) + #set_property(TARGET nanogui PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${NANOGUI_EXTRA_INCS}) + set_property(TARGET nanogui PROPERTY IMPORTED_LOCATION ${NANOGUI_LIBRARY}) + message(STATUS "Found NanoGUI: ${NANOGUI_LIBRARY}") +endif() + find_program( NODE_NPM NAMES npm ) if (NODE_NPM) message(STATUS "Found NPM: ${NODE_NPM}") @@ -136,8 +145,8 @@ include(ftl_paths) if (WIN32) # TODO(nick) Should do based upon compiler (VS) add_definitions(-DWIN32) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /std:c++17") - set(CMAKE_CXX_FLAGS_DEBUG "-DFTL_DEBUG -Wall") - set(CMAKE_CXX_FLAGS_RELEASE "/O2") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /DFTL_DEBUG /Wall") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /O2") set(OS_LIBS "") else() add_definitions(-DUNIX) @@ -168,6 +177,10 @@ if (BUILD_RECONSTRUCT AND HAVE_PCL) add_subdirectory(applications/reconstruct) endif() +if (HAVE_NANOGUI) + add_subdirectory(applications/gui) +endif() + ### Generate Build Configuration Files ========================================= configure_file(${CMAKE_SOURCE_DIR}/components/common/cpp/include/ftl/config.h.in diff --git a/applications/gui/CMakeLists.txt b/applications/gui/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..ac9ae7ec20881ab9c4112d9baf0696ce87693536 --- /dev/null +++ b/applications/gui/CMakeLists.txt @@ -0,0 +1,25 @@ +# Need to include staged files and libs +#include_directories(${PROJECT_SOURCE_DIR}/reconstruct/include) +#include_directories(${PROJECT_BINARY_DIR}) + +set(GUISRC + src/main.cpp + src/ctrl_window.cpp + src/src_window.cpp +) + +add_executable(ftl-gui ${GUISRC}) + +target_include_directories(ftl-gui PUBLIC + $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> + $<INSTALL_INTERFACE:include> + PRIVATE src) + +#if (CUDA_FOUND) +#set_property(TARGET ftl-gui PROPERTY CUDA_SEPARABLE_COMPILATION ON) +#endif() + +#target_include_directories(cv-node PUBLIC ${PROJECT_SOURCE_DIR}/include) +target_link_libraries(ftl-gui ftlcommon ftlctrl ftlrgbd Threads::Threads ${OpenCV_LIBS} glog::glog ftlnet ftlrender nanogui GL) + + diff --git a/applications/gui/src/ctrl_window.cpp b/applications/gui/src/ctrl_window.cpp new file mode 100644 index 0000000000000000000000000000000000000000..38a496376cd95b5ec96b72a9cd3c8bb0694ed124 --- /dev/null +++ b/applications/gui/src/ctrl_window.cpp @@ -0,0 +1,78 @@ +#include "ctrl_window.hpp" + +#include <nanogui/layout.h> +#include <nanogui/label.h> +#include <nanogui/combobox.h> +#include <nanogui/button.h> +#include <nanogui/entypo.h> + +#include <vector> +#include <string> + +using ftl::gui::ControlWindow; +using std::string; +using std::vector; + + +ControlWindow::ControlWindow(nanogui::Widget *parent, ftl::ctrl::Master *ctrl) + : nanogui::Window(parent, "Node Control"), ctrl_(ctrl) { + setLayout(new nanogui::GroupLayout()); + + using namespace nanogui; + + _updateDetails(); + + auto button = new Button(this, "Add Node", ENTYPO_ICON_PLUS); + button->setCallback([this] { + // Show new connection dialog + }); + button = new Button(this, "Restart All", ENTYPO_ICON_CYCLE); + button->setCallback([this] { + ctrl_->restart(); + }); + button = new Button(this, "Shutdown All", ENTYPO_ICON_POWER_PLUG); + button->setCallback([this] { + ctrl_->shutdown(); + }); + + new Label(this, "Select Node","sans-bold"); + auto select = new ComboBox(this, node_titles_); + select->setCallback([this](int ix) { + LOG(INFO) << "Change node: " << ix; + _changeActive(ix); + }); + + button = new Button(this, "Restart Node", ENTYPO_ICON_CYCLE); + button->setCallback([this] { + ctrl_->restart(_getActiveID()); + }); + + button = new Button(this, "Shutdown Node", ENTYPO_ICON_POWER_PLUG); + button->setCallback([this] { + ctrl_->shutdown(_getActiveID()); + }); + + _changeActive(0); +} + +ControlWindow::~ControlWindow() { + +} + +void ControlWindow::_updateDetails() { + node_details_ = ctrl_->getSlaves(); + + node_titles_.clear(); + for (auto &d : node_details_) { + node_titles_.push_back(d["title"].get<string>()); + } +} + +void ControlWindow::_changeActive(int ix) { + active_ix_ = ix; +} + +ftl::UUID ControlWindow::_getActiveID() { + return ftl::UUID(node_details_[active_ix_]["id"].get<string>()); +} + diff --git a/applications/gui/src/ctrl_window.hpp b/applications/gui/src/ctrl_window.hpp new file mode 100644 index 0000000000000000000000000000000000000000..58df7b9be88e7ddba9b344464336369c53d36d96 --- /dev/null +++ b/applications/gui/src/ctrl_window.hpp @@ -0,0 +1,33 @@ +#ifndef _FTL_GUI_CTRLWINDOW_HPP_ +#define _FTL_GUI_CTRLWINDOW_HPP_ + +#include <nanogui/window.h> +#include <ftl/master.hpp> +#include <ftl/uuid.hpp> + +namespace ftl { +namespace gui { + +/** + * Manage connected nodes and add new connections. + */ +class ControlWindow : public nanogui::Window { + public: + ControlWindow(nanogui::Widget *parent, ftl::ctrl::Master *ctrl); + ~ControlWindow(); + + private: + ftl::ctrl::Master *ctrl_; + std::vector<ftl::config::json_t> node_details_; + std::vector<std::string> node_titles_; + int active_ix_; + + void _updateDetails(); + void _changeActive(int); + ftl::UUID _getActiveID(); +}; + +} +} + +#endif // _FTL_GUI_CTRLWINDOW_HPP_ diff --git a/applications/gui/src/main.cpp b/applications/gui/src/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b5f911b15c014199aa3ae9fa5a50ce31a0b66e72 --- /dev/null +++ b/applications/gui/src/main.cpp @@ -0,0 +1,104 @@ +#include <ftl/configuration.hpp> +#include <ftl/net/universe.hpp> +#include <ftl/rgbd.hpp> +#include <ftl/master.hpp> + +#include <loguru.hpp> + +#include <opencv2/opencv.hpp> + +#include <nanogui/opengl.h> +#include <nanogui/glutil.h> +#include <nanogui/screen.h> +#include <nanogui/window.h> +#include <nanogui/layout.h> +#include <nanogui/imageview.h> +#include <nanogui/combobox.h> +#include <nanogui/label.h> + +#include "ctrl_window.hpp" +#include "src_window.hpp" + +using std::string; +using ftl::rgbd::RGBDSource; + +/*struct SourceViews { + ftl::rgbd::RGBDSource *source; + GLTexture texture; + nanogui::ImageView *view; +};*/ + + + +class FTLApplication : public nanogui::Screen { + public: + explicit FTLApplication(ftl::Configurable *root, ftl::net::Universe *net, ftl::ctrl::Master *controller) : nanogui::Screen(Eigen::Vector2i(1024, 768), "FT-Lab GUI") { + using namespace nanogui; + net_ = net; + + auto cwindow = new ftl::gui::ControlWindow(this, controller); + auto swindow = new ftl::gui::SourceWindow(this, controller); + + setVisible(true); + performLayout(); + } + + virtual void draw(NVGcontext *ctx) { + nvgText(ctx, 10, 10, "FT-Lab Remote Presence System", NULL); + + /* Draw the user interface */ + Screen::draw(ctx); + } + + private: + //std::vector<SourceViews> sources_; + ftl::net::Universe *net_; +}; + +int main(int argc, char **argv) { + auto root = ftl::configure(argc, argv, "gui_default"); + ftl::net::Universe *net = ftl::create<ftl::net::Universe>(root, "net"); + + net->waitConnections(); + + ftl::ctrl::Master controller(root, net); + controller.onLog([](const ftl::ctrl::LogEvent &e){ + const int v = e.verbosity; + switch (v) { + case -2: LOG(ERROR) << "Remote log: " << e.message; break; + case -1: LOG(WARNING) << "Remote log: " << e.message; break; + case 0: LOG(INFO) << "Remote log: " << e.message; break; + } + }); + + /*auto available = net.findAll<string>("list_streams"); + for (auto &a : available) { + std::cout << " -- " << a << std::endl; + }*/ + + try { + nanogui::init(); + + /* scoped variables */ { + nanogui::ref<FTLApplication> app = new FTLApplication(root, net, &controller); + app->drawAll(); + app->setVisible(true); + nanogui::mainloop(); + } + + nanogui::shutdown(); + } catch (const std::runtime_error &e) { + std::string error_msg = std::string("Caught a fatal error: ") + std::string(e.what()); + #if defined(_WIN32) + MessageBoxA(nullptr, error_msg.c_str(), NULL, MB_ICONERROR | MB_OK); + #else + std::cerr << error_msg << std::endl; + #endif + return -1; + } + + delete net; + delete root; + + return 0; +} \ No newline at end of file diff --git a/applications/gui/src/src_window.cpp b/applications/gui/src/src_window.cpp new file mode 100644 index 0000000000000000000000000000000000000000..008933886c4cfdf0913dec91add26bff88d6a294 --- /dev/null +++ b/applications/gui/src/src_window.cpp @@ -0,0 +1,188 @@ +#include "src_window.hpp" + +#include <nanogui/imageview.h> +#include <nanogui/combobox.h> +#include <nanogui/label.h> +#include <nanogui/opengl.h> +#include <nanogui/glutil.h> +#include <nanogui/screen.h> +#include <nanogui/layout.h> + +using ftl::gui::SourceWindow; +using ftl::rgbd::RGBDSource; +using std::string; + +class GLTexture { + public: + GLTexture() { + glGenTextures(1, &glid_); + glBindTexture(GL_TEXTURE_2D, glid_); + cv::Mat m(cv::Size(100,100), CV_8UC3); + glTexImage2D(GL_TEXTURE_2D, 0, GL_RGB8, m.cols, m.rows, 0, GL_RGB, GL_UNSIGNED_BYTE, m.data); + glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_WRAP_S, GL_CLAMP_TO_EDGE); + glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_WRAP_T, GL_CLAMP_TO_EDGE); + glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MAG_FILTER, GL_NEAREST); + glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MIN_FILTER, GL_LINEAR); + } + ~GLTexture() { + glDeleteTextures(1, &glid_); + } + + void update(cv::Mat &m) { + if (m.rows == 0) return; + glBindTexture(GL_TEXTURE_2D, glid_); + // TODO Allow for other formats + glTexImage2D(GL_TEXTURE_2D, 0, GL_RGB8, m.cols, m.rows, 0, GL_BGR, GL_UNSIGNED_BYTE, m.data); + auto err = glGetError(); + if (err != 0) LOG(ERROR) << "OpenGL Texture error: " << err; + } + + unsigned int texture() const { return glid_; } + + private: + unsigned int glid_; +}; + +template<class T> +Eigen::Matrix<T,4,4> lookAt +( + Eigen::Matrix<T,3,1> const & eye, + Eigen::Matrix<T,3,1> const & center, + Eigen::Matrix<T,3,1> const & up +) +{ + typedef Eigen::Matrix<T,4,4> Matrix4; + typedef Eigen::Matrix<T,3,1> Vector3; + + Vector3 f = (center - eye).normalized(); + Vector3 u = up.normalized(); + Vector3 s = f.cross(u).normalized(); + u = s.cross(f); + + Matrix4 res; + res << s.x(),s.y(),s.z(),-s.dot(eye), + u.x(),u.y(),u.z(),-u.dot(eye), + -f.x(),-f.y(),-f.z(),f.dot(eye), + 0,0,0,1; + + return res; +} + +class VirtualCameraView : public nanogui::ImageView { + public: + VirtualCameraView(nanogui::Widget *parent) : nanogui::ImageView(parent, 0) { + src_ = nullptr; + eye_ = Eigen::Vector3f(0.0f, 0.0f, 0.0f); + centre_ = Eigen::Vector3f(0.0f, 0.0f, -4.0f); + up_ = Eigen::Vector3f(0,1.0f,0); + lookPoint_ = Eigen::Vector3f(0.0f,0.0f,-4.0f); + lerpSpeed_ = 0.4f; + depth_ = false; + } + + void setSource(RGBDSource *src) { src_ = src; } + + bool mouseButtonEvent(const nanogui::Vector2i &p, int button, bool down, int modifiers) { + //LOG(INFO) << "Mouse move: " << p[0]; + if (src_ && down) { + Eigen::Vector4f camPos = src_->point(p[0],p[1]); + camPos *= -1.0f; + Eigen::Vector4f worldPos = src_->getPose() * camPos; + lookPoint_ = Eigen::Vector3f(worldPos[0],worldPos[1],worldPos[2]); + LOG(INFO) << "Depth at click = " << -camPos[2]; + } + } + + bool keyboardEvent(int key, int scancode, int action, int modifiers) { + LOG(INFO) << "Key press" << key << " - " << action; + if (key == 81 || key == 83) { + // TODO Should rotate around lookAt object, but requires correct depth + Eigen::Quaternion<float> q; q = Eigen::AngleAxis<float>((key == 81) ? 0.01f : -0.01f, up_); + eye_ = (q * (eye_ - centre_)) + centre_; + } else if (key == 84 || key == 82) { + float scalar = (key == 84) ? 0.99f : 1.01f; + eye_ = ((eye_ - centre_) * scalar) + centre_; + } + } + + void draw(NVGcontext *ctx) { + //net_->broadcast("grab"); + if (src_) { + cv::Mat rgb, depth; + centre_ += (lookPoint_ - centre_) * (lerpSpeed_ * 0.1f); + Eigen::Matrix4f viewPose = lookAt<float>(eye_,centre_,up_).inverse(); + + src_->setPose(viewPose); + src_->grab(); + src_->getRGBD(rgb, depth); + + if (depth_) { + if (depth.rows > 0) { + cv::Mat idepth; + depth.convertTo(idepth, CV_8U, 255.0f / 10.0f); // TODO(nick) + applyColorMap(idepth, idepth, cv::COLORMAP_JET); + texture_.update(idepth); + bindImage(texture_.texture()); + } + } else { + if (rgb.rows > 0) { + texture_.update(rgb); + bindImage(texture_.texture()); + } + } + + screen()->performLayout(ctx); + } + ImageView::draw(ctx); + } + + void setDepth(bool d) { depth_ = d; } + + private: + RGBDSource *src_; + GLTexture texture_; + Eigen::Vector3f eye_; + Eigen::Vector3f centre_; + Eigen::Vector3f up_; + Eigen::Vector3f lookPoint_; + float lerpSpeed_; + bool depth_; +}; + +SourceWindow::SourceWindow(nanogui::Widget *parent, ftl::ctrl::Master *ctrl) + : nanogui::Window(parent, "Source View"), ctrl_(ctrl) { + setLayout(new nanogui::GroupLayout()); + + using namespace nanogui; + + src_ = ftl::create<ftl::rgbd::NetSource>(ctrl->getRoot(), "source", ctrl->getNet()); + + Widget *tools = new Widget(this); + tools->setLayout(new BoxLayout(Orientation::Horizontal, + Alignment::Middle, 0, 6)); + + new Label(tools, "Select source","sans-bold"); + auto available = ctrl->getNet()->findAll<string>("list_streams"); + auto select = new ComboBox(tools, available); + select->setCallback([this,available](int ix) { + LOG(INFO) << "Change source: " << ix; + src_->set("uri", available[ix]); + }); + + auto depth = new Button(tools, "Depth"); + depth->setFlags(Button::ToggleButton); + depth->setChangeCallback([this](bool state) { + image_->setDepth(state); + }); + + + auto imageView = new VirtualCameraView(this); + //cam.view = imageView; + imageView->setGridThreshold(20); + imageView->setSource(src_); + image_ = imageView; +} + +SourceWindow::~SourceWindow() { + +} diff --git a/applications/gui/src/src_window.hpp b/applications/gui/src/src_window.hpp new file mode 100644 index 0000000000000000000000000000000000000000..347b1706949603dc523635b6fca737c4fc94bc63 --- /dev/null +++ b/applications/gui/src/src_window.hpp @@ -0,0 +1,32 @@ +#ifndef _FTL_GUI_SRCWINDOW_HPP_ +#define _FTL_GUI_SRCWINDOW_HPP_ + +#include <nanogui/window.h> +#include <ftl/master.hpp> +#include <ftl/uuid.hpp> +#include <ftl/net_source.hpp> + +class VirtualCameraView; + +namespace ftl { +namespace gui { + +/** + * Manage connected nodes and add new connections. + */ +class SourceWindow : public nanogui::Window { + public: + SourceWindow(nanogui::Widget *parent, ftl::ctrl::Master *ctrl); + ~SourceWindow(); + + private: + ftl::ctrl::Master *ctrl_; + ftl::rgbd::NetSource *src_; + VirtualCameraView *image_; + +}; + +} +} + +#endif // _FTL_GUI_SRCWINDOW_HPP_ diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index 4ebc6fbd37fc8fe9e68a632f03bac386b940eb8d..88c8c23eea98972033ffd8e3dfdf5ff337b58eba 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -12,6 +12,7 @@ #include <ftl/scene_rep_hash_sdf.hpp> #include <ftl/rgbd.hpp> #include <ftl/virtual_source.hpp> +#include <ftl/rgbd_streamer.hpp> // #include <zlib.h> // #include <lz4.h> @@ -177,6 +178,13 @@ static void run(ftl::Configurable *root) { virt->setScene(scene); display->setSource(virt); + ftl::rgbd::Streamer *stream = ftl::create<ftl::rgbd::Streamer>(root, "stream", net); + stream->add(virt); + // Also proxy all inputs + for (auto &in : inputs) { + stream->add(in.source); + } + unsigned char frameCount = 0; bool paused = false; @@ -220,7 +228,10 @@ static void run(ftl::Configurable *root) { } frameCount++; + + stream->poll(); display->update(); + //sleep_for(milliseconds(10)); } } diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 64c2cfbb7eb675a73944b7ddd7e2a553842a6021..aba47f6cd970f5548df5e0364c8675fff8c13960 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -104,7 +104,9 @@ static void run(ftl::Configurable *root) { } LOG(INFO) << "Stopping..."; + slave.stop(); stream->stop(); + net->shutdown(); delete stream; delete display; @@ -126,6 +128,8 @@ int main(int argc, char **argv) { // ftl::middlebury::test(config); //} + delete root; + LOG(INFO) << "Terminating with code " << ftl::exit_code; return ftl::exit_code; } diff --git a/components/common/cpp/include/ftl/config.h.in b/components/common/cpp/include/ftl/config.h.in index 3d2b7d296440a20bb5515b078d9358d90e0e88d7..296da9eec1e60d6848d81d1dec84c1258586664f 100644 --- a/components/common/cpp/include/ftl/config.h.in +++ b/components/common/cpp/include/ftl/config.h.in @@ -19,6 +19,7 @@ #cmakedefine HAVE_PCL #cmakedefine HAVE_RENDER #cmakedefine HAVE_LIBSGM +#cmakedefine HAVE_NANOGUI extern const char *FTL_VERSION_LONG; extern const char *FTL_VERSION; diff --git a/components/common/cpp/include/ftl/configurable.hpp b/components/common/cpp/include/ftl/configurable.hpp index 3fb497c1d3bb9ccc17d543db5716670ca79c55f6..092fc1493a748a330976386e5cd874e599fbaa8f 100644 --- a/components/common/cpp/include/ftl/configurable.hpp +++ b/components/common/cpp/include/ftl/configurable.hpp @@ -38,9 +38,7 @@ struct Event { class Configurable { public: Configurable(); - explicit Configurable(nlohmann::json &config) : config_(config) { - if (config["uri"].is_string()) __changeURI(config["uri"].get<std::string>(), this); - } + explicit Configurable(nlohmann::json &config); virtual ~Configurable() {} /** diff --git a/components/common/cpp/include/ftl/configuration.hpp b/components/common/cpp/include/ftl/configuration.hpp index 046b58a07bf536a691a970b3a9d93bf4b5ab119b..d38472746254587e4f495547ba48e095f750ed12 100644 --- a/components/common/cpp/include/ftl/configuration.hpp +++ b/components/common/cpp/include/ftl/configuration.hpp @@ -126,15 +126,23 @@ T *ftl::config::create(ftl::Configurable *parent, const std::string &name, ARGS entity["$id"] = id_str + std::string("#") + name; } } - } /*else { - nlohmann::json &res = resolve(entity); - if (!res["uri"].is_string()) { - res["uri"] = *parent->get<std::string>("uri") + std::string("/") + name; - LOG(WARNING) << "Creating false URI!!! - " << res["uri"].get<std::string>(); + + return create<T>(entity, args...); + } else if (entity.is_null()) { + // Must create the object from scratch... + std::string id_str = *parent->get<std::string>("$id"); + if (id_str.find('#') != std::string::npos) { + id_str = id_str + std::string("/") + name; + } else { + id_str = id_str + std::string("#") + name; } - }*/ + parent->getConfig()[name] = { + {"$id", id_str} + }; - return create<T>(entity, args...); + nlohmann::json &entity2 = parent->getConfig()[name]; + return create<T>(entity2, args...); + } } #endif // _FTL_COMMON_CONFIGURATION_HPP_ diff --git a/components/common/cpp/include/loguru.hpp b/components/common/cpp/include/loguru.hpp index e43dc742e83d21405dacdafafa830a3251703998..a65ab2be28f1eeff9f918f075a0e7c78f9a04ba3 100644 --- a/components/common/cpp/include/loguru.hpp +++ b/components/common/cpp/include/loguru.hpp @@ -98,6 +98,7 @@ Website: www.ilikebigbits.com #define LOGURU_VERSION_PATCH 0 #define LOGURU_REPLACE_GLOG 1 +#define LOGURU_CATCH_SIGABRT 0 #if defined(_MSC_VER) #include <sal.h> // Needed for _In_z_ etc annotations diff --git a/components/common/cpp/src/configurable.cpp b/components/common/cpp/src/configurable.cpp index 81f9092b8e316af4fb4dc13f104b8d3b0b77af30..87f2a454fc4c3f7b8f3438f42e980aa4358fb58b 100644 --- a/components/common/cpp/src/configurable.cpp +++ b/components/common/cpp/src/configurable.cpp @@ -9,7 +9,14 @@ using ftl::config::json_t; extern nlohmann::json null_json; -Configurable::Configurable() : config_(null_json) {} +Configurable::Configurable() : config_(null_json) { + ftl::config::registerConfigurable(this); +} + +Configurable::Configurable(nlohmann::json &config) : config_(config) { + //if (config["uri"].is_string()) __changeURI(config["uri"].get<std::string>(), this); + ftl::config::registerConfigurable(this); +} void Configurable::required(const char *f, const std::vector<std::tuple<std::string, std::string, std::string>> &r) { bool diderror = false; diff --git a/components/common/cpp/src/configuration.cpp b/components/common/cpp/src/configuration.cpp index dd5bf2aff7155d9105c31c8dae3911ad0428eaae..f8bf95aec56fc577499db8eb2decfb70a9c346b6 100644 --- a/components/common/cpp/src/configuration.cpp +++ b/components/common/cpp/src/configuration.cpp @@ -183,14 +183,15 @@ ftl::Configurable *ftl::config::find(const std::string &uri) { void ftl::config::registerConfigurable(ftl::Configurable *cfg) { auto uri = cfg->get<string>("$id"); if (!uri) { - LOG(FATAL) << "Configurable object is missing $id property"; + LOG(ERROR) << "Configurable object is missing $id property: " << cfg->getConfig(); return; } auto ix = config_instance.find(*uri); - if (ix == config_instance.end()) { - LOG(FATAL) << "Attempting to create a duplicate object: " << *uri; + if (ix != config_instance.end()) { + LOG(ERROR) << "Attempting to create a duplicate object: " << *uri; } else { config_instance[*uri] = cfg; + LOG(INFO) << "Registering instance: " << *uri; } } diff --git a/components/control/cpp/include/ftl/master.hpp b/components/control/cpp/include/ftl/master.hpp index e069f12fcdfc813c27db76dbb75795b23f2570b0..e7c8e209e11ca45d42d50dad33109b4d8fb4947d 100644 --- a/components/control/cpp/include/ftl/master.hpp +++ b/components/control/cpp/include/ftl/master.hpp @@ -12,6 +12,7 @@ namespace ftl { namespace ctrl { struct LogEvent { + int verbosity; std::string preamble; std::string message; }; @@ -37,6 +38,8 @@ class Master { std::vector<std::string> getConfigurables(const ftl::UUID &peer); + std::vector<ftl::config::json_t> getSlaves(); + std::vector<ftl::config::json_t> get(const std::string &uri); ftl::config::json_t getOne(const std::string &uri); @@ -53,6 +56,9 @@ class Master { //void onStatus(); // void onChange(); + ftl::net::Universe *getNet() { return net_; } + ftl::Configurable *getRoot() { return root_; } + private: std::vector<std::function<void(const LogEvent&)>> log_handlers_; ftl::Configurable *root_; diff --git a/components/control/cpp/include/ftl/slave.hpp b/components/control/cpp/include/ftl/slave.hpp index 08e973339af0154efb089778c0f38d284b540cdb..d62261df150ce72523910492038bbe3446b94311 100644 --- a/components/control/cpp/include/ftl/slave.hpp +++ b/components/control/cpp/include/ftl/slave.hpp @@ -3,17 +3,38 @@ #include <ftl/net/universe.hpp> #include <ftl/configurable.hpp> +#include <loguru.hpp> +#include <mutex> namespace ftl { namespace ctrl { +/** + * Allows a node to be remote controlled and observed over the network. All + * such nodes should create a single instance of this class, but must call + * "stop()" before terminating the network. + */ class Slave { public: Slave(ftl::net::Universe *, ftl::Configurable *); ~Slave(); + /** + * Clean up to remove log and status forwarding over the network. + */ + void stop(); + + /** + * Do not call! Automatically called from logging subsystem. + */ + void sendLog(const loguru::Message& message); + private: + std::vector<ftl::UUID> log_peers_; ftl::net::Universe *net_; + std::recursive_mutex mutex_; + bool in_log_; + bool active_; }; } diff --git a/components/control/cpp/src/master.cpp b/components/control/cpp/src/master.cpp index 33ea746b7d0364d56c8d1b410386a09c4f3b846a..7bd1f753a988758ac4ad8de6f55af2e0ac8d7746 100644 --- a/components/control/cpp/src/master.cpp +++ b/components/control/cpp/src/master.cpp @@ -11,11 +11,13 @@ using ftl::ctrl::LogEvent; Master::Master(Configurable *root, Universe *net) : root_(root), net_(net) { - net_->bind("log", [this](const std::string &pre, const std::string &msg) { + net->bind("log", [this](int v, const std::string &pre, const std::string &msg) { for (auto f : log_handlers_) { - f({pre,msg}); + f({v,pre,msg}); } }); + + net->broadcast("log_subscribe", net->id()); } Master::~Master() { @@ -46,6 +48,15 @@ void Master::set(const ftl::UUID &peer, const string &uri, json_t &value) { net_->send(peer, "update_cfg", uri, (string)value); } +vector<json_t> Master::getSlaves() { + auto response = net_->findAll<string>("slave_details"); + vector<json_t> result; + for (auto &r : response) { + result.push_back(json_t::parse(r)); + } + return result; +} + vector<string> Master::getConfigurables() { return {}; } diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp index 076584230b9948eff6c6caaded14438bd718363a..583c16ad45e13e51eb9d4fee4c4626ba301eba50 100644 --- a/components/control/cpp/src/slave.cpp +++ b/components/control/cpp/src/slave.cpp @@ -1,16 +1,19 @@ #include <ftl/slave.hpp> -#include <loguru.hpp> using ftl::Configurable; using ftl::net::Universe; using ftl::ctrl::Slave; +using std::string; +using std::mutex; +using std::unique_lock; +using std::recursive_mutex; static void netLog(void* user_data, const loguru::Message& message) { - Universe *net = (Universe*)user_data; - //net->publish("log", message.preamble, message.message); + Slave *slave = static_cast<Slave*>(user_data); + slave->sendLog(message); } -Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net) { +Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false), active_(true) { net->bind("restart", []() { LOG(WARNING) << "Remote restart..."; //exit(1); @@ -32,12 +35,52 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net) { return ftl::config::resolve(uri); }); - loguru::add_callback("net_log", netLog, net, loguru::Verbosity_INFO); + net->bind("slave_details", [net,root]() -> std::vector<std::string> { + ftl::config::json_t json { + {"id", net->id().to_string()}, + {"title", root->value("title", *root->get<string>("$id"))} + }; + return {json.dump()}; + }); + + net->bind("log_subscribe", [this](const ftl::UUID &peer) { + unique_lock<recursive_mutex> lk(mutex_); + log_peers_.push_back(peer); + }); + + loguru::add_callback("net_log", netLog, this, loguru::Verbosity_INFO); } Slave::~Slave() { + stop(); +} + +void Slave::stop() { + if (!active_) return; + active_ = false; + loguru::remove_all_callbacks(); net_->unbind("restart"); net_->unbind("shutdown"); net_->unbind("update_cfg"); net_->unbind("get_cfg"); + net_->unbind("slave_details"); + net_->unbind("log_subscribe"); +} + +void Slave::sendLog(const loguru::Message& message) { + unique_lock<recursive_mutex> lk(mutex_); + if (in_log_) return; + in_log_ = true; + + for (auto &p : log_peers_) { + auto peer = net_->getPeer(p); + if (!peer || !peer->isConnected()) continue; + + std::cout << "sending log to master..." << std::endl; + if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) { + // TODO(Nick) Remove peer from loggers list... + } + } + + in_log_ = false; } diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index cffb6362d291fc333ad341c63b40a86739042de8..3e51b5361eca58d52f4ca180e0813f5a3173ac44 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -129,6 +129,10 @@ class Peer { /** * Non-blocking Remote Procedure Call using a callback function. * + * @param name RPC Function name. + * @param cb Completion callback. + * @param args A variable number of arguments for RPC function. + * * @return A call id for use with cancelCall() if needed. */ template <typename T, typename... ARGS> @@ -136,6 +140,11 @@ class Peer { std::function<void(const T&)> cb, ARGS... args); + /** + * Used to terminate an async call if the response is not required. + * + * @param id The ID returned by the original asyncCall request. + */ void cancelCall(int id); /** @@ -146,6 +155,11 @@ class Peer { /** * Non-blocking send using RPC function, but with no return value. + * + * @param name RPC Function name + * @param args Variable number of arguments for function + * + * @return Number of bytes sent or -1 if error */ template <typename... ARGS> int send(const std::string &name, ARGS... args); @@ -154,6 +168,9 @@ class Peer { * Bind a function to an RPC call name. Note: if an overriding dispatcher * is used then these bindings will propagate to all peers sharing that * dispatcher. + * + * @param name RPC name to bind to + * @param func A function object to act as callback */ template <typename F> void bind(const std::string &name, F func); @@ -163,6 +180,8 @@ class Peer { void onDisconnect(std::function<void(Peer &)> &f) {} bool isWaiting() const { return is_waiting_; } + + void rawClose() { _badClose(false); } public: static const int kMaxMessage = 10*1024*1024; // 10Mb currently diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 0c02dbe4540d3f876ba709d7a2ee6b479e207c35..5f8207c92715fdc85c892a8781395627f925d5e8 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -39,6 +39,8 @@ class Universe : public ftl::Configurable { /** * Constructor with json config object. The config allows listening and * peer connection to be set up automatically. + * + * Should be constructed with ftl::create<Universe>(...) and not directly. */ explicit Universe(nlohmann::json &config); @@ -53,6 +55,13 @@ class Universe : public ftl::Configurable { * @param addr URI giving protocol, interface and port */ bool listen(const std::string &addr); + + /** + * Essential to call this before destroying anything that registered + * callbacks or binds for RPC. It will terminate all connections and + * stop any network activity but without deleting the net object. + */ + void shutdown(); /** * Create a new peer connection. @@ -65,6 +74,10 @@ class Universe : public ftl::Configurable { size_t numberOfPeers() const { return peers_.size(); } + /** + * Will block until all currently registered connnections have completed. + * You should not use this, but rather use onConnect. + */ int waitConnections(); Peer *getPeer(const ftl::UUID &pid) const; @@ -312,7 +325,7 @@ template <typename R, typename... ARGS> R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { Peer *p = getPeer(pid); if (p == nullptr) { - LOG(ERROR) << "Attempting to call an unknown peer : " << pid.to_string(); + DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); throw -1; } return p->call<R>(name, args...); @@ -322,7 +335,7 @@ template <typename... ARGS> bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) { Peer *p = getPeer(pid); if (p == nullptr) { - LOG(ERROR) << "Attempting to call an unknown peer : " << pid.to_string(); + DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); throw -1; } return p->send(name, args...) > 0; diff --git a/components/net/cpp/include/ftl/uuid.hpp b/components/net/cpp/include/ftl/uuid.hpp index 5d494b8ddc26b262113b42db0bf1e461d61fc8c9..0b3cb6a1f68bdbd40e494db69bff000dd82b4998 100644 --- a/components/net/cpp/include/ftl/uuid.hpp +++ b/components/net/cpp/include/ftl/uuid.hpp @@ -25,8 +25,17 @@ namespace ftl { uuid_generate(uuid_); #endif } - explicit UUID(int u) { memset(&uuid_,u,16); } - UUID(const ftl::UUID &u) { memcpy(&uuid_,&u.uuid_,16); } + explicit UUID(int u) { memset(uuid_,u,16); } + UUID(const ftl::UUID &u) { memcpy(uuid_,u.uuid_,16); } + explicit UUID(const std::string &s) { +#ifdef WIN32 + // TODO(Nick) Windows UUID parse +#else + if (uuid_parse(s.c_str(), uuid_) < 0) { + memset(uuid_,0,16); + } +#endif + } UUID &operator=(const UUID &u) { memcpy(&uuid_,&u.uuid_,16); return *this; diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 736c80a281095e18c7c535cb1084e5c383e3d3fa..ef9c223d5179d6ea119c9ffcebae5f6cb20da94f 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -155,6 +155,7 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { bind("__disconnect__", [this]() { _badClose(false); + LOG(INFO) << "Peer elected to disconnect: " << id().to_string(); }); send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); @@ -247,6 +248,7 @@ void Peer::close(bool retry) { send("__disconnect__"); _badClose(retry); + LOG(INFO) << "Deliberate disconnect of peer."; } } @@ -259,35 +261,16 @@ void Peer::_badClose(bool retry) { #endif sock_ = INVALID_SOCKET; status_ = kDisconnected; - - // Attempt auto reconnect? - if (retry) LOG(INFO) << "Should attempt reconnect..."; //auto i = find(sockets.begin(),sockets.end(),this); //sockets.erase(i); _trigger(close_handlers_); - } -} -/*void Peer::setProtocol(Protocol *p) { - if (p != NULL) { - if (proto_ == p) return; - if (proto_ && proto_->id() == p->id()) return; - - if (remote_proto_ != "") { - Handshake hs1; - hs1.magic = ftl::net::MAGIC; - //hs1.name_size = 0; - hs1.proto_size = p->id().size(); - send(FTL_PROTOCOL_HS1, hs1, p->id()); - LOG(INFO) << "Handshake initiated with " << uri_; - } - - proto_ = p; - } else { + // Attempt auto reconnect? + if (retry) LOG(INFO) << "Should attempt reconnect..."; } -}*/ +} void Peer::socketError() { int err; @@ -297,6 +280,11 @@ void Peer::socketError() { uint32_t optlen = sizeof(err); #endif getsockopt(sock_, SOL_SOCKET, SO_ERROR, (char*)&err, &optlen); + + // Must close before log since log may try to send over net causing + // more socket errors... + _badClose(); + LOG(ERROR) << "Socket: " << uri_ << " - error " << err; } @@ -405,6 +393,8 @@ void Peer::_connected() { } int Peer::_send() { + if (sock_ == INVALID_SOCKET) return -1; + // Are we using a websocket? if (scheme_ == ftl::URI::SCHEME_WS) { // Create a websocket header as well. @@ -443,7 +433,7 @@ int Peer::_send() { // We are blocking, so -1 should mean actual error if (c == -1) { socketError(); - _badClose(); + //_badClose(); } return c; @@ -453,6 +443,7 @@ Peer::~Peer() { std::unique_lock<std::mutex> lk1(send_mtx_); std::unique_lock<std::mutex> lk2(recv_mtx_); _badClose(false); + LOG(INFO) << "Deleting peer object"; delete disp_; } diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 750258c3600f9b51ca39cd1ed08ad6d6a9bb6be8..53600416b27f419a5a1bacd221ddfc947488de9e 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -51,13 +51,18 @@ Universe::Universe(nlohmann::json &config) : } Universe::~Universe() { + shutdown(); +} + +void Universe::shutdown() { + if (!active_) return; LOG(INFO) << "Cleanup Network ..."; active_ = false; thread_.join(); for (auto s : peers_) { - s->close(); + s->rawClose(); } peers_.clear(); diff --git a/components/renderers/cpp/src/display.cpp b/components/renderers/cpp/src/display.cpp index 65c8b9eabfd770c39055fb73197dd61c3fc01c4f..2911c9ea6e38610e4fd123a1208c5cd61783c2d9 100644 --- a/components/renderers/cpp/src/display.cpp +++ b/components/renderers/cpp/src/display.cpp @@ -18,7 +18,7 @@ Display::Display(nlohmann::json &config, std::string name) : ftl::Configurable(c window_->setBackgroundColor(cv::viz::Color::white()); #endif // HAVE_VIZ - cv::namedWindow("Image", cv::WINDOW_KEEPRATIO); + //cv::namedWindow("Image", cv::WINDOW_KEEPRATIO); #if defined HAVE_PCL if (config.value("points", false)) { diff --git a/components/renderers/cpp/src/rgbd_display.cpp b/components/renderers/cpp/src/rgbd_display.cpp index 478ba958d763fa4b955c0a3b39efce9823cae49a..e5ae0ed765a05a34152f32cd8f5a6c049f352900 100644 --- a/components/renderers/cpp/src/rgbd_display.cpp +++ b/components/renderers/cpp/src/rgbd_display.cpp @@ -119,10 +119,10 @@ void Display::update() { centre_ += (lookPoint_ - centre_) * (lerpSpeed_ * 0.1f); Eigen::Matrix4f viewPose = lookAt<float>(eye_,centre_,up_).inverse(); - source_->setPose(viewPose); + //source_->setPose(viewPose); Mat rgb, depth; - source_->grab(); + //source_->grab(); source_->getRGBD(rgb, depth); if (rgb.rows > 0) cv::imshow(name_, rgb); wait(1); diff --git a/components/rgbd-sources/include/ftl/net_source.hpp b/components/rgbd-sources/include/ftl/net_source.hpp index cf56f6b5aae13d8ca3816e9197120654f8a1e435..2693128d275e3e4f7ad317e8d93b68dd5f096aa1 100644 --- a/components/rgbd-sources/include/ftl/net_source.hpp +++ b/components/rgbd-sources/include/ftl/net_source.hpp @@ -28,13 +28,18 @@ class NetSource : public RGBDSource { return new NetSource(config, net); } + void setPose(const Eigen::Matrix4f &pose); + private: bool has_calibration_; ftl::UUID peer_; int N_; + bool active_; + std::string uri_; bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::CameraParameters &p); void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); + void _updateURI(); }; } diff --git a/components/rgbd-sources/include/ftl/rgbd_streamer.hpp b/components/rgbd-sources/include/ftl/rgbd_streamer.hpp index 4012f7ee85cd4a90b6b46e05247a19da2ec210bf..bbe9449ec91cfb2926ffebf4bd501e664d757cb5 100644 --- a/components/rgbd-sources/include/ftl/rgbd_streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd_streamer.hpp @@ -38,20 +38,59 @@ struct StreamSource { } +/** + * The maximum number of frames a client can request in a single request. + */ static const int kMaxFrames = 25; +/** + * Allows network streaming of a number of RGB-Depth sources. Remote machines + * can discover available streams from an instance of Streamer. It also allows + * for adaptive bitrate streams where bandwidth can be monitored and different + * data rates can be requested, it is up to the remote machine to decide on + * desired bitrate. + * + * The capture and compression steps operate in different threads and each + * source and bitrate also operate on different threads. For a specific source + * and specific bitrate there is a single thread that sends data to all + * requesting clients. + * + * Use ftl::create<Streamer>(parent, name) to construct, don't construct + * directly. + * + * Note that the streamer attempts to maintain a fixed frame rate by + * monitoring job processing times and sleeping if there is spare time. + */ class Streamer : public ftl::Configurable { public: Streamer(nlohmann::json &config, ftl::net::Universe *net); ~Streamer(); + /** + * Add an RGB-Depth source to be made available for streaming. + */ void add(RGBDSource *); + void remove(RGBDSource *); void remove(const std::string &); + /** + * Enable the streaming. This creates the threads, and if block is false + * then another thread will manage the stream process. + */ void run(bool block=false); + + /** + * Terminate all streaming and join the threads. + */ void stop(); + /** + * Alternative to calling run(), it will operate a single frame capture, + * compress and stream cycle. + */ + void poll(); + RGBDSource *get(const std::string &uri); private: @@ -60,6 +99,7 @@ class Streamer : public ftl::Configurable { std::shared_mutex mutex_; bool active_; ftl::net::Universe *net_; + bool late_; void _schedule(); void _swap(detail::StreamSource &); diff --git a/components/rgbd-sources/src/net_source.cpp b/components/rgbd-sources/src/net_source.cpp index cc2e8461af7c61effa56fafc20d19e2cc840d39b..8c0187f039e2a60d2a0b5eb041d3efd1464a99f5 100644 --- a/components/rgbd-sources/src/net_source.cpp +++ b/components/rgbd-sources/src/net_source.cpp @@ -15,23 +15,28 @@ using std::this_thread::sleep_for; using std::chrono::milliseconds; bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &src, ftl::rgbd::CameraParameters &p) { - while(true) { - auto buf = net.call<vector<unsigned char>>(peer_, "source_calibration", src); - if (buf.size() > 0) { - memcpy((char*)&p, buf.data(), buf.size()); - - if (sizeof(p) != buf.size()) { - LOG(ERROR) << "Corrupted calibration"; - return false; + try { + while(true) { + auto buf = net.call<vector<unsigned char>>(peer_, "source_calibration", src); + + if (buf.size() > 0) { + memcpy((char*)&p, buf.data(), buf.size()); + + if (sizeof(p) != buf.size()) { + LOG(ERROR) << "Corrupted calibration"; + return false; + } + + LOG(INFO) << "Calibration received: " << p.cx << ", " << p.cy << ", " << p.fx << ", " << p.fy; + + return true; + } else { + LOG(INFO) << "Could not get calibration, retrying"; + sleep_for(milliseconds(500)); } - - LOG(INFO) << "Calibration received: " << p.cx << ", " << p.cy << ", " << p.fx << ", " << p.fy; - - return true; - } else { - LOG(INFO) << "Could not get calibration, retrying"; - sleep_for(milliseconds(500)); } + } catch (...) { + return false; } } @@ -40,39 +45,18 @@ NetSource::NetSource(nlohmann::json &config) : RGBDSource(config) { } NetSource::NetSource(nlohmann::json &config, ftl::net::Universe *net) - : RGBDSource(config, net) { - - auto uri = get<string>("uri"); - if (!uri) { - LOG(ERROR) << "NetSource does not have a URI"; - return; - } - auto p = net->findOne<ftl::UUID>("find_stream", *uri); - if (!p) { - LOG(ERROR) << "Could not find stream: " << *uri; - return; - } - peer_ = *p; + : RGBDSource(config, net), active_(false) { - has_calibration_ = _getCalibration(*net, peer_, *uri, params_); - - net->bind(*uri, [this](const vector<unsigned char> &jpg, const vector<unsigned char> &d) { - unique_lock<mutex> lk(mutex_); - _recv(jpg, d); + on("uri", [this](const config::Event &e) { + _updateURI(); }); - N_ = 10; - - // Initiate stream with request for first 10 frames - net->send(peer_, "get_stream", *uri, 10, 0, net->id(), *uri); + _updateURI(); } NetSource::~NetSource() { - auto uri = get<string>("uri"); - - // TODO(Nick) If URI changes then must unbind + rebind. - if (uri) { - net_->unbind(*uri); + if (uri_.size() > 0) { + net_->unbind(uri_); } } @@ -89,6 +73,59 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch } } +void NetSource::setPose(const Eigen::Matrix4f &pose) { + if (!active_) return; + + vector<unsigned char> vec((unsigned char*)pose.data(), (unsigned char*)(pose.data()+(pose.size()))); + try { + net_->send(peer_, "set_pose", *get<string>("uri"), vec); + } catch (...) { + + } + RGBDSource::setPose(pose); +} + +void NetSource::_updateURI() { + active_ = false; + auto uri = get<string>("uri"); + + // TODO(Nick) If URI changes then must unbind + rebind. + if (uri_.size() > 0) { + net_->unbind(uri_); + } + + if (uri) { + auto p = net_->findOne<ftl::UUID>("find_stream", *uri); + if (!p) { + LOG(ERROR) << "Could not find stream: " << *uri; + return; + } + peer_ = *p; + + has_calibration_ = _getCalibration(*net_, peer_, *uri, params_); + + net_->bind(*uri, [this](const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + unique_lock<mutex> lk(mutex_); + _recv(jpg, d); + }); + + N_ = 10; + + // Initiate stream with request for first 10 frames + try { + net_->send(peer_, "get_stream", *uri, 10, 0, net_->id(), *uri); + } catch(...) { + LOG(ERROR) << "Could not connect to stream " << *uri; + } + + uri_ = *uri; + active_ = true; + } else { + uri_ = ""; + LOG(WARNING) << "NetSource URI is missing"; + } +} + void NetSource::grab() { // net_.broadcast("grab"); } diff --git a/components/rgbd-sources/src/rgbd_source.cpp b/components/rgbd-sources/src/rgbd_source.cpp index a216529aa9f5d3d1eaa30c8bb43fde10e909a7be..d796c02e9bb90d72bdd67d52334e5ad079d88abd 100644 --- a/components/rgbd-sources/src/rgbd_source.cpp +++ b/components/rgbd-sources/src/rgbd_source.cpp @@ -24,6 +24,10 @@ bool RGBDSource::isReady() { return false; } +/*void RGBDSource::setURI(const std::string &uri) { + config_["uri"] = uri; +}*/ + void RGBDSource::getRGBD(cv::Mat &rgb, cv::Mat &depth) { unique_lock<mutex> lk(mutex_); rgb_.copyTo(rgb); diff --git a/components/rgbd-sources/src/rgbd_streamer.cpp b/components/rgbd-sources/src/rgbd_streamer.cpp index 5646495e84274897b55181223cf306f008f466e0..9ab2d184fbe025950f13dfde7f2879a5204593ce 100644 --- a/components/rgbd-sources/src/rgbd_streamer.cpp +++ b/components/rgbd-sources/src/rgbd_streamer.cpp @@ -24,14 +24,16 @@ using std::chrono::milliseconds; #define THREAD_POOL_SIZE 6 Streamer::Streamer(nlohmann::json &config, Universe *net) - : ftl::Configurable(config), pool_(THREAD_POOL_SIZE) { + : ftl::Configurable(config), pool_(THREAD_POOL_SIZE), late_(false) { active_ = false; net_ = net; net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> { - if (sources_.find(uri) != sources_.end()) return net_->id(); - else return {}; + if (sources_.find(uri) != sources_.end()) { + LOG(INFO) << "Valid source request received: " << uri; + return net_->id(); + } else return {}; }); net->bind("list_streams", [this]() -> vector<string> { @@ -42,6 +44,16 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) return streams; }); + net->bind("set_pose", [this](const std::string &uri, const std::vector<unsigned char> &buf) { + shared_lock<shared_mutex> slk(mutex_); + + if (sources_.find(uri) != sources_.end()) { + Eigen::Matrix4f pose; + memcpy(pose.data(), buf.data(), buf.size()); + sources_[uri]->src->setPose(pose); + } + }); + // Allow remote users to access camera calibration matrix net->bind("source_calibration", [this](const std::string &uri) -> vector<unsigned char> { vector<unsigned char> buf; @@ -82,7 +94,7 @@ void Streamer::add(RGBDSource *src) { unique_lock<shared_mutex> ulk(mutex_); if (sources_.find(src->getURI()) != sources_.end()) return; - StreamSource *s = new StreamSource; // = sources_.emplace(std::make_pair<std::string,StreamSource>(src->getURI(),{})); + StreamSource *s = new StreamSource; s->src = src; s->state = 0; sources_[src->getURI()] = s; @@ -121,42 +133,36 @@ void Streamer::stop() { pool_.stop(); } +void Streamer::poll() { + double wait = 1.0f / 25.0f; + auto start = std::chrono::high_resolution_clock::now(); + // Create frame jobs at correct FPS interval + _schedule(); + + std::chrono::duration<double> elapsed = + std::chrono::high_resolution_clock::now() - start; + + if (elapsed.count() >= wait) { + LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); + } else { + // Otherwise, wait until next frame should start. + // CHECK(Nick) Is this accurate enough? + sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); + } +} + void Streamer::run(bool block) { active_ = true; if (block) { - while (active_) { - double wait = 1.0f / 25.0f; - auto start = std::chrono::high_resolution_clock::now(); - // Create frame jobs at correct FPS interval - _schedule(); - - std::chrono::duration<double> elapsed = - std::chrono::high_resolution_clock::now() - start; - - if (elapsed.count() >= wait) { - LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); - } else { - sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); - } + while (ftl::running && active_) { + poll(); } } else { // Create thread job for frame ticking pool_.push([this](int id) { - while (active_) { - double wait = 1.0f / 25.0f; - auto start = std::chrono::high_resolution_clock::now(); - // Create frame jobs at correct FPS interval - _schedule(); - - std::chrono::duration<double> elapsed = - std::chrono::high_resolution_clock::now() - start; - - if (elapsed.count() >= wait) { - LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); - } else { - sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); - } + while (ftl::running && active_) { + poll(); } }); } @@ -178,16 +184,23 @@ void Streamer::_schedule() { string uri = s.first; shared_lock<shared_mutex> slk(s.second->mutex); + // CHECK Should never be true now if (s.second->state != 0) { - LOG(WARNING) << "Stream not ready to schedule on time: " << uri << " (" << s.second->state << ")"; + if (!late_) LOG(WARNING) << "Stream not ready to schedule on time: " << uri; + late_ = true; continue; + } else { + late_ = false; } + + // No point in doing work if no clients if (s.second->clients[0].size() == 0) { //LOG(ERROR) << "Stream has no clients: " << uri; continue; } slk.unlock(); + // There will be two jobs for this source... unique_lock<mutex> lk(job_mtx); jobs += 2; lk.unlock(); @@ -207,10 +220,12 @@ void Streamer::_schedule() { LOG(INFO) << "GRAB Elapsed: " << elapsed.count();*/ unique_lock<shared_mutex> lk(src->mutex); + //LOG(INFO) << "Grab frame"; src->state |= ftl::rgbd::detail::kGrabbed; _swap(*src); lk.unlock(); + // Mark job as finished unique_lock<mutex> ulk(job_mtx); jobs--; ulk.unlock(); @@ -222,9 +237,8 @@ void Streamer::_schedule() { pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { StreamSource *src = sources_[uri]; - //auto start = std::chrono::high_resolution_clock::now(); - - if (src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) { + try { + if (src && src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) { vector<unsigned char> rgb_buf; cv::imencode(".jpg", src->rgb, rgb_buf); @@ -252,17 +266,21 @@ void Streamer::_schedule() { } } } + } catch(...) { + LOG(ERROR) << "Error in transmission loop"; + } /*std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start; LOG(INFO) << "Stream Elapsed: " << elapsed.count();*/ unique_lock<shared_mutex> lk(src->mutex); - DLOG(1) << "Tx Frame: " << uri; + DLOG(2) << "Tx Frame: " << uri; src->state |= ftl::rgbd::detail::kTransmitted; _swap(*src); lk.unlock(); + // Mark job as finished unique_lock<mutex> ulk(job_mtx); jobs--; ulk.unlock(); @@ -270,6 +288,7 @@ void Streamer::_schedule() { }); } + // Wait for all jobs to complete before finishing frame unique_lock<mutex> lk(job_mtx); job_cv.wait(lk, [&jobs]{ return jobs == 0; }); } diff --git a/config/config.jsonc b/config/config.jsonc index 95fc7fde61d4bfe83c386b1f442ee5f310824b84..af02d368406353787a1324a2c1ddd29477ae7360 100644 --- a/config/config.jsonc +++ b/config/config.jsonc @@ -183,10 +183,24 @@ }, - "gui_default": { + "gui_node5": { "net": { "peers": ["tcp://ftl-node-5:9001"] }, "sources": [{"type": "net", "uri": "ftl://utu.fi/node5#vision_default/source"}] + }, + + "gui_node4": { + "net": { + "peers": ["tcp://ftl-node-4:9001"] + }, + "sources": [{"type": "net", "uri": "ftl://utu.fi/node4#vision_default/source"}] + }, + + "gui_default": { + "net": { + "peers": ["tcp://localhost:9001"] + }, + "sources": [{"type": "net", "uri": "ftl://utu.fi#vision_default/source"}] } }