Skip to content
Snippets Groups Projects
Commit b57a5234 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Split jpg and png compression

parent 5ffe6279
No related branches found
No related tags found
1 merge request!37Resolves #83 net performance, partially
Pipeline #11568 passed
...@@ -273,7 +273,8 @@ int Peer::send(const std::string &s, ARGS... args) { ...@@ -273,7 +273,8 @@ int Peer::send(const std::string &s, ARGS... args) {
auto args_obj = std::make_tuple(args...); auto args_obj = std::make_tuple(args...);
auto call_obj = std::make_tuple(0,s,args_obj); auto call_obj = std::make_tuple(0,s,args_obj);
msgpack::pack(send_buf_, call_obj); msgpack::pack(send_buf_, call_obj);
return _send(); int rc = _send();
return rc;
} }
template <typename F> template <typename F>
......
...@@ -26,13 +26,16 @@ struct StreamClient { ...@@ -26,13 +26,16 @@ struct StreamClient {
}; };
static const unsigned int kGrabbed = 0x1; static const unsigned int kGrabbed = 0x1;
static const unsigned int kTransmitted = 0x2; static const unsigned int kRGB = 0x2;
static const unsigned int kDepth = 0x4;
struct StreamSource { struct StreamSource {
ftl::rgbd::Source *src; ftl::rgbd::Source *src;
std::atomic<unsigned int> state; // Busy or ready to swap? std::atomic<unsigned int> state; // Busy or ready to swap?
cv::Mat rgb; // Tx buffer cv::Mat rgb; // Tx buffer
cv::Mat depth; // Tx buffer cv::Mat depth; // Tx buffer
std::vector<unsigned char> rgb_buf;
std::vector<unsigned char> d_buf;
std::vector<detail::StreamClient> clients[10]; // One list per bitrate std::vector<detail::StreamClient> clients[10]; // One list per bitrate
std::shared_mutex mutex; std::shared_mutex mutex;
}; };
...@@ -108,7 +111,7 @@ class Streamer : public ftl::Configurable { ...@@ -108,7 +111,7 @@ class Streamer : public ftl::Configurable {
std::atomic<int> jobs_; std::atomic<int> jobs_;
void _schedule(); void _schedule();
void _swap(detail::StreamSource &); void _swap(detail::StreamSource *);
void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest); void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest);
}; };
......
...@@ -170,6 +170,7 @@ void Streamer::poll() { ...@@ -170,6 +170,7 @@ void Streamer::poll() {
if (elapsed.count() >= wait) { if (elapsed.count() >= wait) {
LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
} else { } else {
LOG(INFO) << "Frame rate @ " << (1.0f / elapsed.count());
// Otherwise, wait until next frame should start. // Otherwise, wait until next frame should start.
// CHECK(Nick) Is this accurate enough? Almost certainly not // CHECK(Nick) Is this accurate enough? Almost certainly not
// TODO(Nick) Synchronise by time corrections and use of fixed time points // TODO(Nick) Synchronise by time corrections and use of fixed time points
...@@ -196,10 +197,32 @@ void Streamer::run(bool block) { ...@@ -196,10 +197,32 @@ void Streamer::run(bool block) {
} }
// Must be called in source locked state or src.state must be atomic // Must be called in source locked state or src.state must be atomic
void Streamer::_swap(StreamSource &src) { void Streamer::_swap(StreamSource *src) {
if (src.state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kTransmitted)) { if (src->state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kRGB | ftl::rgbd::detail::kDepth)) {
src.src->getFrames(src.rgb, src.depth); UNIQUE_LOCK(src->mutex,lk);
src.state = 0;
if (src->rgb_buf.size() > 0 && src->d_buf.size() > 0) {
auto i = src->clients[0].begin();
while (i != src->clients[0].end()) {
try {
// TODO(Nick) Send pose and timestamp
if (!net_->send((*i).peerid, (*i).uri, src->rgb_buf, src->d_buf)) {
(*i).txcount = (*i).txmax;
}
} catch(...) {
(*i).txcount = (*i).txmax;
}
(*i).txcount++;
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[0].erase(i);
} else {
i++;
}
}
}
src->src->getFrames(src->rgb, src->depth);
src->state = 0;
} }
} }
...@@ -234,7 +257,7 @@ void Streamer::_schedule() { ...@@ -234,7 +257,7 @@ void Streamer::_schedule() {
// There will be two jobs for this source... // There will be two jobs for this source...
//UNIQUE_LOCK(job_mtx_,lk); //UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 2; jobs_ += 3;
//lk.unlock(); //lk.unlock();
StreamSource *src = sources_[uri]; StreamSource *src = sources_[uri];
...@@ -248,13 +271,45 @@ void Streamer::_schedule() { ...@@ -248,13 +271,45 @@ void Streamer::_schedule() {
// CHECK (Nick) Can state be an atomic instead? // CHECK (Nick) Can state be an atomic instead?
//UNIQUE_LOCK(src->mutex, lk); //UNIQUE_LOCK(src->mutex, lk);
src->state |= ftl::rgbd::detail::kGrabbed; src->state |= ftl::rgbd::detail::kGrabbed;
_swap(*src); _swap(src);
//lk.unlock();
// Mark job as finished // Mark job as finished
//UNIQUE_LOCK(job_mtx_, ulk); --jobs_;
//jobs_--; job_cv_.notify_one();
//ulk.unlock(); });
// Compress colour job
pool_.push([this,src](int id) {
if (!src->rgb.empty()) {
auto start = std::chrono::high_resolution_clock::now();
//vector<unsigned char> src->rgb_buf;
cv::imencode(".jpg", src->rgb, src->rgb_buf);
}
src->state |= ftl::rgbd::detail::kRGB;
_swap(src);
--jobs_;
job_cv_.notify_one();
});
// Compress depth job
pool_.push([this,src](int id) {
if (!src->depth.empty()) {
cv::Mat d2;
src->depth.convertTo(d2, CV_16UC1, 16*100);
//vector<unsigned char> d_buf;
// Setting 1 = fast but large
// Setting 9 = small but slow
// Anything up to 8 causes minimal if any impact on frame rate
// on my (Nicks) laptop, but 9 halves the frame rate.
vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow.
cv::imencode(".png", d2, src->d_buf, pngparams);
}
src->state |= ftl::rgbd::detail::kDepth;
_swap(src);
--jobs_; --jobs_;
job_cv_.notify_one(); job_cv_.notify_one();
}); });
...@@ -264,14 +319,20 @@ void Streamer::_schedule() { ...@@ -264,14 +319,20 @@ void Streamer::_schedule() {
// meaning that no lock is required here since outer shared_lock // meaning that no lock is required here since outer shared_lock
// prevents addition of new clients. // prevents addition of new clients.
// TODO, could do one for each bitrate... // TODO, could do one for each bitrate...
pool_.push([this,src](int id) { /* pool_.push([this,src](int id) {
//StreamSource *src = sources_[uri]; //StreamSource *src = sources_[uri];
try { try {
if (src && src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) { if (src && src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) {
auto start = std::chrono::high_resolution_clock::now();
vector<unsigned char> rgb_buf; vector<unsigned char> rgb_buf;
cv::imencode(".jpg", src->rgb, rgb_buf); cv::imencode(".jpg", src->rgb, rgb_buf);
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
LOG(INFO) << "JPG in " << elapsed.count() << "s";
cv::Mat d2; cv::Mat d2;
src->depth.convertTo(d2, CV_16UC1, 16*100); src->depth.convertTo(d2, CV_16UC1, 16*100);
vector<unsigned char> d_buf; vector<unsigned char> d_buf;
...@@ -285,36 +346,12 @@ void Streamer::_schedule() { ...@@ -285,36 +346,12 @@ void Streamer::_schedule() {
//LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb"; //LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb";
{
UNIQUE_LOCK(src->mutex,lk);
auto i = src->clients[0].begin();
while (i != src->clients[0].end()) {
try {
// TODO(Nick) Send pose and timestamp
if (!net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf)) {
(*i).txcount = (*i).txmax;
}
} catch(...) {
(*i).txcount = (*i).txmax;
}
(*i).txcount++;
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[0].erase(i);
} else {
i++;
}
}
}
} }
} catch(...) { } catch(...) {
LOG(ERROR) << "Error in transmission loop"; LOG(ERROR) << "Error in transmission loop";
} }
/*std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
LOG(INFO) << "Stream Elapsed: " << elapsed.count();*/
// CHECK (Nick) Could state be an atomic? // CHECK (Nick) Could state be an atomic?
//UNIQUE_LOCK(src->mutex,lk); //UNIQUE_LOCK(src->mutex,lk);
//LOG(INFO) << "Tx Frame: " << uri; //LOG(INFO) << "Tx Frame: " << uri;
...@@ -326,9 +363,10 @@ void Streamer::_schedule() { ...@@ -326,9 +363,10 @@ void Streamer::_schedule() {
//UNIQUE_LOCK(job_mtx_,ulk); //UNIQUE_LOCK(job_mtx_,ulk);
//jobs_--; //jobs_--;
//ulk.unlock(); //ulk.unlock();
--jobs_; --jobs_;
job_cv_.notify_one(); job_cv_.notify_one();
}); });*/
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment