Newer
Older
#include <vector>
#include <optional>
#include <thread>
#include <chrono>
using ftl::rgbd::Streamer;
using ftl::rgbd::detail::StreamSource;
using ftl::rgbd::detail::StreamClient;
using ftl::net::Universe;
using std::string;
using std::list;
using std::map;
using std::optional;
using std::vector;
using std::mutex;
using std::shared_mutex;
using std::unique_lock;
using std::shared_lock;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
#define THREAD_POOL_SIZE 6
Streamer::Streamer(nlohmann::json &config, Universe *net)
: ftl::Configurable(config), pool_(THREAD_POOL_SIZE), late_(false), jobs_(0) {
active_ = false;
net_ = net;
net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> {
if (sources_.find(uri) != sources_.end()) {
LOG(INFO) << "Valid source request received: " << uri;
return net_->id();
} else return {};
});
net->bind("list_streams", [this]() -> vector<string> {
vector<string> streams;
for (auto &i : sources_) {
streams.push_back(i.first);
}
return streams;
});
net->bind("set_pose", [this](const std::string &uri, const std::vector<unsigned char> &buf) {
if (sources_.find(uri) != sources_.end()) {
Eigen::Matrix4f pose;
memcpy(pose.data(), buf.data(), buf.size());
sources_[uri]->src->setPose(pose);
}
});
// Allow remote users to access camera calibration matrix
net->bind("source_calibration", [this](const std::string &uri) -> vector<unsigned char> {
vector<unsigned char> buf;
if (sources_.find(uri) != sources_.end()) {
LOG(INFO) << "Calib buf size = " << buf.size();
memcpy(buf.data(), &sources_[uri]->src->parameters(), buf.size());
}
return buf;
});
net->bind("get_stream", [this](const string &source, int N, int rate, const UUID &peer, const string &dest) {
_addClient(source, N, rate, peer, dest);
});
net->bind("sync_streams", [this](unsigned long long time) {
// Calc timestamp delta
});
net->bind("ping_streamer", [this](unsigned long long time) -> unsigned long long {
return time;
});
}
Streamer::~Streamer() {
net_->unbind("find_stream");
net_->unbind("list_streams");
net_->unbind("source_calibration");
net_->unbind("get_stream");
net_->unbind("sync_streams");
net_->unbind("ping_streamer");
{
UNIQUE_LOCK(mutex_,ulk);
if (sources_.find(src->getID()) != sources_.end()) return;
StreamSource *s = new StreamSource;
s->src = src;
s->state = 0;
sources_[src->getID()] = s;
}
net_->broadcast("add_stream", src->getID());
}
void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) {
StreamSource *s = nullptr;
//{
UNIQUE_LOCK(mutex_,slk);
if (sources_.find(source) == sources_.end()) return;
if (rate < 0 || rate >= 10) return;
if (N < 0 || N > ftl::rgbd::kMaxFrames) return;
DLOG(INFO) << "Adding Stream Peer: " << peer.to_string();
s = sources_[source];
//}
if (!s) return;
UNIQUE_LOCK(s->mutex, lk2);
for (int i=0; i<s->clients[rate].size(); i++) {
if (s->clients[rate][i].peerid == peer) {
StreamClient &c = s->clients[rate][i];
c.txmax = N;
c.txcount = 0;
return;
}
}
StreamClient c;
c.peerid = peer;
c.uri = dest;
c.txcount = 0;
c.txmax = N;
s->clients[rate].push_back(c);
}
}
void Streamer::remove(const std::string &) {
}
void Streamer::stop() {
active_ = false;
double wait = 1.0f / 25.0f; // TODO(Nick) Should be in config
auto start = std::chrono::high_resolution_clock::now();
// Create frame jobs at correct FPS interval
_schedule();
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
if (elapsed.count() >= wait) {
LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
} else {
//LOG(INFO) << "Frame rate @ " << (1.0f / elapsed.count());
// Otherwise, wait until next frame should start.
// CHECK(Nick) Is this accurate enough? Almost certainly not
// TODO(Nick) Synchronise by time corrections and use of fixed time points
// but this only works if framerate can be achieved.
sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
}
}
void Streamer::run(bool block) {
active_ = true;
if (block) {
}
} else {
// Create thread job for frame ticking
pool_.push([this](int id) {
// Must be called in source locked state or src.state must be atomic
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
void Streamer::_swap(StreamSource *src) {
if (src->state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kRGB | ftl::rgbd::detail::kDepth)) {
UNIQUE_LOCK(src->mutex,lk);
if (src->rgb_buf.size() > 0 && src->d_buf.size() > 0) {
auto i = src->clients[0].begin();
while (i != src->clients[0].end()) {
try {
// TODO(Nick) Send pose and timestamp
if (!net_->send((*i).peerid, (*i).uri, src->rgb_buf, src->d_buf)) {
(*i).txcount = (*i).txmax;
}
} catch(...) {
(*i).txcount = (*i).txmax;
}
(*i).txcount++;
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[0].erase(i);
} else {
i++;
}
}
}
src->src->getFrames(src->rgb, src->depth);
src->state = 0;
void Streamer::wait() {
// Do some jobs in this thread, might as well...
std::function<void(int)> j;
while ((bool)(j=pool_.pop())) {
j(-1);
}
// Wait for all jobs to complete before finishing frame
UNIQUE_LOCK(job_mtx_, lk);
job_cv_.wait(lk, [this]{ return jobs_ == 0; });
}
wait();
//std::mutex job_mtx;
//std::condition_variable job_cv;
//int jobs = 0;
// Prevent new clients during processing.
for (auto s : sources_) {
string uri = s.first;
if (s.second->clients[0].size() == 0) {
continue;
}
//UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 3;
//lk.unlock();
StreamSource *src = sources_[uri];
if (src == nullptr || src->state != 0) continue;
// Grab job
pool_.push([this,src](int id) {
//StreamSource *src = sources_[uri];
// CHECK (Nick) Can state be an atomic instead?
--jobs_;
job_cv_.notify_one();
});
// Compress colour job
pool_.push([this,src](int id) {
if (!src->rgb.empty()) {
auto start = std::chrono::high_resolution_clock::now();
//vector<unsigned char> src->rgb_buf;
cv::imencode(".jpg", src->rgb, src->rgb_buf);
}
src->state |= ftl::rgbd::detail::kRGB;
_swap(src);
--jobs_;
job_cv_.notify_one();
});
// Compress depth job
pool_.push([this,src](int id) {
auto start = std::chrono::high_resolution_clock::now();
if (!src->depth.empty()) {
cv::Mat d2;
src->depth.convertTo(d2, CV_16UC1, 16*100);
//vector<unsigned char> d_buf;
// Setting 1 = fast but large
// Setting 9 = small but slow
// Anything up to 8 causes minimal if any impact on frame rate
// on my (Nicks) laptop, but 9 halves the frame rate.
vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow.
cv::imencode(".png", d2, src->d_buf, pngparams);
}
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
LOG(INFO) << "Depth Compress in " << elapsed.count() << "s";
src->state |= ftl::rgbd::detail::kDepth;
_swap(src);
--jobs_;
job_cv_.notify_one();
// For any single source and bitrate there is only one thread
// meaning that no lock is required here since outer shared_lock
// prevents addition of new clients.
/* pool_.push([this,src](int id) {
//StreamSource *src = sources_[uri];
try {
if (src && src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) {
auto start = std::chrono::high_resolution_clock::now();
vector<unsigned char> rgb_buf;
cv::imencode(".jpg", src->rgb, rgb_buf);
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
LOG(INFO) << "JPG in " << elapsed.count() << "s";
cv::Mat d2;
src->depth.convertTo(d2, CV_16UC1, 16*100);
vector<unsigned char> d_buf;
// Setting 1 = fast but large
// Setting 9 = small but slow
// Anything up to 8 causes minimal if any impact on frame rate
// on my (Nicks) laptop, but 9 halves the frame rate.
vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow.
cv::imencode(".png", d2, d_buf, pngparams);
//LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb";
} catch(...) {
LOG(ERROR) << "Error in transmission loop";
}
// CHECK (Nick) Could state be an atomic?
src->state |= ftl::rgbd::detail::kTransmitted;
_swap(*src);
//UNIQUE_LOCK(job_mtx_,ulk);
//jobs_--;
//ulk.unlock();
--jobs_;
job_cv_.notify_one();
});*/
}
Source *Streamer::get(const std::string &uri) {
if (sources_.find(uri) != sources_.end()) return sources_[uri]->src;
else return nullptr;
}