From 23390a4c8b958f56ef840f03e0add0e20f0a298a Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Mon, 27 Jul 2020 09:51:44 +0300 Subject: [PATCH] WIP new reconstruct using feed --- CMakeLists.txt | 2 +- applications/reconstruct2/CMakeLists.txt | 21 ++++ applications/reconstruct2/src/main.cpp | 98 +++++++++++++++++++ components/common/cpp/include/ftl/uri.hpp | 2 +- components/common/cpp/src/uri.cpp | 2 +- .../streams/include/ftl/streams/feed.hpp | 4 +- components/streams/src/feed.cpp | 46 ++++++--- 7 files changed, 158 insertions(+), 17 deletions(-) create mode 100644 applications/reconstruct2/CMakeLists.txt create mode 100644 applications/reconstruct2/src/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e1fa5a66..25fd1d0b5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -481,7 +481,7 @@ if (BUILD_VISION) endif() if (BUILD_RECONSTRUCT) - #add_subdirectory(applications/reconstruct) + add_subdirectory(applications/reconstruct2) endif() if (HAVE_NANOGUI) diff --git a/applications/reconstruct2/CMakeLists.txt b/applications/reconstruct2/CMakeLists.txt new file mode 100644 index 000000000..a001f2553 --- /dev/null +++ b/applications/reconstruct2/CMakeLists.txt @@ -0,0 +1,21 @@ +# Need to include staged files and libs +#include_directories(${PROJECT_SOURCE_DIR}/reconstruct/include) +#include_directories(${PROJECT_BINARY_DIR}) + +set(REPSRC + src/main.cpp +) + +add_executable(ftl-reconstruct2 ${REPSRC}) + +#target_include_directories(ftl-reconstruct PUBLIC +# $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> +# $<INSTALL_INTERFACE:include> +# PRIVATE src) + +if (CUDA_FOUND) +set_property(TARGET ftl-reconstruct2 PROPERTY CUDA_SEPARABLE_COMPILATION ON) +endif() + +#target_include_directories(cv-node PUBLIC ${PROJECT_SOURCE_DIR}/include) +target_link_libraries(ftl-reconstruct2 ftlcommon ftlrgbd Threads::Threads ${OpenCV_LIBS} ftlctrl ftlnet ftlrender ftloperators ftlstreams ftlaudio) diff --git a/applications/reconstruct2/src/main.cpp b/applications/reconstruct2/src/main.cpp new file mode 100644 index 000000000..8efa92d17 --- /dev/null +++ b/applications/reconstruct2/src/main.cpp @@ -0,0 +1,98 @@ +#include <ftl/configuration.hpp> +#include <ftl/net.hpp> +#include <ftl/streams/feed.hpp> +#include <ftl/master.hpp> +#include <nlohmann/json.hpp> +#include <loguru.hpp> + +using ftl::net::Universe; +using ftl::stream::Feed; +using ftl::codecs::Channel; +using std::vector; +using std::string; + +static void threadSetCUDADevice() { + // Ensure all threads have correct cuda device + std::atomic<int> ijobs = 0; + for (int i=0; i<ftl::pool.size(); ++i) { + ftl::pool.push([&ijobs](int id) { + ftl::cuda::setDevice(); + ++ijobs; + while (ijobs < ftl::pool.size()) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }); + } + while (ijobs < ftl::pool.size()) std::this_thread::sleep_for(std::chrono::milliseconds(10)); +} + +static void run(ftl::Configurable *root) { + // Use other GPU if available. + ftl::cuda::setDevice(ftl::cuda::deviceCount()-1); + threadSetCUDADevice(); + ftl::timer::setClockSlave(false); + ftl::timer::setHighPrecision(true); + + Universe *net = ftl::create<Universe>(root, "net"); + ftl::ctrl::Master ctrl(root, net); + + net->start(); + net->waitConnections(); + + Feed *feed = ftl::create<Feed>(root, "feed", net); + std::string group_name = root->value("group", std::string("Reconstruction")); + + // Add sources here + if (root->getConfig().contains("sources")) { + for (const auto &s : root->getConfig()["sources"]) { + ftl::URI uri(s); + uri.setAttribute("group", group_name); + feed->add(uri); + } + } + + // Add sources from command line as well + auto paths = root->get<vector<string>>("paths"); + string file = ""; + + for (auto &x : *paths) { + if (x != "") { + ftl::URI uri(x); + uri.setAttribute("group", group_name); + feed->add(uri); + } + } + + auto *filter = feed->filter({Channel::Colour, Channel::Depth}); + feed->startStreaming(filter); + + // Just do whatever jobs are available + while (ftl::running) { + auto f = ftl::pool.pop(); + if (f) { + f(-1); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + + feed->stopRecording(); + feed->removeFilter(filter); + + net->shutdown(); + LOG(INFO) << "Stopping..."; + ftl::timer::stop(true); + ftl::pool.stop(true); + LOG(INFO) << "All threads stopped."; + + delete feed; + delete net; + delete root; +} + +int main(int argc, char **argv) { + run(ftl::configure(argc, argv, "reconstruction_default")); + + // Save config changes and delete final objects + ftl::config::cleanup(); + + return ftl::exit_code; +} diff --git a/components/common/cpp/include/ftl/uri.hpp b/components/common/cpp/include/ftl/uri.hpp index 49bf3e653..455d4f845 100644 --- a/components/common/cpp/include/ftl/uri.hpp +++ b/components/common/cpp/include/ftl/uri.hpp @@ -73,7 +73,7 @@ namespace ftl { std::string to_string() const; - void to_json(nlohmann::json &); + void to_json(nlohmann::json &) const; private: void _parse(uri_t puri); diff --git a/components/common/cpp/src/uri.cpp b/components/common/cpp/src/uri.cpp index db5bb12d8..8885078d8 100644 --- a/components/common/cpp/src/uri.cpp +++ b/components/common/cpp/src/uri.cpp @@ -214,7 +214,7 @@ void URI::setAttribute(const string &key, int value) { m_qmap[key] = std::to_string(value); } -void URI::to_json(nlohmann::json &json) { +void URI::to_json(nlohmann::json &json) const { std::string uri = to_string(); if (m_frag.size() > 0) uri += std::string("#") + getFragment(); diff --git a/components/streams/include/ftl/streams/feed.hpp b/components/streams/include/ftl/streams/feed.hpp index c113d6fcf..70e5f8fb7 100644 --- a/components/streams/include/ftl/streams/feed.hpp +++ b/components/streams/include/ftl/streams/feed.hpp @@ -203,8 +203,8 @@ private: void _saveThumbnail(const ftl::data::FrameSetPtr& fs); /** callback for network (adds new sorces on connect/...) */ - void _updateNetSources(ftl::net::Peer *p); - void _updateNetSources(ftl::net::Peer *p, const std::string &uri); + void _updateNetSources(ftl::net::Peer *p, bool autoadd=false); + void _updateNetSources(ftl::net::Peer *p, const std::string &uri, bool autoadd=false); /** select channels and sources based on current filters_; */ void select(); diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp index 81f6881b9..af7d513d1 100644 --- a/components/streams/src/feed.cpp +++ b/components/streams/src/feed.cpp @@ -476,7 +476,7 @@ void Feed::removeFilter(Feed::Filter* filter) { } } -void Feed::_updateNetSources(ftl::net::Peer *p, const std::string &s) { +void Feed::_updateNetSources(ftl::net::Peer *p, const std::string &s, bool autoadd) { UNIQUE_LOCK(mtx_, lk); netcams_.push_back(s); @@ -485,12 +485,16 @@ void Feed::_updateNetSources(ftl::net::Peer *p, const std::string &s) { ftl::URI uri(s); _add_recent_source(uri)["host"] = p->getURI(); + if (autoadd || value("auto_host_sources", false)) { + add(uri); + } + ftl::pool.push([this](int id) { new_sources_cb_.trigger(0); }); } -void Feed::_updateNetSources(ftl::net::Peer *p) { +void Feed::_updateNetSources(ftl::net::Peer *p, bool autoadd) { //auto netcams = // net_->findAll<std::string>("list_streams"); @@ -503,6 +507,11 @@ void Feed::_updateNetSources(ftl::net::Peer *p) { for (const auto &s : peerstreams) { ftl::URI uri(s); _add_recent_source(uri)["host"] = p->getURI(); + + if (autoadd || value("auto_host_sources", false)) { + LOG(INFO) << "AUTO ADD: " << s; + ftl::pool.push([this, uri](int id) { add(uri); }); + } } /*if (value("auto_host_sources", false)) { @@ -770,11 +779,14 @@ void Feed::add(uint32_t fsid, const std::string &uri, ftl::stream::Stream* strea stream_->select(fsid, {Channel::Colour}, true); } - uint32_t Feed::add(const std::string &path) { - UNIQUE_LOCK(mtx_, lk); ftl::URI uri(path); + return add(uri); +} +uint32_t Feed::add(const ftl::URI &uri) { + UNIQUE_LOCK(mtx_, lk); + //if (!uri.isValid()) throw FTL_Error("Invalid URI: " << path); if (fsid_lookup_.count(uri.getBaseURI()) > 0) return fsid_lookup_[uri.getBaseURI()]; @@ -785,11 +797,11 @@ uint32_t Feed::add(const std::string &path) { if ((scheme == ftl::URI::SCHEME_OTHER) || // assumes relative path (scheme == ftl::URI::SCHEME_FILE)) { - auto eix = ((scheme == ftl::URI::SCHEME_OTHER) ? path : uri.getPath()).find_last_of('.'); - auto ext = ((scheme == ftl::URI::SCHEME_OTHER) ? path : uri.getPath()).substr(eix+1); + auto eix = ((scheme == ftl::URI::SCHEME_OTHER) ? uri.getBaseURI() : uri.getPath()).find_last_of('.'); + auto ext = ((scheme == ftl::URI::SCHEME_OTHER) ? uri.getBaseURI() : uri.getPath()).substr(eix+1); if (ext != "ftl") { - throw FTL_Error("Bad filename (expects .ftl) : " << path); + throw FTL_Error("Bad filename (expects .ftl) : " << uri.getBaseURI()); } const int fsid = allocateFrameSetId(group); @@ -797,7 +809,7 @@ uint32_t Feed::add(const std::string &path) { (this, std::string("ftlfile-") + std::to_string(file_counter_++)); if (scheme == ftl::URI::SCHEME_OTHER) { - fstream->set("filename", path); + fstream->set("filename", uri.getBaseURI()); } else { // possible BUG: uri.getPath() might return (wrong) absolute paths @@ -809,7 +821,7 @@ uint32_t Feed::add(const std::string &path) { #endif } - fstream->set("uri", path); + fstream->set("uri", uri.to_string()); auto &recent_files = getConfig()["recent_files"]; auto &file_details = recent_files[uri.getBaseURI()]; @@ -889,7 +901,14 @@ uint32_t Feed::add(const std::string &path) { auto &host_details = known_hosts[uri.getBaseURIWithUser()]; host_details["last_open"] = ftl::timer::get_time(); - ftl::pool.push([this,path](int id) { net_->connect(path)->noReconnect(); }); + if (uri.getPathLength() == 1 && uri.getPathSegment(0) == "*") { + auto *p = net_->connect(uri.getBaseURIWithUser()); + if (p->waitConnection()) { + ftl::pool.push([this,p](int id) {_updateNetSources(p, true); }); + } + } else { + ftl::pool.push([this,path = uri.getBaseURIWithUser()](int id) { net_->connect(path)->noReconnect(); }); + } } else if (scheme == ftl::URI::SCHEME_FTL) { @@ -900,10 +919,11 @@ uint32_t Feed::add(const std::string &path) { auto *p = net_->connect(details["host"].get<std::string>()); p->noReconnect(); if (!p->waitConnection()) { - throw FTL_Error("Could not connect to host " << details["host"].get<std::string>() << " for stream " << path); + throw FTL_Error("Could not connect to host " << details["host"].get<std::string>() << " for stream " << uri.getBaseURI()); } } else { // See if it can otherwise be found? + LOG(WARNING) << "Could not find stream host"; } auto *stream = ftl::create<ftl::stream::Net> @@ -912,7 +932,7 @@ uint32_t Feed::add(const std::string &path) { int fsid = allocateFrameSetId(group); - stream->set("uri", path); + stream->set("uri", uri.to_string()); add(fsid, uri.getBaseURI(), stream); LOG(INFO) << "Add Stream: " @@ -1061,6 +1081,8 @@ void Feed::_beginRecord(Filter *f) { handle_record_ = f->onWithHandle([this, f](const ftl::data::FrameSetPtr &fs) { record_stream_->select(fs->frameset(), f->channels(), true); + LOG(INFO) << "GOT FRAMESET TO STREAM: " << fs->timestamp(); + for (auto c : f->channels()) { if (fs->hasAnyChanged(c)) recorder_->post(*fs.get(), c); } -- GitLab