diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index 9bcd659793fedd7ed2532448acd347498c107e80..18ed5db3cefd62b2414552b68b4dccf22b015fd1 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -437,7 +437,7 @@ static void run() { active = 0; if (!paused) { - net.broadcast("grab"); // To sync cameras + //net.broadcast("grab"); // To sync cameras scene.nextFrame(); for (size_t i = 0; i < inputs.size(); i++) { diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 35fa22bd125af31a29acc09a55a7257f29548de4..99c6937725e5d418802f59fa0b4134ab0f406da0 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -21,7 +21,7 @@ #include <ftl/rgbd.hpp> #include <ftl/middlebury.hpp> #include <ftl/display.hpp> -#include <ftl/streamer.hpp> +#include <ftl/rgbd_streamer.hpp> #include <ftl/net/universe.hpp> #include <nlohmann/json.hpp> @@ -38,7 +38,7 @@ using ftl::rgbd::RGBDSource; using ftl::rgbd::CameraParameters; using ftl::rgbd::StereoVideoSource; using ftl::Display; -using ftl::Streamer; +using ftl::rgbd::Streamer; using ftl::net::Universe; using std::string; using std::vector; @@ -86,109 +86,21 @@ static void run(const string &file) { StereoVideoSource *source = nullptr; source = new StereoVideoSource(config, file); - // Allow remote users to access camera calibration matrix - net.bind(string("ftl://utu.fi/")+(string)config["stream"]["name"]+string("/rgb-d/calibration"), [source]() -> vector<unsigned char> { - vector<unsigned char> buf; - buf.resize(sizeof(CameraParameters)); - LOG(INFO) << "Calib buf size = " << buf.size(); - memcpy(buf.data(), &source->getParameters(), buf.size()); - return buf; - }); - - Mat rgb, depth; - bool grabbed = false; - mutex datam; - condition_variable datacv; - - // Wait for grab message to sync camera capture - net.bind("grab", [&source,&datam,&datacv,&grabbed]() -> void { - unique_lock<mutex> datalk(datam); - if (grabbed) return; - source->grab(); - grabbed = true; - datacv.notify_one(); - }); - - Mat prgb, pdepth; - vector<unsigned char> rgb_buf; - vector<unsigned char> d_buf; - string uri = string("ftl://utu.fi/")+(string)config["stream"]["name"]+string("/rgb-d"); - Display display(config["display"], "local"); - Streamer stream(net, config["stream"]); - - LOG(INFO) << "Beginning capture..."; + Streamer stream(config["stream"], &net); + stream.add(source); + stream.run(); while (display.active()) { - mutex m; - condition_variable cv; - int jobs = 0; - - // Fake grab if no peers to allow visualisation locally - if (net.numberOfPeers() == 0) { - grabbed = true; - source->grab(); - } - - // Pipeline for disparity - pool.push([&](int id) { - // Wait for image grab - unique_lock<mutex> datalk(datam); - datacv.wait(datalk, [&grabbed](){ return grabbed; }); - grabbed = false; - - auto start = std::chrono::high_resolution_clock::now(); - source->getRGBD(rgb, depth); - datalk.unlock(); - - unique_lock<mutex> lk(m); - jobs++; - lk.unlock(); - cv.notify_one(); - - std::chrono::duration<double> elapsed = - std::chrono::high_resolution_clock::now() - start; - //LOG(INFO) << "Disparity in " << elapsed.count() << "s"; - }); - - // Pipeline for stream compression - pool.push([&](int id) { - auto start = std::chrono::high_resolution_clock::now(); - if (prgb.rows != 0) stream.send(prgb, pdepth); - unique_lock<mutex> lk(m); - jobs++; - lk.unlock(); - cv.notify_one(); - - std::chrono::duration<double> elapsed = - std::chrono::high_resolution_clock::now() - start; - //LOG(INFO) << "Stream in " << elapsed.count() << "s"; - }); - - // Send RGB+Depth images for local rendering - if (prgb.rows > 0) display.render(prgb, pdepth, source->getParameters()); - if (config["display"]["right"]) { - Mat rgbr = source->getRight().clone(); - cv::namedWindow("Right: ", cv::WINDOW_KEEPRATIO); - cv::line(rgbr, cv::Point(0, rgbr.rows/2), cv::Point(rgbr.cols-1, rgbr.rows/2), cv::Scalar(0,0,255), 1); - cv::line(rgbr, cv::Point(rgbr.cols/2, 0), cv::Point(rgbr.cols/2, rgbr.rows-1), cv::Scalar(0,0,255), 1); - cv::imshow("Right: ", rgbr); - } - - display.wait(1); - - // Wait for both pipelines to complete - unique_lock<mutex> lk(m); - cv.wait(lk, [&jobs]{return jobs == 2;}); - - // Store previous frame for next stage compression - rgb.copyTo(prgb); - depth.copyTo(pdepth); - - //net.publish(uri, rgb_buf, d_buf); + cv::Mat rgb, depth; + source->getRGBD(rgb, depth); + if (!rgb.empty()) display.render(rgb, depth, source->getParameters()); + display.wait(10); } + stream.stop(); + LOG(INFO) << "Finished."; } diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index f7fa2db894f14d0f61856eefd3596e79e18ead09..11ebcfdf51620808c1333a0be65be2688b08e7d2 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -103,14 +103,21 @@ class Universe : public ftl::Configurable { template <typename R, typename... ARGS> R call(const UUID &pid, const std::string &name, ARGS... args); + template <typename... ARGS> + void send(const UUID &pid, const std::string &name, ARGS... args); + template <typename R, typename... ARGS> std::optional<R> findOne(const std::string &name, ARGS... args); + + template <typename R, typename... ARGS> + std::vector<R> findAll(const std::string &name, ARGS... args); /** * Send a non-blocking RPC call with no return value to all subscribers * of a resource. There may be no subscribers. Note that query parameter * order in the URI string is not important. */ + //[[deprecated("Pub sub no longer to be used")]] template <typename... ARGS> void publish(const std::string &res, ARGS... args); @@ -119,6 +126,7 @@ class Universe : public ftl::Configurable { * of a resource. There may be no subscribers. This overload accepts a * URI object directly to enable more efficient modification of parameters. */ + //[[deprecated("Pub sub no longer to be used")]] template <typename... ARGS> void publish(const ftl::URI &res, ARGS... args); @@ -126,11 +134,14 @@ class Universe : public ftl::Configurable { * Register your ownership of a new resource. This must be called before * publishing to this resource and before any peers attempt to subscribe. */ + //[[deprecated("Pub sub no longer to be used")]] bool createResource(const std::string &uri); + //[[deprecated("Pub sub no longer to be used")]] std::optional<ftl::UUID> findOwner(const std::string &res); void setLocalID(const ftl::UUID &u) { this_peer = u; }; + const ftl::UUID &id() const { return this_peer; } private: void _run(); @@ -224,6 +235,45 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { return result; } +template <typename R, typename... ARGS> +std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { + int returncount = 0; + int sentcount = 0; + std::mutex m; + std::condition_variable cv; + + std::vector<R> results; + + auto handler = [&](const std::vector<R> &r) { + std::unique_lock<std::mutex> lk(m); + returncount++; + results.insert(results.end(), r.begin(), r.end()); + lk.unlock(); + cv.notify_one(); + }; + + std::map<Peer*, int> record; + for (auto p : peers_) { + sentcount++; + record[p] = p->asyncCall<std::vector<R>>(name, handler, args...); + } + + { // Block thread until async callback notifies us + std::unique_lock<std::mutex> lk(m); + cv.wait_for(lk, std::chrono::seconds(1), [&returncount,&sentcount]{return returncount == sentcount;}); + + // Cancel any further results + for (auto p : peers_) { + auto m = record.find(p); + if (m != record.end()) { + p->cancelCall(m->second); + } + } + } + + return results; +} + template <typename R, typename... ARGS> R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { Peer *p = getPeer(pid); @@ -234,6 +284,16 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { return p->call<R>(name, args...); } +template <typename... ARGS> +void Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) { + Peer *p = getPeer(pid); + if (p == nullptr) { + LOG(ERROR) << "Attempting to call an unknown peer : " << pid.to_string(); + throw -1; + } + p->send(name, args...); +} + template <typename... ARGS> void Universe::publish(const std::string &res, ARGS... args) { ftl::URI uri(res); diff --git a/components/net/cpp/test/net_integration.cpp b/components/net/cpp/test/net_integration.cpp index d714ca0b9847bca2acbeed8b13f5f38f2e488d31..816edca923a854ed9ace53ef3a799e6fef7d5b65 100644 --- a/components/net/cpp/test/net_integration.cpp +++ b/components/net/cpp/test/net_integration.cpp @@ -149,6 +149,42 @@ TEST_CASE("Universe::broadcast()", "[net]") { } } +TEST_CASE("Universe::findAll()", "") { + Universe a; + Universe b; + Universe c; + a.listen("tcp://localhost:7077"); + b.connect("tcp://localhost:7077")->waitConnection(); + c.connect("tcp://localhost:7077")->waitConnection(); + + SECTION("no values exist") { + REQUIRE( (c.findAll<int>("test_all").size() == 0) ); + } + + SECTION("one set exists") { + a.bind("test_all", []() -> std::vector<int> { + return {3,4,5}; + }); + + auto res = c.findAll<int>("test_all"); + REQUIRE( (res.size() == 3) ); + REQUIRE( (res[0] == 3) ); + } + + SECTION("two sets exists") { + b.bind("test_all", []() -> std::vector<int> { + return {3,4,5}; + }); + c.bind("test_all", []() -> std::vector<int> { + return {6,7,8}; + }); + + auto res = a.findAll<int>("test_all"); + REQUIRE( (res.size() == 6) ); + REQUIRE( (res[0] == 3 || res[0] == 6) ); + } +} + TEST_CASE("Universe::findOwner()", "") { Universe a; Universe b; diff --git a/components/rgbd-sources/CMakeLists.txt b/components/rgbd-sources/CMakeLists.txt index 0c4cafc742eac4a254f6fbf38f3813972a33126f..637255efd1109cb04112c6e867c0fcf88ac8db2b 100644 --- a/components/rgbd-sources/CMakeLists.txt +++ b/components/rgbd-sources/CMakeLists.txt @@ -5,6 +5,7 @@ set(RGBDSRC src/rgbd_source.cpp src/stereovideo_source.cpp src/net_source.cpp + src/rgbd_streamer.cpp src/algorithms/rtcensus.cpp src/algorithms/rtcensus_sgm.cpp src/algorithms/opencv_sgbm.cpp diff --git a/components/rgbd-sources/include/ftl/net_source.hpp b/components/rgbd-sources/include/ftl/net_source.hpp index 8d7c44a88fc3ddbf813cdf96f5131cb208f4c57b..cf56f6b5aae13d8ca3816e9197120654f8a1e435 100644 --- a/components/rgbd-sources/include/ftl/net_source.hpp +++ b/components/rgbd-sources/include/ftl/net_source.hpp @@ -30,6 +30,11 @@ class NetSource : public RGBDSource { private: bool has_calibration_; + ftl::UUID peer_; + int N_; + + bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::CameraParameters &p); + void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); }; } diff --git a/components/rgbd-sources/include/ftl/rgbd_source.hpp b/components/rgbd-sources/include/ftl/rgbd_source.hpp index 9944533b257bc35d5cba227a003eaaeb8b791180..ca7dec6ac18d54716d5a07c1f924b4b097b91548 100644 --- a/components/rgbd-sources/include/ftl/rgbd_source.hpp +++ b/components/rgbd-sources/include/ftl/rgbd_source.hpp @@ -29,6 +29,7 @@ class RGBDSource : public ftl::Configurable { void getRGBD(cv::Mat &rgb, cv::Mat &depth); const CameraParameters &getParameters() { return params_; }; + std::string getURI() const { return config_["uri"].get<std::string>(); } virtual void setPose(const Eigen::Matrix4f &pose) { pose_ = pose; }; const Eigen::Matrix4f &getPose() { return pose_; }; diff --git a/components/rgbd-sources/include/ftl/rgbd_streamer.hpp b/components/rgbd-sources/include/ftl/rgbd_streamer.hpp new file mode 100644 index 0000000000000000000000000000000000000000..8eb7460c77d41415a2a2051a7593597037119c52 --- /dev/null +++ b/components/rgbd-sources/include/ftl/rgbd_streamer.hpp @@ -0,0 +1,72 @@ +#ifndef _FTL_RGBD_STREAMER_HPP_ +#define _FTL_RGBD_STREAMER_HPP_ + +#include <ctpl_stl.h> +#include <glog/logging.h> +#include <ftl/configuration.hpp> +#include <ftl/configurable.hpp> +#include <ftl/rgbd_source.hpp> +#include <ftl/net/universe.hpp> +#include <string> +#include <list> +#include <map> +#include <shared_mutex> + +namespace ftl { +namespace rgbd { + +namespace detail { + +struct StreamClient { + std::string uri; + ftl::UUID peerid; + int txcount; // Frames sent since last request + int txmax; // Frames to send in request +}; + +static const unsigned int kGrabbed = 0x1; +static const unsigned int kTransmitted = 0x2; + +struct StreamSource { + ftl::rgbd::RGBDSource *src; + unsigned int state; // Busy or ready to swap? + cv::Mat rgb; // Tx buffer + cv::Mat depth; // Tx buffer + std::list<detail::StreamClient> clients[10]; // One list per bitrate + std::shared_mutex mutex; +}; + +} + +static const int kMaxFrames = 25; + +class Streamer : public ftl::Configurable { + public: + Streamer(nlohmann::json &config, ftl::net::Universe *net); + ~Streamer(); + + void add(RGBDSource *); + void remove(RGBDSource *); + void remove(const std::string &); + + void run(bool block=false); + void stop(); + + RGBDSource *get(const std::string &uri); + + private: + std::map<std::string, detail::StreamSource*> sources_; + ctpl::thread_pool pool_; + std::shared_mutex mutex_; + bool active_; + ftl::net::Universe *net_; + + void _schedule(); + void _swap(detail::StreamSource &); + void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest); +}; + +} +} + +#endif // _FTL_RGBD_STREAMER_HPP_ diff --git a/components/rgbd-sources/src/net_source.cpp b/components/rgbd-sources/src/net_source.cpp index 0e50d0c915a7dfc60022ea66be2648f46e0a1889..a4e3c9f0a7b9f9b1d3edcd0d8188c09b8ddb18a0 100644 --- a/components/rgbd-sources/src/net_source.cpp +++ b/components/rgbd-sources/src/net_source.cpp @@ -5,6 +5,7 @@ using ftl::rgbd::NetSource; using ftl::net::Universe; +using ftl::UUID; using std::string; using ftl::rgbd::CameraParameters; using std::mutex; @@ -13,13 +14,13 @@ using std::vector; using std::this_thread::sleep_for; using std::chrono::milliseconds; -static bool getCalibration(Universe &net, string src, ftl::rgbd::CameraParameters &p) { +bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &src, ftl::rgbd::CameraParameters &p) { while(true) { - auto buf = net.findOne<vector<unsigned char>>((string) src +"/calibration"); - if (buf) { - memcpy((char*)&p, (*buf).data(), (*buf).size()); + auto buf = net.call<vector<unsigned char>>(peer_, "source_calibration", src); + if (buf.size() > 0) { + memcpy((char*)&p, buf.data(), buf.size()); - if (sizeof(p) != (*buf).size()) { + if (sizeof(p) != buf.size()) { LOG(ERROR) << "Corrupted calibration"; return false; } @@ -41,21 +42,43 @@ NetSource::NetSource(nlohmann::json &config) : RGBDSource(config) { NetSource::NetSource(nlohmann::json &config, ftl::net::Universe *net) : RGBDSource(config, net) { - has_calibration_ = getCalibration(*net, config["uri"].get<string>(), params_); + auto p = net->findOne<ftl::UUID>("find_stream", getURI()); + if (!p) { + LOG(ERROR) << "Could not find stream: " << getURI(); + return; + } + peer_ = *p; + + has_calibration_ = _getCalibration(*net, peer_, getURI(), params_); - net->subscribe(config["uri"].get<string>(), [this](const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + net->bind(getURI(), [this](const vector<unsigned char> &jpg, const vector<unsigned char> &d) { unique_lock<mutex> lk(mutex_); - cv::imdecode(jpg, cv::IMREAD_COLOR, &rgb_); - //Mat(rgb_.size(), CV_16UC1); - cv::imdecode(d, cv::IMREAD_UNCHANGED, &depth_); - depth_.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f)); + _recv(jpg, d); }); + + N_ = 10; + + // Initiate stream with request for first 10 frames + net->send(peer_, "get_stream", getURI(), 10, 0, net->id(), getURI()); } NetSource::~NetSource() { // TODO Unsubscribe } +void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + cv::imdecode(jpg, cv::IMREAD_COLOR, &rgb_); + //Mat(rgb_.size(), CV_16UC1); + cv::imdecode(d, cv::IMREAD_UNCHANGED, &depth_); + depth_.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f)); + + N_--; + if (N_ == 0) { + N_ += 10; + net_->send(peer_, "get_stream", getURI(), 10, 0, net_->id(), getURI()); + } +} + void NetSource::grab() { // net_.broadcast("grab"); } diff --git a/components/rgbd-sources/src/rgbd_streamer.cpp b/components/rgbd-sources/src/rgbd_streamer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b4e256169d598587a637e35624750d8ed3370f90 --- /dev/null +++ b/components/rgbd-sources/src/rgbd_streamer.cpp @@ -0,0 +1,224 @@ +#include <ftl/rgbd_streamer.hpp> +#include <vector> +#include <optional> +#include <thread> +#include <chrono> + +using ftl::rgbd::Streamer; +using ftl::rgbd::RGBDSource; +using ftl::rgbd::detail::StreamSource; +using ftl::rgbd::detail::StreamClient; +using ftl::net::Universe; +using std::string; +using std::list; +using std::map; +using std::optional; +using std::vector; +using std::mutex; +using std::shared_mutex; +using std::unique_lock; +using std::shared_lock; +using std::this_thread::sleep_for; +using std::chrono::milliseconds; + +#define THREAD_POOL_SIZE 6 + +Streamer::Streamer(nlohmann::json &config, Universe *net) + : ftl::Configurable(config), pool_(THREAD_POOL_SIZE) { + + active_ = false; + net_ = net; + + net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> { + if (sources_.find(uri) != sources_.end()) return net_->id(); + else return {}; + }); + + net->bind("list_streams", [this]() -> vector<string> { + vector<string> streams; + for (auto &i : sources_) { + streams.push_back(i.first); + } + return streams; + }); + + // Allow remote users to access camera calibration matrix + net->bind("source_calibration", [this](const std::string &uri) -> vector<unsigned char> { + vector<unsigned char> buf; + shared_lock<shared_mutex> slk(mutex_); + + if (sources_.find(uri) != sources_.end()) { + buf.resize(sizeof(CameraParameters)); + LOG(INFO) << "Calib buf size = " << buf.size(); + memcpy(buf.data(), &sources_[uri]->src->getParameters(), buf.size()); + } + return buf; + }); + + net->bind("get_stream", [this](const string &source, int N, int rate, const UUID &peer, const string &dest) { + _addClient(source, N, rate, peer, dest); + }); + + net->bind("sync_streams", [this](unsigned long long time) { + // Calc timestamp delta + }); + + net->bind("ping_streamer", [this](unsigned long long time) -> unsigned long long { + return time; + }); +} + +Streamer::~Streamer() { + // TODO Unbind everything from net.... + pool_.stop(); +} + +void Streamer::add(RGBDSource *src) { + unique_lock<shared_mutex> ulk(mutex_); + if (sources_.find(src->getURI()) != sources_.end()) return; + + StreamSource *s = new StreamSource; // = sources_.emplace(std::make_pair<std::string,StreamSource>(src->getURI(),{})); + s->src = src; + s->state = 0; + sources_[src->getURI()] = s; +} + +void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { + shared_lock<shared_mutex> slk(mutex_); + if (sources_.find(source) == sources_.end()) return; + + if (rate < 0 || rate >= 10) return; + if (N < 0 || N > ftl::rgbd::kMaxFrames) return; + + StreamClient c; + c.peerid = peer; + c.uri = dest; + c.txcount = 0; + c.txmax = N; + + StreamSource *s = sources_[source]; + unique_lock<shared_mutex> ulk(s->mutex); + s->clients[rate].push_back(c); +} + +void Streamer::remove(RGBDSource *) { + +} + +void Streamer::remove(const std::string &) { + +} + +void Streamer::stop() { + active_ = false; +} + +void Streamer::run(bool block) { + active_ = true; + + if (block) { + while (active_) { + double wait = 1.0f / 25.0f; + auto start = std::chrono::high_resolution_clock::now(); + // Create frame jobs at correct FPS interval + _schedule(); + + std::chrono::duration<double> elapsed = + std::chrono::high_resolution_clock::now() - start; + + sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); + } + } else { + // Create thread job for frame ticking + pool_.push([this](int id) { + while (active_) { + double wait = 1.0f / 25.0f; + auto start = std::chrono::high_resolution_clock::now(); + // Create frame jobs at correct FPS interval + _schedule(); + + std::chrono::duration<double> elapsed = + std::chrono::high_resolution_clock::now() - start; + + sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); + } + }); + } +} + +void Streamer::_swap(StreamSource &src) { + if (src.state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kTransmitted)) { + src.src->getRGBD(src.rgb, src.depth); + src.state = 0; + } +} + +void Streamer::_schedule() { + for (auto s : sources_) { + string uri = s.first; + + shared_lock<shared_mutex> slk(s.second->mutex); + if (s.second->state != 0) { + LOG(ERROR) << "Stream not ready to schedule on time: " << uri; + continue; + } + if (s.second->clients[0].size() == 0) { + //LOG(ERROR) << "Stream has no clients: " << uri; + continue; + } + slk.unlock(); + + // Grab job + pool_.push([this,uri](int id) { + StreamSource *src = sources_[uri]; + src->src->grab(); + + unique_lock<shared_mutex> lk(src->mutex); + src->state |= ftl::rgbd::detail::kGrabbed; + _swap(*src); + }); + + // Transmit job + // TODO, could do one for each bitrate... + pool_.push([this,uri](int id) { + StreamSource *src = sources_[uri]; + + if (src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) { + vector<unsigned char> rgb_buf; + cv::imencode(".jpg", src->rgb, rgb_buf); + + cv::Mat d2; + src->depth.convertTo(d2, CV_16UC1, 16*100); + vector<unsigned char> d_buf; + cv::imencode(".png", d2, d_buf); + + auto i = src->clients[0].begin(); + while (i != src->clients[0].end()) { + try { + net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf); + } catch(...) { + (*i).txcount = (*i).txmax; + } + (*i).txcount++; + if ((*i).txcount >= (*i).txmax) { + LOG(INFO) << "Remove client"; + i = src->clients[0].erase(i); + } else { + i++; + } + } + } + + unique_lock<shared_mutex> lk(src->mutex); + LOG(INFO) << "Tx Frame: " << uri; + src->state |= ftl::rgbd::detail::kTransmitted; + _swap(*src); + }); + } +} + +RGBDSource *Streamer::get(const std::string &uri) { + shared_lock<shared_mutex> slk(mutex_); + if (sources_.find(uri) != sources_.end()) return sources_[uri]->src; + else return nullptr; +}