diff --git a/components/rgbd-sources/include/ftl/rgbd/group.hpp b/components/rgbd-sources/include/ftl/rgbd/group.hpp index 0ded29e80b7d2fa01ad656c2fbb3b8865b726a70..3c7b26e171a3f119f2ca515696ffdd03e1ead54a 100644 --- a/components/rgbd-sources/include/ftl/rgbd/group.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/group.hpp @@ -6,6 +6,7 @@ #include <ftl/timer.hpp> #include <ftl/rgbd/frame.hpp> #include <ftl/rgbd/frameset.hpp> +#include <ftl/codecs/packet.hpp> #include <opencv2/opencv.hpp> #include <vector> @@ -65,6 +66,22 @@ class Group { */ void sync(std::function<bool(FrameSet &)>); + /** + * Whenever any source within the group receives raw data, this callback + * will be called with that raw data. This is used to allow direct data + * capture (to disk) or proxy over a network without needing to re-encode. + * There is no guarantee about order or timing and the callback itself will + * need to ensure synchronisation of timestamps. + */ + void addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &); + + /** + * Removes a raw data callback from all sources in the group. + */ + void removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &); + + inline std::vector<Source*> sources() const { return sources_; } + /** @deprecated */ //bool getFrames(FrameSet &, bool complete=false); diff --git a/components/rgbd-sources/include/ftl/rgbd/source.hpp b/components/rgbd-sources/include/ftl/rgbd/source.hpp index 0ee163add0009023ec24e6df6bd18a1da927af1e..4c27baf866fc8a80d30f37ec3b794df4e0368916 100644 --- a/components/rgbd-sources/include/ftl/rgbd/source.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/source.hpp @@ -8,6 +8,7 @@ #include <ftl/net/universe.hpp> #include <ftl/uri.hpp> #include <ftl/rgbd/detail/source.hpp> +#include <ftl/codecs/packet.hpp> #include <opencv2/opencv.hpp> #include <Eigen/Eigen> #include <string> @@ -201,9 +202,26 @@ class Source : public ftl::Configurable { SHARED_MUTEX &mutex() { return mutex_; } std::function<void(int64_t, cv::Mat &, cv::Mat &)> &callback() { return callback_; } + + /** + * Set the callback that receives decoded frames as they are generated. + */ void setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb); void removeCallback() { callback_ = nullptr; } + /** + * Add a callback to immediately receive any raw data from this source. + * Currently this only works for a net source since other sources don't + * produce raw encoded data. + */ + void addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &); + + void removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &); + + /** + * INTERNAL. Used to send raw data to callbacks. + */ + void notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt); protected: detail::Source *impl_; @@ -220,6 +238,7 @@ class Source : public ftl::Configurable { cudaStream_t stream_; int64_t timestamp_; std::function<void(int64_t, cv::Mat &, cv::Mat &)> callback_; + std::list<std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>> rawcallbacks_; detail::Source *_createImplementation(); detail::Source *_createFileImpl(const ftl::URI &uri); diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 7c6e6f479afe022cdacefbabf9098e276e0c9f79..47048c946da2ff792b7c41dfc21cf4bf6099ed5b 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -101,6 +101,11 @@ class Streamer : public ftl::Configurable { */ void add(Source *); + /** + * Allow all sources in another group to be proxy streamed by this streamer. + */ + void add(ftl::rgbd::Group *grp); + void remove(Source *); void remove(const std::string &); @@ -130,6 +135,7 @@ class Streamer : public ftl::Configurable { private: ftl::rgbd::Group group_; std::map<std::string, detail::StreamSource*> sources_; + std::list<ftl::rgbd::Group*> proxy_grps_; //ctpl::thread_pool pool_; SHARED_MUTEX mutex_; bool active_; @@ -156,6 +162,7 @@ class Streamer : public ftl::Configurable { void _cleanUp(); void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest); void _transmitPacket(detail::StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, bool hqonly); + void _transmitPacket(detail::StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, bool hqonly); //void _encodeHQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk); //void _encodeLQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk); diff --git a/components/rgbd-sources/src/group.cpp b/components/rgbd-sources/src/group.cpp index 96ca3a82fd3e306656f047d4971ce4a29b00fe48..1804bfe46231453bbbdf619bfe8a3599e76c0896 100644 --- a/components/rgbd-sources/src/group.cpp +++ b/components/rgbd-sources/src/group.cpp @@ -225,6 +225,18 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { ftl::timer::start(true); } +void Group::addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) { + for (auto s : sources_) { + s->addRawCallback(f); + } +} + +void Group::removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) { + for (auto s : sources_) { + s->removeRawCallback(f); + } +} + //ftl::rgbd::FrameSet &Group::_getRelativeFrameset(int rel) { // int idx = (rel < 0) ? (head_+kFrameBufferSize+rel)%kFrameBufferSize : (head_+rel)%kFrameBufferSize; // return framesets_[idx]; diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 4f1f5b1419f0bbf581be0ae178b843029f5fe284..f720651d690ec07c91ebb16c804733a5d44dbce2 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -242,6 +242,9 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count(); if (!active_) return; + // Allow acccess to the raw data elsewhere... + host_->notifyRaw(spkt, pkt); + const ftl::rgbd::Channel chan = host_->getChannel(); int rchan = spkt.channel & 0x1; diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index 35d23f27ad7edac18d3e3e02247296f1382be5e2..4ec34a5e5685cc76d4356ac3766a5a97fa067679 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -310,3 +310,25 @@ void Source::setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb) if (bool(callback_)) LOG(ERROR) << "Source already has a callback: " << getURI(); callback_ = cb; } + +void Source::addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) { + UNIQUE_LOCK(mutex_,lk); + rawcallbacks_.push_back(f); +} + +void Source::removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) { + UNIQUE_LOCK(mutex_,lk); + for (auto i=rawcallbacks_.begin(); i!=rawcallbacks_.end(); ++i) { + if (i->target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>() == f.target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>()) { + rawcallbacks_.erase(i); + return; + } + } +} + +void Source::notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + SHARED_LOCK(mutex_,lk); + for (auto &i : rawcallbacks_) { + i(this, spkt, pkt); + } +} diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 7a9118c9f47975a31d6389982b2adb818ed8a046..74912f2e79704443b26465e5252b5c71552bc519 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -174,6 +174,35 @@ void Streamer::add(Source *src) { net_->broadcast("add_stream", src->getID()); } +void Streamer::add(ftl::rgbd::Group *grp) { + auto srcs = grp->sources(); + for (auto src : srcs) { + { + UNIQUE_LOCK(mutex_,ulk); + if (sources_.find(src->getID()) != sources_.end()) return; + + StreamSource *s = new StreamSource; + s->src = src; + //s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0); + s->jobs = 0; + s->frame = 0; + s->clientCount = 0; + s->hq_count = 0; + s->lq_count = 0; + sources_[src->getID()] = s; + + //group_.addSource(src); + + src->addRawCallback([this,s](Source *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + _transmitPacket(s, spkt, pkt, false); + }); + } + + LOG(INFO) << "Streaming: " << src->getID(); + net_->broadcast("add_stream", src->getID()); + } +} + void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { StreamSource *s = nullptr; @@ -497,6 +526,10 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0)) }; + _transmitPacket(src, spkt, pkt, hqonly); +} + +void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, bool hqonly) { // Lock to prevent clients being added / removed //SHARED_LOCK(src->mutex,lk); auto c = src->clients.begin(); @@ -520,7 +553,7 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt (*c).txcount = (*c).txmax; } else { // Count frame as completed only if last block and channel is 0 - if (pkt.block_number == pkt.block_total - 1 && chan == 0) ++(*c).txcount; + if (pkt.block_number == pkt.block_total - 1 && spkt.channel & 0x1 == 0) ++(*c).txcount; } } catch(...) { (*c).txcount = (*c).txmax;