Skip to content
Snippets Groups Projects

Add configuration to stream, H264 support, codec selection

Merged Nicolas Pope requested to merge feature/configstream into master
2 files
+ 21
105
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -52,8 +52,8 @@ NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int
@@ -52,8 +52,8 @@ NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int
f.chunk_total[1] = 0;
f.chunk_total[1] = 0;
f.channel_count = 0;
f.channel_count = 0;
f.tx_size = 0;
f.tx_size = 0;
f.channel1.create(s, c1type);
f.channel[0].create(s, c1type);
f.channel2.create(s, c2type);
f.channel[1].create(s, c2type);
return f;
return f;
}
}
oldest = (f.timestamp < oldest) ? f.timestamp : oldest;
oldest = (f.timestamp < oldest) ? f.timestamp : oldest;
@@ -72,8 +72,8 @@ NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int
@@ -72,8 +72,8 @@ NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int
f.chunk_total[1] = 0;
f.chunk_total[1] = 0;
f.channel_count = 0;
f.channel_count = 0;
f.tx_size = 0;
f.tx_size = 0;
f.channel1.create(s, c1type);
f.channel[0].create(s, c1type);
f.channel2.create(s, c2type);
f.channel[1].create(s, c2type);
return f;
return f;
}
}
}
}
@@ -90,55 +90,6 @@ void NetFrameQueue::freeFrame(NetFrame &f) {
@@ -90,55 +90,6 @@ void NetFrameQueue::freeFrame(NetFrame &f) {
// ===== NetSource =============================================================
// ===== NetSource =============================================================
/*bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &src, ftl::rgbd::Camera &p, ftl::codecs::Channel chan) {
try {
while(true) {
auto [cap,buf] = net.call<tuple<unsigned int,vector<unsigned char>>>(peer_, "source_details", src, chan);
capabilities_ = cap;
if (buf.size() > 0) {
memcpy((char*)&p, buf.data(), buf.size());
if (sizeof(p) != buf.size()) {
LOG(ERROR) << "Corrupted calibration";
return false;
}
LOG(INFO) << "Calibration received: " << p.cx << ", " << p.cy << ", " << p.fx << ", " << p.fy;
if (chan == Channel::Left) {
// Put calibration into config manually
host_->getConfig()["focal"] = p.fx;
host_->getConfig()["centre_x"] = p.cx;
host_->getConfig()["centre_y"] = p.cy;
host_->getConfig()["baseline"] = p.baseline;
host_->getConfig()["doffs"] = p.doffs;
} else {
host_->getConfig()["focal_right"] = p.fx;
host_->getConfig()["centre_x_right"] = p.cx;
host_->getConfig()["centre_y_right"] = p.cy;
host_->getConfig()["baseline_right"] = p.baseline;
host_->getConfig()["doffs_right"] = p.doffs;
}
return true;
} else {
LOG(INFO) << "Could not get calibration, retrying";
sleep_for(milliseconds(500));
}
}
} catch (const std::exception& ex) {
LOG(ERROR) << "Exception: " << ex.what();
return false;
} catch (...) {
LOG(ERROR) << "Unknown exception";
return false;
}
}*/
NetSource::NetSource(ftl::rgbd::Source *host)
NetSource::NetSource(ftl::rgbd::Source *host)
: ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), adaptive_(0), queue_(3) {
: ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), adaptive_(0), queue_(3) {
@@ -149,8 +100,8 @@ NetSource::NetSource(ftl::rgbd::Source *host)
@@ -149,8 +100,8 @@ NetSource::NetSource(ftl::rgbd::Source *host)
params_right_.width = 0;
params_right_.width = 0;
has_calibration_ = false;
has_calibration_ = false;
decoder_c1_ = nullptr;
decoder_[0] = nullptr;
decoder_c2_ = nullptr;
decoder_[1] = nullptr;
host->on("gamma", [this,host](const ftl::config::Event&) {
host->on("gamma", [this,host](const ftl::config::Event&) {
gamma_ = host->value("gamma", 1.0f);
gamma_ = host->value("gamma", 1.0f);
@@ -222,8 +173,8 @@ NetSource::NetSource(ftl::rgbd::Source *host)
@@ -222,8 +173,8 @@ NetSource::NetSource(ftl::rgbd::Source *host)
}
}
NetSource::~NetSource() {
NetSource::~NetSource() {
if (decoder_c1_) ftl::codecs::free(decoder_c1_);
if (decoder_[0]) ftl::codecs::free(decoder_[0]);
if (decoder_c2_) ftl::codecs::free(decoder_c2_);
if (decoder_[1]) ftl::codecs::free(decoder_[1]);
if (uri_.size() > 0) {
if (uri_.size() > 0) {
host_->getNet()->unbind(uri_);
host_->getNet()->unbind(uri_);
@@ -261,35 +212,29 @@ NetSource::~NetSource() {
@@ -261,35 +212,29 @@ NetSource::~NetSource() {
void NetSource::_createDecoder(int chan, const ftl::codecs::Packet &pkt) {
void NetSource::_createDecoder(int chan, const ftl::codecs::Packet &pkt) {
UNIQUE_LOCK(mutex_,lk);
UNIQUE_LOCK(mutex_,lk);
auto *decoder = (chan == 0) ? decoder_c1_ : decoder_c2_;
auto *decoder = decoder_[chan];
if (decoder) {
if (decoder) {
if (!decoder->accepts(pkt)) {
if (!decoder->accepts(pkt)) {
ftl::codecs::free((chan == 0) ? decoder_c1_ : decoder_c2_);
ftl::codecs::free(decoder_[chan]);
} else {
} else {
return;
return;
}
}
}
}
if (chan == 0) {
decoder_[chan] = ftl::codecs::allocateDecoder(pkt);
decoder_c1_ = ftl::codecs::allocateDecoder(pkt);
} else {
decoder_c2_ = ftl::codecs::allocateDecoder(pkt);
}
}
}
void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
// Capture time here for better net latency estimate
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count();
if (!active_) return;
// Allow acccess to the raw data elsewhere...
void NetSource::_processConfig(const ftl::codecs::Packet &pkt) {
host_->notifyRaw(spkt, pkt);
std::tuple<std::string, std::string> cfg;
 
auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size());
 
unpacked.get().convert(cfg);
const ftl::codecs::Channel chan = host_->getChannel();
LOG(INFO) << "Config Received: " << std::get<1>(cfg);
const ftl::codecs::Channel rchan = spkt.channel;
// TODO: This needs to be put in safer / better location
const int channum = (rchan == Channel::Colour) ? 0 : 1;
host_->set(std::get<0>(cfg), nlohmann::json::parse(std::get<1>(cfg)));
 
}
if (rchan == Channel::Calibration) {
void NetSource::_processCalibration(const ftl::codecs::Packet &pkt) {
std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, ftl::rgbd::capability_t> params;
std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, ftl::rgbd::capability_t> params;
auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size());
auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size());
unpacked.get().convert(params);
unpacked.get().convert(params);
@@ -298,19 +243,33 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
@@ -298,19 +243,33 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
params_ = std::get<0>(params);
params_ = std::get<0>(params);
capabilities_ = std::get<2>(params);
capabilities_ = std::get<2>(params);
has_calibration_ = true;
has_calibration_ = true;
//rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0));
//depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f);
LOG(INFO) << "Got Calibration channel: " << params_.width << "x" << params_.height;
LOG(INFO) << "Got Calibration channel: " << params_.width << "x" << params_.height;
} else {
} else {
params_right_ = std::get<0>(params);
params_right_ = std::get<0>(params);
}
}
 
}
return;
void NetSource::_processPose(const ftl::codecs::Packet &pkt) {
} else if (rchan == Channel::Pose) {
LOG(INFO) << "Got POSE channel";
LOG(INFO) << "Got POSE channel";
return;
}
 
 
void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
 
// Capture time here for better net latency estimate
 
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count();
 
if (!active_) return;
 
 
// Allow acccess to the raw data elsewhere...
 
host_->notifyRaw(spkt, pkt);
 
 
const ftl::codecs::Channel chan = host_->getChannel();
 
const ftl::codecs::Channel rchan = spkt.channel;
 
const int channum = (rchan == Channel::Colour) ? 0 : 1;
 
 
// Deal with the special channels...
 
switch (rchan) {
 
case Channel::Configuration : _processConfig(pkt); return;
 
case Channel::Calibration : _processCalibration(pkt); return;
 
case Channel::Pose : _processPose(pkt); return;
}
}
if (!has_calibration_) {
if (!has_calibration_) {
@@ -326,19 +285,19 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
@@ -326,19 +285,19 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
// Only decode if this channel is wanted.
// Only decode if this channel is wanted.
if (rchan == Channel::Colour || rchan == chan) {
if (rchan == Channel::Colour || rchan == chan) {
_createDecoder(channum, pkt);
_createDecoder(channum, pkt);
auto *decoder = (rchan == Channel::Colour) ? decoder_c1_ : decoder_c2_;
auto *decoder = decoder_[channum];
if (!decoder) {
if (!decoder) {
LOG(ERROR) << "No frame decoder available";
LOG(ERROR) << "No frame decoder available";
return;
return;
}
}
decoder->decode(pkt, (rchan == Channel::Colour) ? frame.channel1 : frame.channel2);
decoder->decode(pkt, frame.channel[channum]);
} else if (chan != Channel::None && rchan != Channel::Colour) {
} else if (chan != Channel::None && rchan != Channel::Colour) {
// Didn't receive correct second channel so just clear the images
// Didn't receive correct second channel so just clear the images
if (isFloatChannel(chan)) {
if (isFloatChannel(chan)) {
frame.channel2.setTo(cv::Scalar(0.0f));
frame.channel[1].setTo(cv::Scalar(0.0f));
} else {
} else {
frame.channel2.setTo(cv::Scalar(0,0,0));
frame.channel[1].setTo(cv::Scalar(0,0,0));
}
}
}
}
@@ -358,51 +317,41 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
@@ -358,51 +317,41 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
}
}
++frame.chunk_count[channum];
++frame.chunk_count[channum];
++frame.channel_count;
if (frame.chunk_count[channum] == frame.chunk_total[channum]) ++frame.channel_count;
if (frame.chunk_count[channum] > frame.chunk_total[channum]) LOG(FATAL) << "TOO MANY CHUNKS";
if (frame.chunk_count[channum] > frame.chunk_total[channum]) LOG(FATAL) << "TOO MANY CHUNKS";
// Capture tx time of first received chunk
// Capture tx time of first received chunk
if (frame.channel_count == 1 && frame.chunk_count[channum] == 1) {
// FIXME: This seems broken
 
if (channum == 1 && frame.chunk_count[channum] == 1) {
UNIQUE_LOCK(frame.mtx, flk);
UNIQUE_LOCK(frame.mtx, flk);
if (frame.chunk_count[channum] == 1) {
if (frame.chunk_count[channum] == 1) {
frame.tx_latency = int64_t(ttimeoff);
frame.tx_latency = int64_t(ttimeoff);
}
}
}
}
// Last chunk of both channels now received
// Last chunk of both channels now received, so we are done.
if (frame.channel_count == spkt.channel_count &&
if (frame.channel_count == spkt.channel_count) {
frame.chunk_count[0] == frame.chunk_total[0] &&
_completeFrame(frame, now-(spkt.timestamp+frame.tx_latency));
frame.chunk_count[1] == frame.chunk_total[1]) {
}
 
}
 
 
void NetSource::_completeFrame(NetFrame &frame, int64_t latency) {
UNIQUE_LOCK(frame.mtx, flk);
UNIQUE_LOCK(frame.mtx, flk);
if (frame.timestamp >= 0 && frame.chunk_count[0] == frame.chunk_total[0] && frame.chunk_count[1] == frame.chunk_total[1]) {
// Frame must not have already been freed.
 
if (frame.timestamp >= 0) {
timestamp_ = frame.timestamp;
timestamp_ = frame.timestamp;
frame.tx_latency = now-(spkt.timestamp+frame.tx_latency);
frame.tx_latency = latency;
 
// Note: Not used currently
adaptive_ = abr_.selectBitrate(frame);
adaptive_ = abr_.selectBitrate(frame);
//LOG(INFO) << "Frame finished: " << frame.timestamp;
host_->notify(frame.timestamp, frame.channel1, frame.channel2);
/*auto cb = host_->callback();
if (cb) {
try {
cb(frame.timestamp, frame.channel1, frame.channel2);
} catch (...) {
LOG(ERROR) << "Exception in net frame callback";
}
} else {
LOG(ERROR) << "NO FRAME CALLBACK";
}*/
queue_.freeFrame(frame);
host_->notify(frame.timestamp, frame.channel[0], frame.channel[1]);
{
queue_.freeFrame(frame);
// Decrement expected frame counter
N_--;
N_--;
}
}
}
}
}
}
void NetSource::setPose(const Eigen::Matrix4d &pose) {
void NetSource::setPose(const Eigen::Matrix4d &pose) {
if (!active_) return;
if (!active_) return;
@@ -420,12 +369,6 @@ void NetSource::setPose(const Eigen::Matrix4d &pose) {
@@ -420,12 +369,6 @@ void NetSource::setPose(const Eigen::Matrix4d &pose) {
ftl::rgbd::Camera NetSource::parameters(ftl::codecs::Channel chan) {
ftl::rgbd::Camera NetSource::parameters(ftl::codecs::Channel chan) {
if (chan == ftl::codecs::Channel::Right) {
if (chan == ftl::codecs::Channel::Right) {
/*if (params_right_.width == 0) {
auto uri = host_->get<string>("uri");
if (!uri) return params_;
_getCalibration(*host_->getNet(), peer_, *uri, params_right_, chan);
}*/
return params_right_;
return params_right_;
} else {
} else {
return params_;
return params_;
@@ -450,27 +393,11 @@ void NetSource::_updateURI() {
@@ -450,27 +393,11 @@ void NetSource::_updateURI() {
}
}
peer_ = *p;
peer_ = *p;
//has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_, ftl::codecs::Channel::Left);
//_getCalibration(*host_->getNet(), peer_, *uri, params_right_, ftl::codecs::Channel::Right);
host_->getNet()->bind(*uri, [this](short ttimeoff, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
host_->getNet()->bind(*uri, [this](short ttimeoff, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
//if (chunk == -1) {
//#ifdef HAVE_NVPIPE
//_recvVideo(frame, ttimeoff, bitrate, jpg, d);
//#else
//LOG(ERROR) << "Cannot receive HEVC, no NvPipe support";
//#endif
//} else {
//_recvChunk(frame, ttimeoff, bitrate, chunk, jpg, d);
_recvPacket(ttimeoff, spkt, pkt);
_recvPacket(ttimeoff, spkt, pkt);
//}
});
});
N_ = 0;
N_ = 0;
//d_rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0));
//d_depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f);
uri_ = *uri;
uri_ = *uri;
active_ = true;
active_ = true;
} else {
} else {
Loading