Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • nicolaspope/ftl
1 result
Show changes
Commits on Source (4)
...@@ -160,6 +160,7 @@ static void run(ftl::Configurable *root) { ...@@ -160,6 +160,7 @@ static void run(ftl::Configurable *root) {
} }
stream->setLatency(5); // FIXME: This depends on source!? stream->setLatency(5); // FIXME: This depends on source!?
stream->add(&group);
stream->run(); stream->run();
bool busy = false; bool busy = false;
......
...@@ -307,12 +307,14 @@ bool Splatter::render(ftl::rgbd::VirtualSource *src, ftl::rgbd::Frame &out, cuda ...@@ -307,12 +307,14 @@ bool Splatter::render(ftl::rgbd::VirtualSource *src, ftl::rgbd::Frame &out, cuda
out.create<GpuMat>(Channel::Depth, Format<float>(camera.width, camera.height)); out.create<GpuMat>(Channel::Depth, Format<float>(camera.width, camera.height));
out.create<GpuMat>(Channel::Colour, Format<uchar4>(camera.width, camera.height)); out.create<GpuMat>(Channel::Colour, Format<uchar4>(camera.width, camera.height));
// FIXME: Use source resolutions, not virtual resolution if (scene_->frames.size() == 0) return false;
auto &g = scene_->frames[0].get<GpuMat>(Channel::Colour);
temp_.create<GpuMat>(Channel::Colour, Format<float4>(camera.width, camera.height)); temp_.create<GpuMat>(Channel::Colour, Format<float4>(camera.width, camera.height));
temp_.create<GpuMat>(Channel::Contribution, Format<float>(camera.width, camera.height)); temp_.create<GpuMat>(Channel::Contribution, Format<float>(camera.width, camera.height));
temp_.create<GpuMat>(Channel::Depth, Format<int>(camera.width, camera.height)); temp_.create<GpuMat>(Channel::Depth, Format<int>(camera.width, camera.height));
temp_.create<GpuMat>(Channel::Depth2, Format<int>(camera.width, camera.height)); temp_.create<GpuMat>(Channel::Depth2, Format<int>(camera.width, camera.height));
temp_.create<GpuMat>(Channel::Normals, Format<float4>(camera.width, camera.height)); temp_.create<GpuMat>(Channel::Normals, Format<float4>(g.cols, g.rows));
cv::cuda::Stream cvstream = cv::cuda::StreamAccessor::wrapStream(stream); cv::cuda::Stream cvstream = cv::cuda::StreamAccessor::wrapStream(stream);
...@@ -363,7 +365,7 @@ bool Splatter::render(ftl::rgbd::VirtualSource *src, ftl::rgbd::Frame &out, cuda ...@@ -363,7 +365,7 @@ bool Splatter::render(ftl::rgbd::VirtualSource *src, ftl::rgbd::Frame &out, cuda
auto &g = f.get<GpuMat>(Channel::Colour); auto &g = f.get<GpuMat>(Channel::Colour);
ftl::cuda::normals(f.createTexture<float4>(Channel::Normals, Format<float4>(g.cols, g.rows)), ftl::cuda::normals(f.createTexture<float4>(Channel::Normals, Format<float4>(g.cols, g.rows)),
temp_.getTexture<float4>(Channel::Normals), // FIXME: Uses assumption of vcam res same as input res temp_.getTexture<float4>(Channel::Normals),
f.getTexture<float4>(Channel::Points), f.getTexture<float4>(Channel::Points),
3, 0.04f, 3, 0.04f,
s->parameters(), pose.getFloat3x3(), stream); s->parameters(), pose.getFloat3x3(), stream);
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <ftl/timer.hpp> #include <ftl/timer.hpp>
#include <ftl/rgbd/frame.hpp> #include <ftl/rgbd/frame.hpp>
#include <ftl/rgbd/frameset.hpp> #include <ftl/rgbd/frameset.hpp>
#include <ftl/codecs/packet.hpp>
#include <opencv2/opencv.hpp> #include <opencv2/opencv.hpp>
#include <vector> #include <vector>
...@@ -65,6 +66,22 @@ class Group { ...@@ -65,6 +66,22 @@ class Group {
*/ */
void sync(std::function<bool(FrameSet &)>); void sync(std::function<bool(FrameSet &)>);
/**
* Whenever any source within the group receives raw data, this callback
* will be called with that raw data. This is used to allow direct data
* capture (to disk) or proxy over a network without needing to re-encode.
* There is no guarantee about order or timing and the callback itself will
* need to ensure synchronisation of timestamps.
*/
void addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
/**
* Removes a raw data callback from all sources in the group.
*/
void removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
inline std::vector<Source*> sources() const { return sources_; }
/** @deprecated */ /** @deprecated */
//bool getFrames(FrameSet &, bool complete=false); //bool getFrames(FrameSet &, bool complete=false);
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <ftl/net/universe.hpp> #include <ftl/net/universe.hpp>
#include <ftl/uri.hpp> #include <ftl/uri.hpp>
#include <ftl/rgbd/detail/source.hpp> #include <ftl/rgbd/detail/source.hpp>
#include <ftl/codecs/packet.hpp>
#include <opencv2/opencv.hpp> #include <opencv2/opencv.hpp>
#include <Eigen/Eigen> #include <Eigen/Eigen>
#include <string> #include <string>
...@@ -201,9 +202,26 @@ class Source : public ftl::Configurable { ...@@ -201,9 +202,26 @@ class Source : public ftl::Configurable {
SHARED_MUTEX &mutex() { return mutex_; } SHARED_MUTEX &mutex() { return mutex_; }
std::function<void(int64_t, cv::Mat &, cv::Mat &)> &callback() { return callback_; } std::function<void(int64_t, cv::Mat &, cv::Mat &)> &callback() { return callback_; }
/**
* Set the callback that receives decoded frames as they are generated.
*/
void setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb); void setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb);
void removeCallback() { callback_ = nullptr; } void removeCallback() { callback_ = nullptr; }
/**
* Add a callback to immediately receive any raw data from this source.
* Currently this only works for a net source since other sources don't
* produce raw encoded data.
*/
void addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
void removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
/**
* INTERNAL. Used to send raw data to callbacks.
*/
void notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt);
protected: protected:
detail::Source *impl_; detail::Source *impl_;
...@@ -220,6 +238,7 @@ class Source : public ftl::Configurable { ...@@ -220,6 +238,7 @@ class Source : public ftl::Configurable {
cudaStream_t stream_; cudaStream_t stream_;
int64_t timestamp_; int64_t timestamp_;
std::function<void(int64_t, cv::Mat &, cv::Mat &)> callback_; std::function<void(int64_t, cv::Mat &, cv::Mat &)> callback_;
std::list<std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>> rawcallbacks_;
detail::Source *_createImplementation(); detail::Source *_createImplementation();
detail::Source *_createFileImpl(const ftl::URI &uri); detail::Source *_createFileImpl(const ftl::URI &uri);
......
...@@ -101,6 +101,11 @@ class Streamer : public ftl::Configurable { ...@@ -101,6 +101,11 @@ class Streamer : public ftl::Configurable {
*/ */
void add(Source *); void add(Source *);
/**
* Allow all sources in another group to be proxy streamed by this streamer.
*/
void add(ftl::rgbd::Group *grp);
void remove(Source *); void remove(Source *);
void remove(const std::string &); void remove(const std::string &);
...@@ -130,6 +135,7 @@ class Streamer : public ftl::Configurable { ...@@ -130,6 +135,7 @@ class Streamer : public ftl::Configurable {
private: private:
ftl::rgbd::Group group_; ftl::rgbd::Group group_;
std::map<std::string, detail::StreamSource*> sources_; std::map<std::string, detail::StreamSource*> sources_;
std::list<ftl::rgbd::Group*> proxy_grps_;
//ctpl::thread_pool pool_; //ctpl::thread_pool pool_;
SHARED_MUTEX mutex_; SHARED_MUTEX mutex_;
bool active_; bool active_;
...@@ -152,10 +158,17 @@ class Streamer : public ftl::Configurable { ...@@ -152,10 +158,17 @@ class Streamer : public ftl::Configurable {
ftl::codecs::device_t hq_devices_; ftl::codecs::device_t hq_devices_;
enum class Quality {
High,
Low,
Any
};
void _process(ftl::rgbd::FrameSet &); void _process(ftl::rgbd::FrameSet &);
void _cleanUp(); void _cleanUp();
void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest); void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest);
void _transmitPacket(detail::StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, bool hqonly); void _transmitPacket(detail::StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, Quality q);
void _transmitPacket(detail::StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, Quality q);
//void _encodeHQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk); //void _encodeHQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk);
//void _encodeLQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk); //void _encodeLQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk);
......
...@@ -222,9 +222,22 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { ...@@ -222,9 +222,22 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) {
return true; return true;
}); });
LOG(INFO) << "Start timer";
ftl::timer::start(true); ftl::timer::start(true);
} }
void Group::addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
for (auto s : sources_) {
s->addRawCallback(f);
}
}
void Group::removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
for (auto s : sources_) {
s->removeRawCallback(f);
}
}
//ftl::rgbd::FrameSet &Group::_getRelativeFrameset(int rel) { //ftl::rgbd::FrameSet &Group::_getRelativeFrameset(int rel) {
// int idx = (rel < 0) ? (head_+kFrameBufferSize+rel)%kFrameBufferSize : (head_+rel)%kFrameBufferSize; // int idx = (rel < 0) ? (head_+kFrameBufferSize+rel)%kFrameBufferSize : (head_+rel)%kFrameBufferSize;
// return framesets_[idx]; // return framesets_[idx];
......
...@@ -242,29 +242,30 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk ...@@ -242,29 +242,30 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count(); int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count();
if (!active_) return; if (!active_) return;
// Allow acccess to the raw data elsewhere...
host_->notifyRaw(spkt, pkt);
const ftl::rgbd::Channel chan = host_->getChannel(); const ftl::rgbd::Channel chan = host_->getChannel();
int rchan = spkt.channel & 0x1; int rchan = spkt.channel & 0x1;
// Ignore any unwanted second channel
if (chan == ftl::rgbd::Channel::None && rchan > 0) {
LOG(INFO) << "Unwanted channel";
//return;
// TODO: Allow decode to be skipped
}
NetFrame &frame = queue_.getFrame(spkt.timestamp, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3)); NetFrame &frame = queue_.getFrame(spkt.timestamp, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3));
// Update frame statistics // Update frame statistics
frame.tx_size += pkt.data.size(); frame.tx_size += pkt.data.size();
_createDecoder(rchan, pkt); // Ignore any unwanted second channel
auto *decoder = (rchan == 0) ? decoder_c1_ : decoder_c2_; if (!(chan == ftl::rgbd::Channel::None && rchan > 0)) {
if (!decoder) { _createDecoder(rchan, pkt);
LOG(ERROR) << "No frame decoder available"; auto *decoder = (rchan == 0) ? decoder_c1_ : decoder_c2_;
return; if (!decoder) {
} LOG(ERROR) << "No frame decoder available";
return;
}
decoder->decode(pkt, (rchan == 0) ? frame.channel1 : frame.channel2); decoder->decode(pkt, (rchan == 0) ? frame.channel1 : frame.channel2);
} else {
//LOG(INFO) << "Unwanted frame";
}
// Apply colour correction to chunk // Apply colour correction to chunk
//ftl::rgbd::colourCorrection(tmp_rgb, gamma_, temperature_); //ftl::rgbd::colourCorrection(tmp_rgb, gamma_, temperature_);
......
...@@ -310,3 +310,25 @@ void Source::setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb) ...@@ -310,3 +310,25 @@ void Source::setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb)
if (bool(callback_)) LOG(ERROR) << "Source already has a callback: " << getURI(); if (bool(callback_)) LOG(ERROR) << "Source already has a callback: " << getURI();
callback_ = cb; callback_ = cb;
} }
void Source::addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
UNIQUE_LOCK(mutex_,lk);
rawcallbacks_.push_back(f);
}
void Source::removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
UNIQUE_LOCK(mutex_,lk);
for (auto i=rawcallbacks_.begin(); i!=rawcallbacks_.end(); ++i) {
if (i->target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>() == f.target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>()) {
rawcallbacks_.erase(i);
return;
}
}
}
void Source::notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
SHARED_LOCK(mutex_,lk);
for (auto &i : rawcallbacks_) {
i(this, spkt, pkt);
}
}
...@@ -174,6 +174,38 @@ void Streamer::add(Source *src) { ...@@ -174,6 +174,38 @@ void Streamer::add(Source *src) {
net_->broadcast("add_stream", src->getID()); net_->broadcast("add_stream", src->getID());
} }
void Streamer::add(ftl::rgbd::Group *grp) {
auto srcs = grp->sources();
for (auto src : srcs) {
{
UNIQUE_LOCK(mutex_,ulk);
if (sources_.find(src->getID()) != sources_.end()) return;
StreamSource *s = new StreamSource;
s->src = src;
//s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0);
s->jobs = 0;
s->frame = 0;
s->clientCount = 0;
s->hq_count = 0;
s->lq_count = 0;
sources_[src->getID()] = s;
//group_.addSource(src);
src->addRawCallback([this,s](Source *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
//LOG(INFO) << "RAW CALLBACK";
_transmitPacket(s, spkt, pkt, Quality::Any);
});
}
LOG(INFO) << "Proxy Streaming: " << src->getID();
net_->broadcast("add_stream", src->getID());
}
LOG(INFO) << "All proxy streams added";
}
void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) {
StreamSource *s = nullptr; StreamSource *s = nullptr;
...@@ -349,10 +381,11 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -349,10 +381,11 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// Prevent new clients during processing. // Prevent new clients during processing.
SHARED_LOCK(mutex_,slk); SHARED_LOCK(mutex_,slk);
if (fs.sources.size() != sources_.size()) { // This check is not valid, always assume fs.sources is correct
LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size(); //if (fs.sources.size() != sources_.size()) {
return; // LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size();
} //return;
//}
int totalclients = 0; int totalclients = 0;
...@@ -390,14 +423,15 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -390,14 +423,15 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// TODO: Each encode could be done in own thread // TODO: Each encode could be done in own thread
if (hasChan2) { if (hasChan2) {
enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 1, hasChan2, true); _transmitPacket(src, blk, 1, hasChan2, Quality::High);
}); });
} else { } else {
if (enc2) enc2->reset(); if (enc2) enc2->reset();
} }
if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc1->reset();
enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 0, hasChan2, true); _transmitPacket(src, blk, 0, hasChan2, Quality::High);
}); });
} }
} }
...@@ -418,14 +452,14 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -418,14 +452,14 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// Receiver only waits for channel 1 by default // Receiver only waits for channel 1 by default
if (hasChan2) { if (hasChan2) {
enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 1, hasChan2, false); _transmitPacket(src, blk, 1, hasChan2, Quality::Low);
}); });
} else { } else {
if (enc2) enc2->reset(); if (enc2) enc2->reset();
} }
enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 0, hasChan2, false); _transmitPacket(src, blk, 0, hasChan2, Quality::Low);
}); });
} }
} }
...@@ -491,19 +525,24 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -491,19 +525,24 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
} else _cleanUp(); } else _cleanUp();
} }
void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, bool hqonly) { void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, Quality q) {
ftl::codecs::StreamPacket spkt = { ftl::codecs::StreamPacket spkt = {
frame_no_, frame_no_,
static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0)) static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0))
}; };
_transmitPacket(src, spkt, pkt, q);
}
void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, Quality q) {
// Lock to prevent clients being added / removed // Lock to prevent clients being added / removed
//SHARED_LOCK(src->mutex,lk); //SHARED_LOCK(src->mutex,lk);
auto c = src->clients.begin(); auto c = src->clients.begin();
while (c != src->clients.end()) { while (c != src->clients.end()) {
const ftl::codecs::preset_t b = (*c).preset; const ftl::codecs::preset_t b = (*c).preset;
if ((hqonly && b >= kQualityThreshold) || (!hqonly && b < kQualityThreshold)) { if ((q == Quality::High && b >= kQualityThreshold) || (q == Quality::Low && b < kQualityThreshold)) {
++c; ++c;
LOG(INFO) << "INCORRECT QUALITY";
continue; continue;
} }
...@@ -520,7 +559,7 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt ...@@ -520,7 +559,7 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt
(*c).txcount = (*c).txmax; (*c).txcount = (*c).txmax;
} else { } else {
// Count frame as completed only if last block and channel is 0 // Count frame as completed only if last block and channel is 0
if (pkt.block_number == pkt.block_total - 1 && chan == 0) ++(*c).txcount; if (pkt.block_number == pkt.block_total - 1 && spkt.channel & 0x1 == 0) ++(*c).txcount;
} }
} catch(...) { } catch(...) {
(*c).txcount = (*c).txmax; (*c).txcount = (*c).txmax;
......