Newer
Older
#include <vector>
#include <optional>
#include <thread>
#include <chrono>
using ftl::rgbd::detail::StreamSource;
using ftl::rgbd::detail::StreamClient;
using ftl::net::Universe;
using std::string;
using std::list;
using std::map;
using std::optional;
using std::vector;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
using std::tuple;
using std::make_tuple;
// Constructs the streamer and registers its RPC handlers, by name, on the
// supplied network Universe via net->bind().
// NOTE(review): the handlers below use net_, but net_ is not initialised in the
// visible initialiser list or body - lines may be missing from this view; confirm.
Streamer::Streamer(nlohmann::json &config, Universe *net)
: ftl::Configurable(config), late_(false), jobs_(0) {
// "find_stream": returns net_->id() when the URI is a key of sources_,
// otherwise an empty optional.
net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> {
if (sources_.find(uri) != sources_.end()) {
LOG(INFO) << "Valid source request received: " << uri;
return net_->id();
} else return {};
});
// "list_streams": returns the keys (URIs) of every entry in sources_.
net->bind("list_streams", [this]() -> vector<string> {
vector<string> streams;
for (auto &i : sources_) {
streams.push_back(i.first);
}
return streams;
});
// "set_pose": copies the received bytes into `pose` and hands it to the source.
// NOTE(review): `pose` is not declared in the visible lines and the closing
// brace after setPose() has no visible opener - a declaration, lock and
// existence check appear to be missing from this view; confirm.
// NOTE(review): the memcpy length is buf.size(), which comes from the caller -
// confirm it cannot exceed the storage behind pose.data().
net->bind("set_pose", [this](const std::string &uri, const std::vector<unsigned char> &buf) {
memcpy(pose.data(), buf.data(), buf.size());
sources_[uri]->src->setPose(pose);
}
});
// "get_pose": under a shared lock on mutex_, returns the source's 4x4 double
// pose matrix as raw bytes, or logs a warning and returns an empty vector when
// the URI is unknown.
net->bind("get_pose", [this](const std::string &uri) -> std::vector<unsigned char> {
SHARED_LOCK(mutex_,slk);
if (sources_.find(uri) != sources_.end()) {
Eigen::Matrix4d pose = sources_[uri]->src->getPose();
vector<unsigned char> vec((unsigned char*)pose.data(), (unsigned char*)(pose.data()+(pose.size())));
return vec;
} else {
LOG(WARNING) << "Requested pose not found: " << uri;
return {};
}
});
// Allow remote users to access camera calibration matrix
// "source_details": returns (capabilities, raw bytes of parameters()) for a
// known URI, or (0, buf) otherwise.
// NOTE(review): `buf` is not declared in the visible lines - its declaration
// (and hence its size) appears to be missing from this view; confirm.
net->bind("source_details", [this](const std::string &uri) -> tuple<unsigned int,vector<unsigned char>> {
if (sources_.find(uri) != sources_.end()) {
LOG(INFO) << "Calib buf size = " << buf.size();
memcpy(buf.data(), &sources_[uri]->src->parameters(), buf.size());
return make_tuple(sources_[uri]->src->getCapabilities(), buf);
} else {
return make_tuple(0u,buf);
}
});
// "get_stream": forwards the request unchanged to _addClient().
net->bind("get_stream", [this](const string &source, int N, int rate, const UUID &peer, const string &dest) {
_addClient(source, N, rate, peer, dest);
});
// "set_channel": under a shared lock on mutex_, sets the channel on a known
// source; unknown URIs are silently ignored.
net->bind("set_channel", [this](const string &uri, unsigned int chan) {
SHARED_LOCK(mutex_,slk);
if (sources_.find(uri) != sources_.end()) {
sources_[uri]->src->setChannel((ftl::rgbd::channel_t)chan);
}
});
// "sync_streams": handler is bound but its body is currently empty.
net->bind("sync_streams", [this](unsigned long long time) {
// Calc timestamp delta
});
// "ping_streamer": echoes the received time value back to the caller.
net->bind("ping_streamer", [this](unsigned long long time) -> unsigned long long {
return time;
});
}
// Destructor: unbinds RPC handler names from the network.
// NOTE(review): "source_calibration" is unbound here, whereas the constructor
// binds "source_details"; "set_pose", "get_pose" and "set_channel" are not
// unbound in the visible lines - confirm against the full file.
Streamer::~Streamer() {
net_->unbind("find_stream");
net_->unbind("list_streams");
net_->unbind("source_calibration");
net_->unbind("get_stream");
net_->unbind("sync_streams");
net_->unbind("ping_streamer");
// NOTE(review): everything from here on uses `src`, which is not declared in
// this function - these lines look like the body of a separate source
// registration method whose signature is missing from this view; confirm.
{
// Exclusive lock on mutex_ while sources_ is inspected and modified.
UNIQUE_LOCK(mutex_,ulk);
// A source with the same ID is already registered: nothing to do.
if (sources_.find(src->getID()) != sources_.end()) return;
// Raw allocation; no matching delete is visible in this view.
StreamSource *s = new StreamSource;
s->src = src;
//s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0);
s->jobs = 0;
s->frame = 0;
sources_[src->getID()] = s;
}
// Broadcast the new stream ID after the lock scope above has closed.
net_->broadcast("add_stream", src->getID());
}
// Registers `peer` as a client of `source` at bitrate level `rate`, asking for
// N frames to be delivered to the RPC name `dest`.
// Holds an exclusive lock on mutex_ for the whole call.
void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) {
UNIQUE_LOCK(mutex_,slk);
// Silently reject unknown sources and out-of-range rate / frame counts.
if (sources_.find(source) == sources_.end()) return;
if (rate < 0 || rate >= 10) return;
if (N < 0 || N > ftl::rgbd::kMaxFrames) return;
LOG(INFO) << "Adding Stream Peer: " << peer.to_string() << " rate=" << rate << " N=" << N;
// NOTE(review): `s` is used below but not declared in the visible lines -
// presumably the StreamSource for `source`; confirm against the full file.
for (auto &client : s->clients[rate]) {
// If already listening, just update chunk counters
if (client.peerid == peer) {
// The transmit budget is counted in chunks: N frames * kChunkCount.
client.txmax = N * kChunkCount;
client.txcount = 0;
// NOTE(review): the closing braces of the `if` and `for` above (and any early
// return for the existing-client case) are not visible in this view; confirm.
// Not an existing client so add one
StreamClient &c = s->clients[rate].emplace_back();
c.peerid = peer;
c.uri = dest;
c.txcount = 0;
c.txmax = N * kChunkCount;
++s->clientCount;
}
// Intended to remove a source by URI; currently a no-op (the body is empty and
// the parameter is unnamed and unused).
void Streamer::remove(const std::string &) {
}
// Clears the active_ flag.
// NOTE(review): the frame-timing code after `active_ = false;` looks like the
// body of a separate per-frame tick routine whose signature is missing from
// this view; confirm against the full file.
void Streamer::stop() {
active_ = false;
// Target frame period in seconds (25 frames per second).
double wait = 1.0f / 25.0f; // TODO:(Nick) Should be in config
auto start = std::chrono::high_resolution_clock::now();
// Create frame jobs at correct FPS interval
_schedule();
// Time taken by _schedule(), in seconds.
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
if (elapsed.count() >= wait) {
// Scheduling took at least one full frame period: warn and do not sleep.
LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
} else {
//LOG(INFO) << "Frame rate @ " << (1.0f / elapsed.count());
// Otherwise, wait until next frame should start.
// FIXME:(Nick) Is this accurate enough? Almost certainly not
// TODO:(Nick) Synchronise by time corrections and use of fixed time points
// but this only works if framerate can be achieved.
// Sleep for the remainder of the period, truncated to whole milliseconds.
sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
}
}
// Sets the active_ flag and then branches on `block`.
// NOTE(review): both branch bodies are missing from this view and the braces
// below do not balance as shown; confirm against the full file.
void Streamer::run(bool block) {
active_ = true;
if (block) {
}
} else {
// Create thread job for frame ticking
// Must be called in source locked state or src.state must be atomic
// Drops clients whose transmit budget is used up, fetches the next frames from
// the underlying source, and resets the per-source job counter.
// NOTE(review): several closing braces are missing from this view, so the
// exact nesting of the statements after the loops cannot be confirmed here.
void Streamer::_swap(StreamSource *src) {
// Visit the client list of each of the 10 bitrate levels.
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
// erase() returns the iterator to continue from, so no increment here.
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
}
// Fetch frames from the source into src->rgb / src->depth.
src->src->getFrames(src->rgb, src->depth);
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
// Reset the job counter and advance the frame counter.
src->jobs = 0;
src->frame++;
// Runs queued pool jobs on the calling thread, then blocks until jobs_ reaches
// zero or 20 seconds have passed.
// NOTE(review): the closing brace of this function is not visible in this view.
void Streamer::wait() {
// Do some jobs in this thread, might as well...
std::function<void(int)> j;
// pop() yields an empty std::function when no job is available, ending the loop.
while ((bool)(j=ftl::pool.pop())) {
j(-1);
}
// Wait for all jobs to complete before finishing frame
//UNIQUE_LOCK(job_mtx_, lk);
std::unique_lock<std::mutex> lk(job_mtx_);
job_cv_.wait_for(lk, std::chrono::seconds(20), [this]{ return jobs_ == 0; });
// Still non-zero after the timed wait: jobs did not finish within 20 seconds.
if (jobs_ != 0) {
LOG(FATAL) << "Deadlock detected";
}
// NOTE(review): these lines appear to be the body of the frame-scheduling
// routine (a _schedule() call exists elsewhere in this file); its signature
// line, a `try`, and several closing braces are missing from this view - confirm.
// Wait for the previous frame's jobs before queueing new ones.
wait();
//std::mutex job_mtx;
//std::condition_variable job_cv;
//int jobs = 0;
// Prevent new clients during processing.
for (auto s : sources_) {
string uri = s.first;
StreamSource *src = sources_[uri];
// Skip null entries and sources that still have outstanding jobs.
if (src == nullptr || src->jobs != 0) continue;
// Frame-grab job for this source.
// NOTE(review): the statements between the lambda opening and the first LOG
// (including the `try` that pairs with the catch below) are not visible here.
ftl::pool.push([this,src](int id) {
//auto start = std::chrono::high_resolution_clock::now();
LOG(ERROR) << "Exception when grabbing frame";
catch (...) {
LOG(ERROR) << "Unknown exception when grabbing frame";
}
//std::chrono::duration<double> elapsed =
// std::chrono::high_resolution_clock::now() - start;
//LOG(INFO) << "Grab in " << elapsed.count() << "s";
// Mark this job as finished and wake a waiter on job_cv_.
--jobs_;
job_cv_.notify_one();
});
// One encode/transmit job per chunk of the frame.
for (int i=0; i<kChunkCount; ++i) {
// Add chunk job to thread pool
ftl::pool.push([this,src](int id, int chunk) {
try {
// Encode only when colour data exists and either no second channel is
// selected or depth data is present.
if (!src->rgb.empty() && (src->src->getChannel() == ftl::rgbd::kChanNone || !src->depth.empty())) {
_encodeAndTransmit(src, chunk);
}
} catch(...) {
LOG(ERROR) << "Encode Exception: " << chunk;
// NOTE(review): the closing brace of the catch block is not visible here.
--jobs_;
job_cv_.notify_one();
}, i);
}
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
// Encodes one tile ("chunk") of the source's current frame and sends it to
// every client registered on each of the 10 bitrate levels.
//
//   src   - stream source providing the rgb/depth frames and client lists.
//   chunk - tile index; the frame is cut into a kChunkDim x kChunkDim grid and
//           tiles are numbered row by row.
//
// Level 0 sends the tile at native size; every other level first resizes the
// tile to that level's configured resolution divided by kChunkDim. A client
// whose send fails or throws has its txcount forced to txmax, marking its
// request as completed.
void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) {
	bool hasChan2 = (!src->depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
	// Flag forwarded to clients with each chunk (false once in every 8 values
	// of chunk + frame).
	bool delta = (chunk+src->frame) % 8 > 0;

	// Locate this tile inside the full frame.
	int tileWidth = src->rgb.cols / kChunkDim;
	int tileHeight = src->rgb.rows / kChunkDim;
	int originX = (chunk % kChunkDim) * tileWidth;
	int originY = (chunk / kChunkDim) * tileHeight;
	cv::Rect roi(originX, originY, tileWidth, tileHeight);

	cv::Mat rgbTile = src->rgb(roi);

	// Second channel: 32-bit float input is scaled by 1000 into 16-bit
	// unsigned; any other type is passed through unchanged.
	cv::Mat chan2Tile;
	if (hasChan2) {
		cv::Mat rawTile = src->depth(roi);
		if (rawTile.type() == CV_32F) {
			rawTile.convertTo(chan2Tile, CV_16UC1, 1000);
		} else {
			chan2Tile = rawTile;
		}
	}

	// Encoded output buffers, reused across bitrate levels.
	vector<unsigned char> rgbBytes;
	vector<unsigned char> chan2Bytes;

	// For each allowed bitrate setting (0 = max quality)
	for (unsigned int b = 0; b < 10; ++b) {
		// Nobody is listening at this level, so skip the encode entirely.
		if (src->clients[b].empty()) continue;

		if (b != 0) {
			// Lower levels: downscale first, then encode with that level's params.
			// TODO:(Nick) could reuse downscales
			cv::Size target(bitrate_settings[b].width / kChunkDim, bitrate_settings[b].height / kChunkDim);
			cv::Mat smallRgb, smallChan2;
			cv::resize(rgbTile, smallRgb, target);
			if (hasChan2) cv::resize(chan2Tile, smallChan2, target);

			_encodeChannel1(smallRgb, rgbBytes, b);
			if (hasChan2) _encodeChannel2(smallChan2, chan2Bytes, src->src->getChannel(), b);
		} else {
			// Max bitrate: encode the tile as-is.
			_encodeChannel1(rgbTile, rgbBytes, b);
			if (hasChan2) _encodeChannel2(chan2Tile, chan2Bytes, src->src->getChannel(), b);
		}

		// Lock to prevent clients being added / removed
		SHARED_LOCK(src->mutex,lk);
		for (auto &client : src->clients[b]) {
			try {
				// TODO:(Nick) Send pose and timestamp
				if (net_->send(client.peerid, client.uri, 0, chunk, delta, rgbBytes, chan2Bytes)) {
					++client.txcount;
				} else {
					// Send failed so mark as client stream completed
					client.txcount = client.txmax;
				}
			} catch(...) {
				client.txcount = client.txmax;
			}
		}
	}
}
// JPEG-encodes the colour image `in` into `out`, using the JPEG quality
// configured for bitrate level `b`.
void Streamer::_encodeChannel1(const cv::Mat &in, vector<unsigned char> &out, unsigned int b) {
	const vector<int> jpegOptions{cv::IMWRITE_JPEG_QUALITY, bitrate_settings[b].jpg_quality};
	cv::imencode(".jpg", in, out, jpegOptions);
}
// Encodes the second channel image `in` into `out` for bitrate level `b`.
//
// Depth (single-channel 16-bit unsigned) is written as PNG; the right colour
// image (8-bit, 3 channels) is written as JPEG. Returns true when something
// was encoded, false when no channel is selected or the channel / image type
// combination is not supported (the latter is also logged as an error).
bool Streamer::_encodeChannel2(const cv::Mat &in, vector<unsigned char> &out, ftl::rgbd::channel_t c, unsigned int b) {
	// NOTE: Should not happen
	if (c == ftl::rgbd::kChanNone) return false;

	const bool depthAs16Bit = (c == ftl::rgbd::kChanDepth && in.type() == CV_16U && in.channels() == 1);
	if (depthAs16Bit) {
		const vector<int> pngOptions{cv::IMWRITE_PNG_COMPRESSION, bitrate_settings[b].png_compression};
		cv::imencode(".png", in, out, pngOptions);
		return true;
	}

	const bool rightAsColour = (c == ftl::rgbd::kChanRight && in.type() == CV_8UC3);
	if (rightAsColour) {
		const vector<int> jpegOptions{cv::IMWRITE_JPEG_QUALITY, bitrate_settings[b].jpg_quality};
		cv::imencode(".jpg", in, out, jpegOptions);
		return true;
	}

	LOG(ERROR) << "Bad channel configuration: channel=" << c << " imagetype=" << in.type();
	return false;
}
// Looks up a registered source by URI.
//
// Returns the underlying Source pointer, or nullptr when the URI is unknown or
// its entry holds a null StreamSource. Uses a single find() rather than find()
// followed by operator[], so the map is searched once and a read can never
// insert a new entry.
// NOTE(review): sources_ is read here without taking mutex_, unlike some other
// accessors in this file - confirm callers provide the needed synchronisation.
Source *Streamer::get(const std::string &uri) {
	const auto entry = sources_.find(uri);
	if (entry == sources_.end() || entry->second == nullptr) return nullptr;
	return entry->second->src;
}