Skip to content
Snippets Groups Projects
Commit e85dc65c authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/196/snapshotcapture' into 'master'

Resolves #159 proxy of sources

Closes #159

See merge request nicolas.pope/ftl!125
parents a094dcba bd7455ab
No related branches found
No related tags found
1 merge request!125Resolves #159 proxy of sources
Pipeline #15246 passed
...@@ -160,6 +160,7 @@ static void run(ftl::Configurable *root) { ...@@ -160,6 +160,7 @@ static void run(ftl::Configurable *root) {
} }
stream->setLatency(5); // FIXME: This depends on source!? stream->setLatency(5); // FIXME: This depends on source!?
stream->add(&group);
stream->run(); stream->run();
bool busy = false; bool busy = false;
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <ftl/timer.hpp> #include <ftl/timer.hpp>
#include <ftl/rgbd/frame.hpp> #include <ftl/rgbd/frame.hpp>
#include <ftl/rgbd/frameset.hpp> #include <ftl/rgbd/frameset.hpp>
#include <ftl/codecs/packet.hpp>
#include <opencv2/opencv.hpp> #include <opencv2/opencv.hpp>
#include <vector> #include <vector>
...@@ -65,6 +66,22 @@ class Group { ...@@ -65,6 +66,22 @@ class Group {
*/ */
void sync(std::function<bool(FrameSet &)>); void sync(std::function<bool(FrameSet &)>);
/**
* Whenever any source within the group receives raw data, this callback
* will be called with that raw data. This is used to allow direct data
* capture (to disk) or proxy over a network without needing to re-encode.
* There is no guarantee about order or timing and the callback itself will
* need to ensure synchronisation of timestamps.
*/
void addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
/**
* Removes a raw data callback from all sources in the group.
*/
void removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
inline std::vector<Source*> sources() const { return sources_; }
/** @deprecated */ /** @deprecated */
//bool getFrames(FrameSet &, bool complete=false); //bool getFrames(FrameSet &, bool complete=false);
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <ftl/net/universe.hpp> #include <ftl/net/universe.hpp>
#include <ftl/uri.hpp> #include <ftl/uri.hpp>
#include <ftl/rgbd/detail/source.hpp> #include <ftl/rgbd/detail/source.hpp>
#include <ftl/codecs/packet.hpp>
#include <opencv2/opencv.hpp> #include <opencv2/opencv.hpp>
#include <Eigen/Eigen> #include <Eigen/Eigen>
#include <string> #include <string>
...@@ -201,9 +202,26 @@ class Source : public ftl::Configurable { ...@@ -201,9 +202,26 @@ class Source : public ftl::Configurable {
SHARED_MUTEX &mutex() { return mutex_; } SHARED_MUTEX &mutex() { return mutex_; }
std::function<void(int64_t, cv::Mat &, cv::Mat &)> &callback() { return callback_; } std::function<void(int64_t, cv::Mat &, cv::Mat &)> &callback() { return callback_; }
/**
* Set the callback that receives decoded frames as they are generated.
*/
void setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb); void setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb);
void removeCallback() { callback_ = nullptr; } void removeCallback() { callback_ = nullptr; }
/**
* Add a callback to immediately receive any raw data from this source.
* Currently this only works for a net source since other sources don't
* produce raw encoded data.
*/
void addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
void removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
/**
* INTERNAL. Used to send raw data to callbacks.
*/
void notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt);
protected: protected:
detail::Source *impl_; detail::Source *impl_;
...@@ -220,6 +238,7 @@ class Source : public ftl::Configurable { ...@@ -220,6 +238,7 @@ class Source : public ftl::Configurable {
cudaStream_t stream_; cudaStream_t stream_;
int64_t timestamp_; int64_t timestamp_;
std::function<void(int64_t, cv::Mat &, cv::Mat &)> callback_; std::function<void(int64_t, cv::Mat &, cv::Mat &)> callback_;
std::list<std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>> rawcallbacks_;
detail::Source *_createImplementation(); detail::Source *_createImplementation();
detail::Source *_createFileImpl(const ftl::URI &uri); detail::Source *_createFileImpl(const ftl::URI &uri);
......
...@@ -101,6 +101,11 @@ class Streamer : public ftl::Configurable { ...@@ -101,6 +101,11 @@ class Streamer : public ftl::Configurable {
*/ */
void add(Source *); void add(Source *);
/**
* Allow all sources in another group to be proxy streamed by this streamer.
*/
void add(ftl::rgbd::Group *grp);
void remove(Source *); void remove(Source *);
void remove(const std::string &); void remove(const std::string &);
...@@ -130,6 +135,7 @@ class Streamer : public ftl::Configurable { ...@@ -130,6 +135,7 @@ class Streamer : public ftl::Configurable {
private: private:
ftl::rgbd::Group group_; ftl::rgbd::Group group_;
std::map<std::string, detail::StreamSource*> sources_; std::map<std::string, detail::StreamSource*> sources_;
std::list<ftl::rgbd::Group*> proxy_grps_;
//ctpl::thread_pool pool_; //ctpl::thread_pool pool_;
SHARED_MUTEX mutex_; SHARED_MUTEX mutex_;
bool active_; bool active_;
...@@ -152,10 +158,17 @@ class Streamer : public ftl::Configurable { ...@@ -152,10 +158,17 @@ class Streamer : public ftl::Configurable {
ftl::codecs::device_t hq_devices_; ftl::codecs::device_t hq_devices_;
enum class Quality {
High,
Low,
Any
};
void _process(ftl::rgbd::FrameSet &); void _process(ftl::rgbd::FrameSet &);
void _cleanUp(); void _cleanUp();
void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest); void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest);
void _transmitPacket(detail::StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, bool hqonly); void _transmitPacket(detail::StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, Quality q);
void _transmitPacket(detail::StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, Quality q);
//void _encodeHQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk); //void _encodeHQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk);
//void _encodeLQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk); //void _encodeLQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk);
......
...@@ -222,9 +222,22 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { ...@@ -222,9 +222,22 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) {
return true; return true;
}); });
LOG(INFO) << "Start timer";
ftl::timer::start(true); ftl::timer::start(true);
} }
void Group::addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
for (auto s : sources_) {
s->addRawCallback(f);
}
}
void Group::removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
for (auto s : sources_) {
s->removeRawCallback(f);
}
}
//ftl::rgbd::FrameSet &Group::_getRelativeFrameset(int rel) { //ftl::rgbd::FrameSet &Group::_getRelativeFrameset(int rel) {
// int idx = (rel < 0) ? (head_+kFrameBufferSize+rel)%kFrameBufferSize : (head_+rel)%kFrameBufferSize; // int idx = (rel < 0) ? (head_+kFrameBufferSize+rel)%kFrameBufferSize : (head_+rel)%kFrameBufferSize;
// return framesets_[idx]; // return framesets_[idx];
......
...@@ -242,29 +242,30 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk ...@@ -242,29 +242,30 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count(); int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count();
if (!active_) return; if (!active_) return;
// Allow acccess to the raw data elsewhere...
host_->notifyRaw(spkt, pkt);
const ftl::rgbd::Channel chan = host_->getChannel(); const ftl::rgbd::Channel chan = host_->getChannel();
int rchan = spkt.channel & 0x1; int rchan = spkt.channel & 0x1;
// Ignore any unwanted second channel
if (chan == ftl::rgbd::Channel::None && rchan > 0) {
LOG(INFO) << "Unwanted channel";
//return;
// TODO: Allow decode to be skipped
}
NetFrame &frame = queue_.getFrame(spkt.timestamp, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3)); NetFrame &frame = queue_.getFrame(spkt.timestamp, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3));
// Update frame statistics // Update frame statistics
frame.tx_size += pkt.data.size(); frame.tx_size += pkt.data.size();
_createDecoder(rchan, pkt); // Ignore any unwanted second channel
auto *decoder = (rchan == 0) ? decoder_c1_ : decoder_c2_; if (!(chan == ftl::rgbd::Channel::None && rchan > 0)) {
if (!decoder) { _createDecoder(rchan, pkt);
LOG(ERROR) << "No frame decoder available"; auto *decoder = (rchan == 0) ? decoder_c1_ : decoder_c2_;
return; if (!decoder) {
} LOG(ERROR) << "No frame decoder available";
return;
}
decoder->decode(pkt, (rchan == 0) ? frame.channel1 : frame.channel2); decoder->decode(pkt, (rchan == 0) ? frame.channel1 : frame.channel2);
} else {
//LOG(INFO) << "Unwanted frame";
}
// Apply colour correction to chunk // Apply colour correction to chunk
//ftl::rgbd::colourCorrection(tmp_rgb, gamma_, temperature_); //ftl::rgbd::colourCorrection(tmp_rgb, gamma_, temperature_);
......
...@@ -310,3 +310,25 @@ void Source::setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb) ...@@ -310,3 +310,25 @@ void Source::setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb)
if (bool(callback_)) LOG(ERROR) << "Source already has a callback: " << getURI(); if (bool(callback_)) LOG(ERROR) << "Source already has a callback: " << getURI();
callback_ = cb; callback_ = cb;
} }
void Source::addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
UNIQUE_LOCK(mutex_,lk);
rawcallbacks_.push_back(f);
}
void Source::removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
UNIQUE_LOCK(mutex_,lk);
for (auto i=rawcallbacks_.begin(); i!=rawcallbacks_.end(); ++i) {
if (i->target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>() == f.target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>()) {
rawcallbacks_.erase(i);
return;
}
}
}
void Source::notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
SHARED_LOCK(mutex_,lk);
for (auto &i : rawcallbacks_) {
i(this, spkt, pkt);
}
}
...@@ -174,6 +174,38 @@ void Streamer::add(Source *src) { ...@@ -174,6 +174,38 @@ void Streamer::add(Source *src) {
net_->broadcast("add_stream", src->getID()); net_->broadcast("add_stream", src->getID());
} }
void Streamer::add(ftl::rgbd::Group *grp) {
auto srcs = grp->sources();
for (auto src : srcs) {
{
UNIQUE_LOCK(mutex_,ulk);
if (sources_.find(src->getID()) != sources_.end()) return;
StreamSource *s = new StreamSource;
s->src = src;
//s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0);
s->jobs = 0;
s->frame = 0;
s->clientCount = 0;
s->hq_count = 0;
s->lq_count = 0;
sources_[src->getID()] = s;
//group_.addSource(src);
src->addRawCallback([this,s](Source *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
//LOG(INFO) << "RAW CALLBACK";
_transmitPacket(s, spkt, pkt, Quality::Any);
});
}
LOG(INFO) << "Proxy Streaming: " << src->getID();
net_->broadcast("add_stream", src->getID());
}
LOG(INFO) << "All proxy streams added";
}
void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) {
StreamSource *s = nullptr; StreamSource *s = nullptr;
...@@ -349,10 +381,11 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -349,10 +381,11 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// Prevent new clients during processing. // Prevent new clients during processing.
SHARED_LOCK(mutex_,slk); SHARED_LOCK(mutex_,slk);
if (fs.sources.size() != sources_.size()) { // This check is not valid, always assume fs.sources is correct
LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size(); //if (fs.sources.size() != sources_.size()) {
return; // LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size();
} //return;
//}
int totalclients = 0; int totalclients = 0;
...@@ -390,14 +423,15 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -390,14 +423,15 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// TODO: Each encode could be done in own thread // TODO: Each encode could be done in own thread
if (hasChan2) { if (hasChan2) {
enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 1, hasChan2, true); _transmitPacket(src, blk, 1, hasChan2, Quality::High);
}); });
} else { } else {
if (enc2) enc2->reset(); if (enc2) enc2->reset();
} }
if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc1->reset();
enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 0, hasChan2, true); _transmitPacket(src, blk, 0, hasChan2, Quality::High);
}); });
} }
} }
...@@ -418,14 +452,14 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -418,14 +452,14 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// Receiver only waits for channel 1 by default // Receiver only waits for channel 1 by default
if (hasChan2) { if (hasChan2) {
enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 1, hasChan2, false); _transmitPacket(src, blk, 1, hasChan2, Quality::Low);
}); });
} else { } else {
if (enc2) enc2->reset(); if (enc2) enc2->reset();
} }
enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 0, hasChan2, false); _transmitPacket(src, blk, 0, hasChan2, Quality::Low);
}); });
} }
} }
...@@ -491,19 +525,24 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -491,19 +525,24 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
} else _cleanUp(); } else _cleanUp();
} }
void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, bool hqonly) { void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, Quality q) {
ftl::codecs::StreamPacket spkt = { ftl::codecs::StreamPacket spkt = {
frame_no_, frame_no_,
static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0)) static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0))
}; };
_transmitPacket(src, spkt, pkt, q);
}
void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, Quality q) {
// Lock to prevent clients being added / removed // Lock to prevent clients being added / removed
//SHARED_LOCK(src->mutex,lk); //SHARED_LOCK(src->mutex,lk);
auto c = src->clients.begin(); auto c = src->clients.begin();
while (c != src->clients.end()) { while (c != src->clients.end()) {
const ftl::codecs::preset_t b = (*c).preset; const ftl::codecs::preset_t b = (*c).preset;
if ((hqonly && b >= kQualityThreshold) || (!hqonly && b < kQualityThreshold)) { if ((q == Quality::High && b >= kQualityThreshold) || (q == Quality::Low && b < kQualityThreshold)) {
++c; ++c;
LOG(INFO) << "INCORRECT QUALITY";
continue; continue;
} }
...@@ -520,7 +559,7 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt ...@@ -520,7 +559,7 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt
(*c).txcount = (*c).txmax; (*c).txcount = (*c).txmax;
} else { } else {
// Count frame as completed only if last block and channel is 0 // Count frame as completed only if last block and channel is 0
if (pkt.block_number == pkt.block_total - 1 && chan == 0) ++(*c).txcount; if (pkt.block_number == pkt.block_total - 1 && spkt.channel & 0x1 == 0) ++(*c).txcount;
} }
} catch(...) { } catch(...) {
(*c).txcount = (*c).txmax; (*c).txcount = (*c).txmax;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment