Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • nicolaspope/ftl
1 result
Show changes
Commits on Source (4)
......@@ -160,6 +160,7 @@ static void run(ftl::Configurable *root) {
}
stream->setLatency(5); // FIXME: This depends on source!?
stream->add(&group);
stream->run();
bool busy = false;
......
......@@ -307,12 +307,14 @@ bool Splatter::render(ftl::rgbd::VirtualSource *src, ftl::rgbd::Frame &out, cuda
out.create<GpuMat>(Channel::Depth, Format<float>(camera.width, camera.height));
out.create<GpuMat>(Channel::Colour, Format<uchar4>(camera.width, camera.height));
// FIXME: Use source resolutions, not virtual resolution
if (scene_->frames.size() == 0) return false;
auto &g = scene_->frames[0].get<GpuMat>(Channel::Colour);
temp_.create<GpuMat>(Channel::Colour, Format<float4>(camera.width, camera.height));
temp_.create<GpuMat>(Channel::Contribution, Format<float>(camera.width, camera.height));
temp_.create<GpuMat>(Channel::Depth, Format<int>(camera.width, camera.height));
temp_.create<GpuMat>(Channel::Depth2, Format<int>(camera.width, camera.height));
temp_.create<GpuMat>(Channel::Normals, Format<float4>(camera.width, camera.height));
temp_.create<GpuMat>(Channel::Normals, Format<float4>(g.cols, g.rows));
cv::cuda::Stream cvstream = cv::cuda::StreamAccessor::wrapStream(stream);
......@@ -363,7 +365,7 @@ bool Splatter::render(ftl::rgbd::VirtualSource *src, ftl::rgbd::Frame &out, cuda
auto &g = f.get<GpuMat>(Channel::Colour);
ftl::cuda::normals(f.createTexture<float4>(Channel::Normals, Format<float4>(g.cols, g.rows)),
temp_.getTexture<float4>(Channel::Normals), // FIXME: Uses assumption of vcam res same as input res
temp_.getTexture<float4>(Channel::Normals),
f.getTexture<float4>(Channel::Points),
3, 0.04f,
s->parameters(), pose.getFloat3x3(), stream);
......
......@@ -6,6 +6,7 @@
#include <ftl/timer.hpp>
#include <ftl/rgbd/frame.hpp>
#include <ftl/rgbd/frameset.hpp>
#include <ftl/codecs/packet.hpp>
#include <opencv2/opencv.hpp>
#include <vector>
......@@ -65,6 +66,22 @@ class Group {
*/
void sync(std::function<bool(FrameSet &)>);
/**
* Whenever any source within the group receives raw data, this callback
* will be called with that raw data. This is used to allow direct data
* capture (to disk) or proxy over a network without needing to re-encode.
* There is no guarantee about order or timing and the callback itself will
* need to ensure synchronisation of timestamps.
*/
void addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
/**
* Removes a raw data callback from all sources in the group.
*/
void removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
inline std::vector<Source*> sources() const { return sources_; }
/** @deprecated */
//bool getFrames(FrameSet &, bool complete=false);
......
......@@ -8,6 +8,7 @@
#include <ftl/net/universe.hpp>
#include <ftl/uri.hpp>
#include <ftl/rgbd/detail/source.hpp>
#include <ftl/codecs/packet.hpp>
#include <opencv2/opencv.hpp>
#include <Eigen/Eigen>
#include <string>
......@@ -201,9 +202,26 @@ class Source : public ftl::Configurable {
SHARED_MUTEX &mutex() { return mutex_; }
std::function<void(int64_t, cv::Mat &, cv::Mat &)> &callback() { return callback_; }
/**
* Set the callback that receives decoded frames as they are generated.
*/
void setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb);
void removeCallback() { callback_ = nullptr; }
/**
* Add a callback to immediately receive any raw data from this source.
* Currently this only works for a net source since other sources don't
* produce raw encoded data.
*/
void addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
void removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
/**
* INTERNAL. Used to send raw data to callbacks.
*/
void notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt);
protected:
detail::Source *impl_;
......@@ -220,6 +238,7 @@ class Source : public ftl::Configurable {
cudaStream_t stream_;
int64_t timestamp_;
std::function<void(int64_t, cv::Mat &, cv::Mat &)> callback_;
std::list<std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>> rawcallbacks_;
detail::Source *_createImplementation();
detail::Source *_createFileImpl(const ftl::URI &uri);
......
......@@ -101,6 +101,11 @@ class Streamer : public ftl::Configurable {
*/
void add(Source *);
/**
* Allow all sources in another group to be proxy streamed by this streamer.
*/
void add(ftl::rgbd::Group *grp);
void remove(Source *);
void remove(const std::string &);
......@@ -130,6 +135,7 @@ class Streamer : public ftl::Configurable {
private:
ftl::rgbd::Group group_;
std::map<std::string, detail::StreamSource*> sources_;
std::list<ftl::rgbd::Group*> proxy_grps_;
//ctpl::thread_pool pool_;
SHARED_MUTEX mutex_;
bool active_;
......@@ -152,10 +158,17 @@ class Streamer : public ftl::Configurable {
ftl::codecs::device_t hq_devices_;
enum class Quality {
High,
Low,
Any
};
void _process(ftl::rgbd::FrameSet &);
void _cleanUp();
void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest);
void _transmitPacket(detail::StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, bool hqonly);
void _transmitPacket(detail::StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, Quality q);
void _transmitPacket(detail::StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, Quality q);
//void _encodeHQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk);
//void _encodeLQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk);
......
......@@ -222,9 +222,22 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) {
return true;
});
LOG(INFO) << "Start timer";
ftl::timer::start(true);
}
void Group::addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
for (auto s : sources_) {
s->addRawCallback(f);
}
}
void Group::removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
for (auto s : sources_) {
s->removeRawCallback(f);
}
}
//ftl::rgbd::FrameSet &Group::_getRelativeFrameset(int rel) {
// int idx = (rel < 0) ? (head_+kFrameBufferSize+rel)%kFrameBufferSize : (head_+rel)%kFrameBufferSize;
// return framesets_[idx];
......
......@@ -242,29 +242,30 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count();
if (!active_) return;
// Allow acccess to the raw data elsewhere...
host_->notifyRaw(spkt, pkt);
const ftl::rgbd::Channel chan = host_->getChannel();
int rchan = spkt.channel & 0x1;
// Ignore any unwanted second channel
if (chan == ftl::rgbd::Channel::None && rchan > 0) {
LOG(INFO) << "Unwanted channel";
//return;
// TODO: Allow decode to be skipped
}
NetFrame &frame = queue_.getFrame(spkt.timestamp, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3));
// Update frame statistics
frame.tx_size += pkt.data.size();
_createDecoder(rchan, pkt);
auto *decoder = (rchan == 0) ? decoder_c1_ : decoder_c2_;
if (!decoder) {
LOG(ERROR) << "No frame decoder available";
return;
}
// Ignore any unwanted second channel
if (!(chan == ftl::rgbd::Channel::None && rchan > 0)) {
_createDecoder(rchan, pkt);
auto *decoder = (rchan == 0) ? decoder_c1_ : decoder_c2_;
if (!decoder) {
LOG(ERROR) << "No frame decoder available";
return;
}
decoder->decode(pkt, (rchan == 0) ? frame.channel1 : frame.channel2);
decoder->decode(pkt, (rchan == 0) ? frame.channel1 : frame.channel2);
} else {
//LOG(INFO) << "Unwanted frame";
}
// Apply colour correction to chunk
//ftl::rgbd::colourCorrection(tmp_rgb, gamma_, temperature_);
......
......@@ -310,3 +310,25 @@ void Source::setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb)
if (bool(callback_)) LOG(ERROR) << "Source already has a callback: " << getURI();
callback_ = cb;
}
void Source::addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
UNIQUE_LOCK(mutex_,lk);
rawcallbacks_.push_back(f);
}
void Source::removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
UNIQUE_LOCK(mutex_,lk);
for (auto i=rawcallbacks_.begin(); i!=rawcallbacks_.end(); ++i) {
if (i->target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>() == f.target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>()) {
rawcallbacks_.erase(i);
return;
}
}
}
void Source::notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
SHARED_LOCK(mutex_,lk);
for (auto &i : rawcallbacks_) {
i(this, spkt, pkt);
}
}
......@@ -174,6 +174,38 @@ void Streamer::add(Source *src) {
net_->broadcast("add_stream", src->getID());
}
void Streamer::add(ftl::rgbd::Group *grp) {
auto srcs = grp->sources();
for (auto src : srcs) {
{
UNIQUE_LOCK(mutex_,ulk);
if (sources_.find(src->getID()) != sources_.end()) return;
StreamSource *s = new StreamSource;
s->src = src;
//s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0);
s->jobs = 0;
s->frame = 0;
s->clientCount = 0;
s->hq_count = 0;
s->lq_count = 0;
sources_[src->getID()] = s;
//group_.addSource(src);
src->addRawCallback([this,s](Source *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
//LOG(INFO) << "RAW CALLBACK";
_transmitPacket(s, spkt, pkt, Quality::Any);
});
}
LOG(INFO) << "Proxy Streaming: " << src->getID();
net_->broadcast("add_stream", src->getID());
}
LOG(INFO) << "All proxy streams added";
}
void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) {
StreamSource *s = nullptr;
......@@ -349,10 +381,11 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// Prevent new clients during processing.
SHARED_LOCK(mutex_,slk);
if (fs.sources.size() != sources_.size()) {
LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size();
return;
}
// This check is not valid, always assume fs.sources is correct
//if (fs.sources.size() != sources_.size()) {
// LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size();
//return;
//}
int totalclients = 0;
......@@ -390,14 +423,15 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// TODO: Each encode could be done in own thread
if (hasChan2) {
enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 1, hasChan2, true);
_transmitPacket(src, blk, 1, hasChan2, Quality::High);
});
} else {
if (enc2) enc2->reset();
}
if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc1->reset();
enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 0, hasChan2, true);
_transmitPacket(src, blk, 0, hasChan2, Quality::High);
});
}
}
......@@ -418,14 +452,14 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// Receiver only waits for channel 1 by default
if (hasChan2) {
enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 1, hasChan2, false);
_transmitPacket(src, blk, 1, hasChan2, Quality::Low);
});
} else {
if (enc2) enc2->reset();
}
enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, 0, hasChan2, false);
_transmitPacket(src, blk, 0, hasChan2, Quality::Low);
});
}
}
......@@ -491,19 +525,24 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
} else _cleanUp();
}
void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, bool hqonly) {
void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, Quality q) {
ftl::codecs::StreamPacket spkt = {
frame_no_,
static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0))
};
_transmitPacket(src, spkt, pkt, q);
}
void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, Quality q) {
// Lock to prevent clients being added / removed
//SHARED_LOCK(src->mutex,lk);
auto c = src->clients.begin();
while (c != src->clients.end()) {
const ftl::codecs::preset_t b = (*c).preset;
if ((hqonly && b >= kQualityThreshold) || (!hqonly && b < kQualityThreshold)) {
if ((q == Quality::High && b >= kQualityThreshold) || (q == Quality::Low && b < kQualityThreshold)) {
++c;
LOG(INFO) << "INCORRECT QUALITY";
continue;
}
......@@ -520,7 +559,7 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt
(*c).txcount = (*c).txmax;
} else {
// Count frame as completed only if last block and channel is 0
if (pkt.block_number == pkt.block_total - 1 && chan == 0) ++(*c).txcount;
if (pkt.block_number == pkt.block_total - 1 && spkt.channel & 0x1 == 0) ++(*c).txcount;
}
} catch(...) {
(*c).txcount = (*c).txmax;
......