Newer
Older
#include <vector>
#include <optional>
#include <thread>
#include <chrono>
using ftl::rgbd::detail::StreamSource;
using ftl::rgbd::detail::StreamClient;
using ftl::net::Universe;
using std::string;
using std::list;
using std::map;
using std::optional;
using std::vector;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
using std::tuple;
using std::make_tuple;
// Builds the streamer from its JSON configuration and binds the named network
// handlers below on `net`.
// NOTE(review): this listing looks truncated in places (undeclared locals,
// unbalanced braces, no visible closing brace for the constructor). Confirm
// against the complete file before changing any code here.
// NOTE(review): the handlers use the member `net_`, but no assignment of
// `net_` from `net` is visible in this listing — verify it is initialised.
Streamer::Streamer(nlohmann::json &config, Universe *net)
: ftl::Configurable(config), late_(false), jobs_(0) {
// Frame period in milliseconds, computed from the "fps" setting
// (presumably `value` falls back to 25 when the key is absent — verify).
mspf_ = 1000 / value("fps", 25);
//last_dropped_ = 0;
//drop_count_ = 0;
// "find_stream": answers with this node's id when `uri` is a registered
// source, otherwise with an empty optional.
net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> {
if (sources_.find(uri) != sources_.end()) {
LOG(INFO) << "Valid source request received: " << uri;
return net_->id();
} else return {};
});
// "list_streams": returns the key of every entry in sources_.
net->bind("list_streams", [this]() -> vector<string> {
vector<string> streams;
for (auto &i : sources_) {
streams.push_back(i.first);
}
return streams;
});
// "set_pose": copies the received bytes into `pose` and hands it to the source.
// NOTE(review): `pose` is not declared in the visible code and the braces of
// this handler do not balance, so some lines appear to be missing. The memcpy
// length is the sender-supplied buf.size(); confirm it is bounded in the full
// source.
net->bind("set_pose", [this](const std::string &uri, const std::vector<unsigned char> &buf) {
memcpy(pose.data(), buf.data(), buf.size());
sources_[uri]->src->setPose(pose);
}
});
// "get_pose": returns the raw bytes of the source's 4x4 double pose matrix,
// or an empty vector (with a warning) when `uri` is unknown.
net->bind("get_pose", [this](const std::string &uri) -> std::vector<unsigned char> {
SHARED_LOCK(mutex_,slk);
if (sources_.find(uri) != sources_.end()) {
Eigen::Matrix4d pose = sources_[uri]->src->getPose();
// Byte range covering all pose.size() doubles of the matrix storage.
vector<unsigned char> vec((unsigned char*)pose.data(), (unsigned char*)(pose.data()+(pose.size())));
return vec;
} else {
LOG(WARNING) << "Requested pose not found: " << uri;
return {};
}
});
// Allow remote users to access camera calibration matrix
// Replies with (capabilities, buf), where buf receives the bytes of the
// parameters for `chan`; replies with (0, buf) when `uri` is unknown.
// NOTE(review): `buf` is not declared in the visible code, and `¶ms` looks
// like a mis-encoded `&params` — check the original file.
net->bind("source_details", [this](const std::string &uri, ftl::rgbd::channel_t chan) -> tuple<unsigned int,vector<unsigned char>> {
if (sources_.find(uri) != sources_.end()) {
LOG(INFO) << "Calib buf size = " << buf.size();
auto params = sources_[uri]->src->parameters(chan);
memcpy(buf.data(), ¶ms, buf.size());
return make_tuple(sources_[uri]->src->getCapabilities(), buf);
} else {
return make_tuple(0u,buf);
}
});
// "get_stream": forwards the request unchanged to _addClient.
net->bind("get_stream", [this](const string &source, int N, int rate, const UUID &peer, const string &dest) {
_addClient(source, N, rate, peer, dest);
});
// "set_channel": selects the channel on a known source; unknown URIs are
// ignored silently.
net->bind("set_channel", [this](const string &uri, unsigned int chan) {
SHARED_LOCK(mutex_,slk);
if (sources_.find(uri) != sources_.end()) {
sources_[uri]->src->setChannel((ftl::rgbd::channel_t)chan);
}
});
//net->bind("sync_streams", [this](unsigned long long time) {
//net->bind("ping_streamer", [this](unsigned long long time) -> unsigned long long {
// return time;
//});
net_->unbind("find_stream");
net_->unbind("list_streams");
net_->unbind("source_calibration");
net_->unbind("get_stream");
net_->unbind("sync_streams");
net_->unbind("ping_streamer");
{
UNIQUE_LOCK(mutex_,ulk);
if (sources_.find(src->getID()) != sources_.end()) return;
StreamSource *s = new StreamSource;
s->src = src;
//s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0);
s->jobs = 0;
s->frame = 0;
sources_[src->getID()] = s;
}
net_->broadcast("add_stream", src->getID());
}
// Registers `peer` as a receiver of `source` in bitrate slot `rate`, with a
// transmit budget of N * kChunkCount chunks delivered to the address `dest`.
// Requests are dropped silently when the source is unknown, when `rate` is
// outside [0, 10) or when N is outside [0, kMaxFrames].
// NOTE(review): `s` is never declared in the visible code and the loop / if
// braces below are not closed, so lines appear to be missing from this listing.
void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) {
UNIQUE_LOCK(mutex_,slk);
if (sources_.find(source) == sources_.end()) return;
if (rate < 0 || rate >= 10) return;
if (N < 0 || N > ftl::rgbd::kMaxFrames) return;
DLOG(INFO) << "Adding Stream Peer: " << peer.to_string() << " rate=" << rate << " N=" << N;
// Set a time peer for clock sync
// Only the first peer seen (time_peer_ still UUID(0)) becomes the reference.
if (time_peer_ == ftl::UUID(0)) {
time_peer_ = peer;
// Also do a time sync (but should be repeated periodically)
// NOTE(review): this remote call appears to run while the lock taken above
// is still held — confirm that this is intended.
auto start = std::chrono::high_resolution_clock::now();
int64_t mastertime = net_->call<int64_t>(peer, "__ping__");
auto elapsed = std::chrono::high_resolution_clock::now() - start;
int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
// Offset = peer's reported time minus (local send time + half the round trip).
clock_adjust_ = mastertime - (std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() + (latency/2));
LOG(INFO) << "Clock adjustment: " << clock_adjust_;
LOG(INFO) << "Latency: " << (latency / 2);
LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime;
}
for (auto &client : s->clients[rate]) {
// If already listening, just update chunk counters
if (client.peerid == peer) {
client.txmax = N * kChunkCount;
client.txcount = 0;
// Not an existing client so add one
StreamClient &c = s->clients[rate].emplace_back();
c.peerid = peer;
c.uri = dest;
c.txcount = 0;
c.txmax = N * kChunkCount;
++s->clientCount;
}
// Placeholder for removing a source by URI. Currently a no-op: the argument is
// ignored and no streamer state is modified.
void Streamer::remove(const std::string & /*unused*/) {
    // Intentionally empty.
}
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// Records and logs the frame rate actually achieved.
//
// framesdropped: number of frame slots that were skipped.
// msremainder:   extra milliseconds to add on top of the whole frame periods.
//
// The per-frame time is (framesdropped + 1) nominal periods plus msremainder;
// actual_fps_ is 1000 divided by that duration.
void Streamer::_decideFrameRate(int64_t framesdropped, int64_t msremainder) {
    const auto elapsed_ms = (framesdropped + 1) * mspf_ + msremainder;
    actual_fps_ = 1000.0f / static_cast<float>(elapsed_ms);
    LOG(INFO) << "Actual FPS = " << actual_fps_;

    // Disabled adaptive frame-rate logic, kept for reference:
    //
    // if (framesdropped > 0) {
    //     // If N consecutive frames are dropped, work out new rate
    //     if (last_dropped_/mspf_ >= last_frame_/mspf_ - 2*framesdropped) drop_count_++;
    //     else drop_count_ = 0;
    //     last_dropped_ = last_frame_+mspf_;
    //     if (drop_count_ >= ftl::rgbd::detail::kFrameDropLimit) {
    //         drop_count_ = 0;
    //         const int64_t actualmspf = std::min((int64_t)1000, framesdropped*mspf_+(mspf_ - msremainder));
    //         LOG(WARNING) << "Suggest FPS @ " << (1000 / actualmspf);
    //         //mspf_ = actualmspf;
    //         // Also notify all clients of change
    //     }
    // } else {
    //     // Perhaps we can boost framerate?
    //     const int64_t actualmspf = std::min((int64_t)1000, framesdropped*mspf_+(mspf_ - msremainder));
    //     LOG(INFO) << "Boost framerate: " << (1000 / actualmspf);
    //     //mspf_ = actualmspf;
    // }
}
void Streamer::stop() {
active_ = false;
void Streamer::poll() {
// Create frame jobs at correct FPS interval
_schedule();
}
void Streamer::run(bool block) {
active_ = true;
if (block) {
}
} else {
// Create thread job for frame ticking
// Must be called in source locked state or src.state must be atomic
// Prunes finished clients from every bitrate slot of `src`, then requests the
// next frame pair from the underlying source, resets the per-source job
// counter and advances the frame counter.
// NOTE(review): the closing braces of the loops and of the function are not
// visible in this listing; lines appear to be missing.
void Streamer::_swap(StreamSource *src) {
// One client list per bitrate slot (10 slots).
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
// erase() yields the iterator following the removed element.
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
}
// Presumably fills src->rgb / src->depth with the newest frames — verify
// against the Source::getFrames declaration.
src->src->getFrames(src->rgb, src->depth);
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = 0;
src->frame++;
// Helps drain the thread pool on the calling thread, then blocks until the
// outstanding job counter reaches zero or 20 seconds have elapsed.
// NOTE(review): the closing brace of this function is not visible in this
// listing.
void Streamer::wait() {
// Do some jobs in this thread, might as well...
std::function<void(int)> j;
// Keep running jobs until pop() hands back an empty function object.
while ((bool)(j=ftl::pool.pop())) {
// Jobs executed here receive -1 as their id argument.
j(-1);
}
// Wait for all jobs to complete before finishing frame
//UNIQUE_LOCK(job_mtx_, lk);
std::unique_lock<std::mutex> lk(job_mtx_);
job_cv_.wait_for(lk, std::chrono::seconds(20), [this]{ return jobs_ == 0; });
// A non-zero count after the timed wait is reported as a deadlock.
if (jobs_ != 0) {
LOG(FATAL) << "Deadlock detected";
}
// Capture frame number?
frame_no_ = last_frame_;
// Prevent new clients during processing.
for (auto s : sources_) {
string uri = s.first;
StreamSource *src = sources_[uri];
if (src == nullptr || src->jobs != 0) continue;
ftl::pool.push([this,src](int id) {
//auto start = std::chrono::high_resolution_clock::now();
auto start = std::chrono::high_resolution_clock::now();
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
// TODO:(Nick) A now%mspf_ == 0 should be accepted
if (target != last_frame_) {
LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%mspf_) << "ms";
}
//_decideFrameRate(target-last_frame_, now%mspf_);
int64_t msdelay = mspf_ - (now % mspf_);
//LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay;
while (msdelay >= 20) {
sleep_for(milliseconds(10));
now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
target = now / mspf_;
while ((now/mspf_) == target) {
_mm_pause(); // SSE2 nano pause intrinsic
now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
};
LOG(ERROR) << "Exception when grabbing frame";
catch (...) {
LOG(ERROR) << "Unknown exception when grabbing frame";
}
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
});
// Compute job
ftl::pool.push([this,src](int id) {
try {
src->src->compute();
} catch (std::exception &ex) {
LOG(ERROR) << "Exception when computing frame";
LOG(ERROR) << ex.what();
}
catch (...) {
LOG(ERROR) << "Unknown exception when computing frame";
}
src->jobs--;
_swap(src);
--jobs_;
job_cv_.notify_one();
});
for (int i=0; i<kChunkCount; ++i) {
// Add chunk job to thread pool
ftl::pool.push([this,src,i](int id) {
int chunk = i;
try {
if (!src->rgb.empty() && (src->src->getChannel() == ftl::rgbd::kChanNone || !src->depth.empty())) {
_encodeAndTransmit(src, chunk);
}
} catch(...) {
LOG(ERROR) << "Encode Exception: " << chunk;
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
// Encodes one tile ("chunk") of the current frame of `src` and sends it to
// every client of every bitrate slot that has listeners.
//
// The frame is treated as a kChunkDim x kChunkDim grid; `chunk` selects the
// tile in row-major order. Slot 0 is encoded at native resolution, the other
// slots are first resized to the slot's configured size divided by kChunkDim.
// A client whose send fails or throws has txcount set to txmax, marking its
// request as completed.
void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) {
    const bool with_second = (!src->depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
    bool use_delta = (chunk+src->frame) % 8 > 0;  // Do XOR or not

    // Locate the tile inside the full frame.
    const int tile_w = src->rgb.cols / kChunkDim;
    const int tile_h = src->rgb.rows / kChunkDim;
    const int tile_x = (chunk % kChunkDim) * tile_w;
    const int tile_y = (chunk / kChunkDim) * tile_h;
    const cv::Rect tile(tile_x, tile_y, tile_w, tile_h);

    vector<unsigned char> rgb_bytes;
    cv::Mat tile_rgb = src->rgb(tile);

    // Second channel tile; 32-bit float input is scaled by 1000 into 16-bit
    // unsigned, anything else is passed through unchanged.
    cv::Mat tile_second;
    vector<unsigned char> second_bytes;
    if (with_second) {
        cv::Mat raw_second = src->depth(tile);
        if (raw_second.type() == CV_32F) {
            raw_second.convertTo(tile_second, CV_16UC1, 1000);
        } else {
            tile_second = raw_second;
        }
    }

    // For each allowed bitrate setting (0 = max quality)
    for (unsigned int b = 0; b < 10; ++b) {
        // Nobody listening on this slot: nothing to encode.
        if (src->clients[b].size() == 0) continue;

        if (b == 0) {
            // Max bitrate means no changes
            _encodeChannel1(tile_rgb, rgb_bytes, b);
            if (with_second) _encodeChannel2(tile_second, second_bytes, src->src->getChannel(), b);
        } else {
            // Otherwise must downscale and change compression params
            // TODO:(Nick) could reuse downscales
            const cv::Size scaled(bitrate_settings[b].width / kChunkDim, bitrate_settings[b].height / kChunkDim);
            cv::Mat small_rgb, small_second;
            cv::resize(tile_rgb, small_rgb, scaled);
            if (with_second) cv::resize(tile_second, small_second, scaled);
            _encodeChannel1(small_rgb, rgb_bytes, b);
            if (with_second) _encodeChannel2(small_second, second_bytes, src->src->getChannel(), b);
        }

        // Lock to prevent clients being added / removed
        SHARED_LOCK(src->mutex,lk);
        for (auto &client : src->clients[b]) {
            try {
                // TODO:(Nick) Send pose
                if (net_->send(client.peerid, client.uri, frame_no_*mspf_, chunk, use_delta, rgb_bytes, second_bytes)) {
                    ++client.txcount;
                } else {
                    // Send failed so mark as client stream completed
                    client.txcount = client.txmax;
                }
            } catch(...) {
                client.txcount = client.txmax;
            }
        }
    }
}
// JPEG-encodes the colour image `in` into `out`, using the JPEG quality
// configured for bitrate slot `b`.
void Streamer::_encodeChannel1(const cv::Mat &in, vector<unsigned char> &out, unsigned int b) {
    vector<int> encode_params;
    encode_params.push_back(cv::IMWRITE_JPEG_QUALITY);
    encode_params.push_back(bitrate_settings[b].jpg_quality);
    cv::imencode(".jpg", in, out, encode_params);
}
// Encodes the second channel image `in` into `out` for bitrate slot `b`.
//
// Float channels must arrive as single-channel 16-bit unsigned images and are
// written as PNG; non-float channels must be 8-bit 3-channel images and are
// written as JPEG. Returns true when one of those two encodings was applied.
// Returns false for kChanNone, and false after logging an error for any other
// channel / image type combination.
bool Streamer::_encodeChannel2(const cv::Mat &in, vector<unsigned char> &out, ftl::rgbd::channel_t c, unsigned int b) {
    // NOTE: Should not happen
    if (c == ftl::rgbd::kChanNone) return false;

    const bool float_channel = isFloatChannel(c);

    if (float_channel && in.type() == CV_16U && in.channels() == 1) {
        vector<int> png_params = {cv::IMWRITE_PNG_COMPRESSION, bitrate_settings[b].png_compression};
        cv::imencode(".png", in, out, png_params);
        return true;
    }

    if (!float_channel && in.type() == CV_8UC3) {
        vector<int> jpg_params = {cv::IMWRITE_JPEG_QUALITY, bitrate_settings[b].jpg_quality};
        cv::imencode(".jpg", in, out, jpg_params);
        return true;
    }

    LOG(ERROR) << "Bad channel configuration: channel=" << c << " imagetype=" << in.type();
    return false;
}
// Looks up the source registered under `uri`.
//
// Returns the Source pointer held by the matching entry, or nullptr when no
// entry exists for `uri` or the entry itself is null. Uses one map lookup
// instead of a find followed by operator[].
Source *Streamer::get(const std::string &uri) {
    const auto entry = sources_.find(uri);
    if (entry == sources_.end() || entry->second == nullptr) return nullptr;
    return entry->second->src;
}