Newer
Older
#include <vector>
#include <optional>
#include <thread>
#include <chrono>
using ftl::rgbd::detail::StreamSource;
using ftl::rgbd::detail::StreamClient;
using ftl::rgbd::detail::ABRController;
using ftl::codecs::definition_t;
using ftl::codecs::device_t;
using ftl::net::Universe;
using std::string;
using std::list;
using std::map;
using std::optional;
using std::vector;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
using std::tuple;
using std::make_tuple;
static const ftl::codecs::preset_t kQualityThreshold = ftl::codecs::kPresetLQThreshold;
Streamer::Streamer(nlohmann::json &config, Universe *net)
: ftl::Configurable(config), late_(false) {
mspf_ = ftl::timer::getInterval(); //1000 / value("fps", 20);
//last_dropped_ = 0;
//drop_count_ = 0;
encode_mode_ = ftl::rgbd::kEncodeVideo;
hq_devices_ = (value("disable_hardware_encode", false)) ? device_t::Software : device_t::Any;
hq_codec_ = value("video_codec", ftl::codecs::codec_t::Any);
//group_.setFPS(value("fps", 20));
group_.setName("NetStreamer");
net->bind("find_stream", [this](const std::string &uri) -> optional<ftl::UUID> {
if (sources_.find(uri) != sources_.end()) {
LOG(INFO) << "Valid source request received: " << uri;
return net_->id();
} else return {};
});
net->bind("list_streams", [this]() -> vector<string> {
vector<string> streams;
for (auto &i : sources_) {
streams.push_back(i.first);
}
return streams;
});
net->bind("set_pose", [this](const std::string &uri, const std::vector<unsigned char> &buf) {
memcpy(pose.data(), buf.data(), buf.size());
sources_[uri]->src->setPose(pose);
}
});
net->bind("get_pose", [this](const std::string &uri) -> std::vector<unsigned char> {
SHARED_LOCK(mutex_,slk);
if (sources_.find(uri) != sources_.end()) {
Eigen::Matrix4d pose = sources_[uri]->src->getPose();
vector<unsigned char> vec((unsigned char*)pose.data(), (unsigned char*)(pose.data()+(pose.size())));
return vec;
} else {
LOG(WARNING) << "Requested pose not found: " << uri;
return {};
}
});
// Allow remote users to access camera calibration matrix
net->bind("source_details", [this](const std::string &uri, ftl::codecs::Channel chan) -> tuple<unsigned int,vector<unsigned char>> {
if (sources_.find(uri) != sources_.end()) {
LOG(INFO) << "Calib buf size = " << buf.size();
auto params = sources_[uri]->src->parameters(chan);
memcpy(buf.data(), ¶ms, buf.size());
return make_tuple(sources_[uri]->src->getCapabilities(), buf);
} else {
return make_tuple(0u,buf);
}
});
net->bind("get_stream", [this](const string &source, int N, int rate, const UUID &peer, const string &dest) {
_addClient(source, N, rate, peer, dest);
});
net->bind("set_channel", [this](const string &uri, Channel chan) {
SHARED_LOCK(mutex_,slk);
if (sources_.find(uri) != sources_.end()) {
//net->bind("sync_streams", [this](unsigned long long time) {
//net->bind("ping_streamer", [this](unsigned long long time) -> unsigned long long {
// return time;
//});
on("hq_bitrate", [this](const ftl::config::Event &e) {
UNIQUE_LOCK(mutex_,ulk);
for (auto &s : sources_) {
s.second->hq_bitrate = value("hq_bitrate", ftl::codecs::kPresetBest);
}
});
on("video_codec", [this](const ftl::config::Event &e) {
UNIQUE_LOCK(mutex_,ulk);
hq_codec_ = value("video_codec", ftl::codecs::codec_t::Any);
for (auto &s : sources_) {
if (s.second->hq_encoder_c1) ftl::codecs::free(s.second->hq_encoder_c1);
if (s.second->hq_encoder_c2) ftl::codecs::free(s.second->hq_encoder_c2);
s.second->hq_encoder_c1 = nullptr;
s.second->hq_encoder_c2 = nullptr;
}
});
on("lq_bitrate", [this](const ftl::config::Event &e) {
UNIQUE_LOCK(mutex_,ulk);
for (auto &s : sources_) {
s.second->lq_bitrate = value("lq_bitrate", ftl::codecs::kPresetWorst);
}
});
insert_iframes_ = value("insert_iframes", false);
on("insert_iframes", [this](const ftl::config::Event &e) {
insert_iframes_ = value("insert_iframes", false);
});
net_->unbind("find_stream");
net_->unbind("list_streams");
net_->unbind("source_calibration");
net_->unbind("get_stream");
net_->unbind("sync_streams");
net_->unbind("ping_streamer");
{
UNIQUE_LOCK(mutex_,ulk);
for (auto &s : sources_) {
StreamSource *src = s.second;
src->clientCount = 0;
}
}
_cleanUp();
{
UNIQUE_LOCK(mutex_,ulk);
sources_.clear();
}
{
UNIQUE_LOCK(mutex_,ulk);
if (sources_.find(src->getID()) != sources_.end()) return;
StreamSource *s = new StreamSource;
s->src = src;
//s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0);
s->jobs = 0;
s->frame = 0;
s->hq_count = 0;
s->lq_count = 0;
s->hq_bitrate = value("hq_bitrate", ftl::codecs::kPresetBest);
s->lq_bitrate = value("lq_bitrate", ftl::codecs::kPresetWorst);
src->addRawCallback([this,s](Source *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
if (spkt.channel == Channel::Calibration) {
// Calibration changed, so lets re-check the bitrate presets
const auto ¶ms = src->parameters();
LOG(INFO) << "RAW CALLBACK";
_transmitPacket(s, spkt, pkt, Quality::Any);
});
net_->broadcast("add_stream", src->getID());
/**
 * Register every source of a group for proxy streaming.
 *
 * For each source in `grp` that is not already present in `sources_`, a new
 * StreamSource record is created with zeroed counters, stored under the
 * source's ID, and a raw-packet callback is attached that forwards each
 * packet unchanged to the source's clients via _transmitPacket() with
 * Quality::Any. An "add_stream" broadcast is sent for each newly added
 * source.
 *
 * Sources that are already registered are skipped individually; the
 * remaining sources of the group are still processed.
 *
 * @param grp Group whose sources should be streamed.
 */
void Streamer::add(ftl::rgbd::Group *grp) {
	auto srcs = grp->sources();

	for (auto &src : srcs) {
		{
			// Scoped so the lock is released before logging / broadcasting.
			UNIQUE_LOCK(mutex_,ulk);

			// Already registered: skip this source only. Returning here would
			// silently drop every remaining source in the group.
			if (sources_.find(src->getID()) != sources_.end()) continue;

			StreamSource *s = new StreamSource;
			s->src = src;
			//s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0);
			s->jobs = 0;
			s->frame = 0;
			s->clientCount = 0;
			s->hq_count = 0;
			s->lq_count = 0;
			// NOTE(review): the single-source registration code earlier in this
			// file also sets hq_bitrate / lq_bitrate from config; they are not
			// set here. Confirm StreamSource provides sensible defaults.
			sources_[src->getID()] = s;

			//group_.addSource(src);

			// Raw packets are forwarded as-is, without re-encoding.
			src->addRawCallback([this,s](Source *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
				LOG(INFO) << "RAW CALLBACK2";
				_transmitPacket(s, spkt, pkt, Quality::Any);
			});
		}

		LOG(INFO) << "Proxy Streaming: " << src->getID();
		net_->broadcast("add_stream", src->getID());
	}

	LOG(INFO) << "All proxy streams added";
}
void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) {
UNIQUE_LOCK(mutex_,slk);
if (sources_.find(source) == sources_.end()) return;
if (rate < 0 || rate >= 10) return;
if (N < 0 || N > ftl::rgbd::kMaxFrames) return;
//DLOG(INFO) << "Adding Stream Peer: " << peer.to_string() << " rate=" << rate << " N=" << N;
// Set a time peer for clock sync
if (time_peer_ == ftl::UUID(0)) {
time_peer_ = peer;
// Do a time sync whenever the CPU is idle for 10ms or more.
// FIXME: Could be starved
timer_job_ = ftl::timer::add(ftl::timer::kTimerIdle10, [peer,this](int id) {
auto start = std::chrono::high_resolution_clock::now();
int64_t mastertime;
try {
mastertime = net_->call<int64_t>(peer, "__ping__");
} catch (...) {
LOG(ERROR) << "Ping failed";
// Reset time peer and remove timer
time_peer_ = ftl::UUID(0);
return false;
}
auto elapsed = std::chrono::high_resolution_clock::now() - start;
int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
auto clock_adjust = mastertime - (ftl::timer::get_time() + (latency/2));
if (clock_adjust > 0) {
LOG(INFO) << "Clock adjustment: " << clock_adjust;
//LOG(INFO) << "Latency: " << (latency / 2);
//LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime;
ftl::timer::setClockAdjustment(clock_adjust);
}
return true;
});
// If already listening, just update chunk counters
if (client.peerid == peer) {
// Allow for same client but different quality (beyond threshold)
if ((client.preset < kQualityThreshold && rate >= kQualityThreshold) ||
(client.preset >= kQualityThreshold && rate < kQualityThreshold)) continue;
// Possible switch from high quality to low quality encoding or vice versa
/*if (client.preset < kQualityThreshold && rate >= kQualityThreshold) {
s->hq_count--;
s->lq_count++;
if (s->lq_encoder_c1) s->lq_encoder_c1->reset();
if (s->lq_encoder_c2) s->lq_encoder_c2->reset();
} else if (client.preset >= kQualityThreshold && rate < kQualityThreshold) {
s->hq_count++;
s->lq_count--;
if (s->hq_encoder_c1) s->hq_encoder_c1->reset();
if (s->hq_encoder_c2) s->hq_encoder_c2->reset();
StreamClient &c = s->clients.emplace_back();
c.peerid = peer;
c.uri = dest;
c.txcount = 0;
c.txmax = N;
c.preset = rate;
if (rate >= kQualityThreshold) {
if (s->lq_encoder_c1) s->lq_encoder_c1->reset();
if (s->lq_encoder_c2) s->lq_encoder_c2->reset();
s->lq_count++;
} else {
if (s->hq_encoder_c1) s->hq_encoder_c1->reset();
if (s->hq_encoder_c2) s->hq_encoder_c2->reset();
s->hq_count++;
}
// Finally, inject calibration and config data
s->src->inject(Channel::Calibration, s->src->parameters(Channel::Left), Channel::Left, s->src->getCapabilities());
s->src->inject(Channel::Calibration, s->src->parameters(Channel::Right), Channel::Right, s->src->getCapabilities());
//if (!(*s->src->get<nlohmann::json>("meta")).is_null()) {
s->src->inject(Channel::Configuration, "/original", s->src->getConfig().dump());
//}
}
// Placeholder: the parameter is unnamed and the body is empty, so calling
// this currently has no effect.
// NOTE(review): presumably intended to unregister the source identified by
// the given string — not implemented; confirm intended behaviour.
void Streamer::remove(const std::string &) {
}
void Streamer::stop() {
void Streamer::run(bool block) {
if (block) {
} else {
// Create thread job for frame ticking
void Streamer::_cleanUp() {
for (auto &s : sources_) {
StreamSource *src = s.second;
auto i = src->clients.begin();
while (i != src->clients.end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
// If peer was clock sync master, the remove that...
if ((*i).peerid == time_peer_) {
timer_job_.cancel();
time_peer_ = ftl::UUID(0);
LOG(INFO) << "Remove client: " << (*i).uri;
if ((*i).preset < kQualityThreshold) {
src->hq_count--;
} else {
src->lq_count--;
}
i = src->clients.erase(i);
--src->clientCount;
} else {
i++;
if (src->hq_count == 0) {
if (src->hq_encoder_c1) ftl::codecs::free(src->hq_encoder_c1);
if (src->hq_encoder_c2) ftl::codecs::free(src->hq_encoder_c2);
}
if (src->lq_count == 0) {
if (src->lq_encoder_c1) ftl::codecs::free(src->lq_encoder_c1);
if (src->lq_encoder_c2) ftl::codecs::free(src->lq_encoder_c2);
}
if (src->clientCount == 0) {
}
void Streamer::_process(ftl::rgbd::FrameSet &fs) {
// Prevent new clients during processing.
SHARED_LOCK(mutex_,slk);
// This check is not valid, always assume fs.sources is correct
//if (fs.sources.size() != sources_.size()) {
// LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size();
//return;
//}
for (int j=0; j<fs.sources.size(); ++j) {
StreamSource *src = sources_[fs.sources[j]->getID()];
// Don't do any work in the following cases
if (!src) continue;
if (!fs.sources[j]->isReady()) continue;
if (src->clientCount == 0) continue;
//if (fs.channel1[j].empty() || (fs.sources[j]->getChannel() != ftl::rgbd::kChanNone && fs.channel2[j].empty())) continue;
if (!fs.frames[j].hasChannel(Channel::Colour) || !fs.frames[j].hasChannel(fs.sources[j]->getChannel())) {
LOG(WARNING) << "Missing required channel when streaming: " << (int)fs.sources[j]->getChannel();
continue;
}
bool hasChan2 = fs.sources[j]->getChannel() != Channel::None &&
fs.frames[j].hasChannel(fs.sources[j]->getChannel());
// Do we need to do high quality encoding?
if (src->hq_count > 0) {
auto chan = fs.sources[j]->getChannel();
// Do we have the resources to do a HQ encoding?
///if (src->hq_encoder_c1 && (!hasChan2 || src->hq_encoder_c2)) {
//auto *enc1 = src->hq_encoder_c1;
//auto *enc2 = src->hq_encoder_c2;
MUTEX mtx;
std::condition_variable cv;
bool chan2done = false;
if (fs.frames[j].getPackets(chan).size() == 0) {
// Allocate an encoder
if (!src->hq_encoder_c2) src->hq_encoder_c2 = ftl::codecs::allocateEncoder(
definition_t::HD1080, hq_devices_, hq_codec_);
auto *enc = src->hq_encoder_c2;
// Do we have an encoder to continue with?
if (enc) {
ftl::pool.push([this,&fs,enc,src,hasChan2,&cv,j,&chan2done](int id) {
// TODO: Stagger the reset between nodes... random phasing
if (insert_iframes_ && fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc->reset();
auto chan = fs.sources[j]->getChannel();
try {
enc->encode(fs.frames[j].get<cv::cuda::GpuMat>(chan), src->hq_bitrate, [this,src,hasChan2,chan,&cv,&chan2done](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, chan, hasChan2, Quality::High);
chan2done = true;
cv.notify_one();
});
} catch (std::exception &e) {
LOG(ERROR) << "Exception in encoder: " << e.what();
chan2done = true;
cv.notify_one();
}
LOG(ERROR) << "Insufficient encoder resources";
} else {
// Already have an encoding so send this
const auto &packets = fs.frames[j].getPackets(chan);
LOG(INFO) << "Send existing chan2 encoding: " << packets.size();
for (const auto &i : packets) {
_transmitPacket(src, i, chan, hasChan2, Quality::High);
}
}
// No second channel requested...
if (src->hq_encoder_c2) src->hq_encoder_c2->reset();
auto colChan = (fs.frames[j].hasChannel(Channel::ColourHighRes)) ? Channel::ColourHighRes : Channel::Colour;
if (fs.frames[j].getPackets(colChan).size() == 0) {
if (!src->hq_encoder_c1) src->hq_encoder_c1 = ftl::codecs::allocateEncoder(
definition_t::HD1080, hq_devices_, hq_codec_);
auto *enc = src->hq_encoder_c1;
if (enc) {
// TODO: Stagger the reset between nodes... random phasing
if (insert_iframes_ && fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc->reset();
enc->encode(fs.frames[j].get<cv::cuda::GpuMat>(colChan), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, Channel::Colour, hasChan2, Quality::High);
});
} else {
LOG(ERROR) << "Insufficient encoder resources";
}
} else {
const auto &packets = fs.frames[j].getPackets(colChan);
// FIXME: Adjust block number and total to match number of packets
// Also requires the receiver to decode in block number order.
LOG(INFO) << "Send existing encoding: " << packets.size();
for (const auto &i : packets) {
_transmitPacket(src, i, Channel::Colour, hasChan2, Quality::High);
}
}
// Ensure both channels have been completed.
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [&chan2done]{ return chan2done; });
}
// Do we need to do low quality encoding?
if (src->lq_count > 0) {
if (!src->lq_encoder_c1) src->lq_encoder_c1 = ftl::codecs::allocateEncoder(
definition_t::SD480, device_t::Software);
if (!src->lq_encoder_c2 && hasChan2) src->lq_encoder_c2 = ftl::codecs::allocateEncoder(
definition_t::SD480, device_t::Software);
// Do we have the resources to do a LQ encoding?
if (src->lq_encoder_c1 && (!hasChan2 || src->lq_encoder_c2)) {
auto *enc1 = src->lq_encoder_c1;
auto *enc2 = src->lq_encoder_c2;
// Important to send channel 2 first if needed...
// Receiver only waits for channel 1 by default
if (hasChan2) {
auto chan = fs.sources[j]->getChannel();
enc2->encode(fs.frames[j].get<cv::cuda::GpuMat>(chan), src->lq_bitrate, [this,src,hasChan2,chan](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, chan, hasChan2, Quality::Low);
});
} else {
if (enc2) enc2->reset();
}
enc1->encode(fs.frames[j].get<cv::cuda::GpuMat>(Channel::Colour), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, Channel::Colour, hasChan2, Quality::Low);
/*std::unique_lock<std::mutex> lk(job_mtx_);
job_cv_.wait_for(lk, std::chrono::seconds(20), [this]{ return jobs_ == 0; });
if (jobs_ != 0) {
LOG(FATAL) << "Deadlock detected";
// Go to sleep if no clients instead of spinning the cpu
if (totalclients == 0 || sources_.size() == 0) {
// Make sure to unlock so clients can connect!
//sleep_for(milliseconds(50));
/**
 * Convenience overload: build a StreamPacket header for an encoded packet and
 * forward both to the StreamPacket overload of _transmitPacket().
 *
 * The header is initialised, in order, from the current `frame_no_`, the
 * source's `id`, a count of 2 when a second channel is present (otherwise 1),
 * and the channel the packet belongs to.
 *
 * @param src      Stream source whose clients should receive the packet.
 * @param pkt      Encoded packet to send.
 * @param chan     Channel the packet belongs to.
 * @param hasChan2 True when a second channel accompanies the colour channel.
 * @param q        Quality tier used to select which clients receive it.
 */
void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, Channel chan, bool hasChan2, Quality q) {
	ftl::codecs::StreamPacket spkt = {
		frame_no_,
		src->id,
		(hasChan2) ? 2 : 1,
		chan
		//static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0))
	};

	_transmitPacket(src, spkt, pkt, q);
}
/**
 * Send one stream packet to every client of `src` whose requested preset
 * matches the given quality tier.
 *
 * Clients with preset >= kQualityThreshold are skipped for Quality::High and
 * clients with preset < kQualityThreshold are skipped for Quality::Low; any
 * other quality value is delivered to all clients.
 *
 * If the send fails or throws, the client's txcount is set to txmax so that
 * it is treated as a completed request. On success, txcount is incremented
 * only when the packet is the last block of the Colour channel.
 *
 * @param src  Stream source whose client list is iterated.
 * @param spkt Stream packet header (timestamp and channel are read here).
 * @param pkt  Encoded packet payload.
 * @param q    Quality tier of this packet.
 */
void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, Quality q) {
	//LOG(INFO) << "TRANSMIT: " << spkt.timestamp;
	// Lock to prevent clients being added / removed
	//SHARED_LOCK(src->mutex,lk);

	// Range-for guarantees the iterator advances on every path; the previous
	// hand-written loop hit `continue` without incrementing and never ended.
	for (auto &client : src->clients) {
		const ftl::codecs::preset_t preset = client.preset;

		// Skip clients that asked for the other quality tier.
		const bool lowClientHighPacket = (q == Quality::High && preset >= kQualityThreshold);
		const bool highClientLowPacket = (q == Quality::Low && preset < kQualityThreshold);
		if (lowClientHighPacket || highClientLowPacket) continue;

		try {
			// TODO:(Nick) Send pose
			// Time elapsed between the packet timestamp and transmission.
			const short pre_transmit_latency = static_cast<short>(ftl::timer::get_time() - spkt.timestamp);

			const bool sent = net_->send(client.peerid,
					client.uri,
					pre_transmit_latency,
					spkt,
					pkt);

			if (!sent) {
				// Send failed so mark the client stream as completed.
				client.txcount = client.txmax;
			} else if (pkt.block_number == pkt.block_total - 1 && spkt.channel == Channel::Colour) {
				// Count a frame as completed only on the last Colour block.
				++client.txcount;
			}
		} catch(...) {
			// Best effort: any exception while sending also completes the
			// client's request.
			client.txcount = client.txmax;
		}
	}
}