Skip to content
Snippets Groups Projects
Commit f208a1a8 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Switch to atomics

parent 4df44401
No related branches found
No related tags found
1 merge request!37Resolves #83 net performance, partially
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <vector> #include <vector>
#include <map> #include <map>
#include <shared_mutex> #include <shared_mutex>
#include <atomic>
namespace ftl { namespace ftl {
namespace rgbd { namespace rgbd {
...@@ -29,7 +30,7 @@ static const unsigned int kTransmitted = 0x2; ...@@ -29,7 +30,7 @@ static const unsigned int kTransmitted = 0x2;
struct StreamSource { struct StreamSource {
ftl::rgbd::Source *src; ftl::rgbd::Source *src;
unsigned int state; // Busy or ready to swap? std::atomic<unsigned int> state; // Busy or ready to swap?
cv::Mat rgb; // Tx buffer cv::Mat rgb; // Tx buffer
cv::Mat depth; // Tx buffer cv::Mat depth; // Tx buffer
std::vector<detail::StreamClient> clients[10]; // One list per bitrate std::vector<detail::StreamClient> clients[10]; // One list per bitrate
...@@ -104,7 +105,7 @@ class Streamer : public ftl::Configurable { ...@@ -104,7 +105,7 @@ class Streamer : public ftl::Configurable {
bool late_; bool late_;
std::mutex job_mtx_; std::mutex job_mtx_;
std::condition_variable job_cv_; std::condition_variable job_cv_;
int jobs_; std::atomic<int> jobs_;
void _schedule(); void _schedule();
void _swap(detail::StreamSource &); void _swap(detail::StreamSource &);
......
...@@ -106,6 +106,9 @@ void Streamer::add(Source *src) { ...@@ -106,6 +106,9 @@ void Streamer::add(Source *src) {
} }
void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) {
StreamSource *s = nullptr;
{
UNIQUE_LOCK(mutex_,slk); UNIQUE_LOCK(mutex_,slk);
if (sources_.find(source) == sources_.end()) return; if (sources_.find(source) == sources_.end()) return;
...@@ -114,7 +117,12 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID ...@@ -114,7 +117,12 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
DLOG(INFO) << "Adding Stream Peer: " << peer.to_string(); DLOG(INFO) << "Adding Stream Peer: " << peer.to_string();
StreamSource *s = sources_[source]; s = sources_[source];
}
if (!s) return;
UNIQUE_LOCK(s->mutex, lk2);
for (int i=0; i<s->clients[rate].size(); i++) { for (int i=0; i<s->clients[rate].size(); i++) {
if (s->clients[rate][i].peerid == peer) { if (s->clients[rate][i].peerid == peer) {
StreamClient &c = s->clients[rate][i]; StreamClient &c = s->clients[rate][i];
...@@ -221,26 +229,29 @@ void Streamer::_schedule() { ...@@ -221,26 +229,29 @@ void Streamer::_schedule() {
} }
// There will be two jobs for this source... // There will be two jobs for this source...
UNIQUE_LOCK(job_mtx_,lk); //UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 2; jobs_ += 2;
lk.unlock(); //lk.unlock();
// Grab job
pool_.push([this,uri](int id) {
StreamSource *src = sources_[uri]; StreamSource *src = sources_[uri];
if (src == nullptr || src->state != 0) continue;
// Grab job
pool_.push([this,src](int id) {
//StreamSource *src = sources_[uri];
src->src->grab(); src->src->grab();
// CHECK (Nick) Can state be an atomic instead? // CHECK (Nick) Can state be an atomic instead?
UNIQUE_LOCK(src->mutex, lk); //UNIQUE_LOCK(src->mutex, lk);
src->state |= ftl::rgbd::detail::kGrabbed; src->state |= ftl::rgbd::detail::kGrabbed;
_swap(*src); _swap(*src);
lk.unlock(); //lk.unlock();
// Mark job as finished // Mark job as finished
UNIQUE_LOCK(job_mtx_, ulk); //UNIQUE_LOCK(job_mtx_, ulk);
jobs_--; //jobs_--;
ulk.unlock(); //ulk.unlock();
--jobs_;
job_cv_.notify_one(); job_cv_.notify_one();
}); });
...@@ -249,8 +260,8 @@ void Streamer::_schedule() { ...@@ -249,8 +260,8 @@ void Streamer::_schedule() {
// meaning that no lock is required here since outer shared_lock // meaning that no lock is required here since outer shared_lock
// prevents addition of new clients. // prevents addition of new clients.
// TODO, could do one for each bitrate... // TODO, could do one for each bitrate...
pool_.push([this,uri](int id) { pool_.push([this,src](int id) {
StreamSource *src = sources_[uri]; //StreamSource *src = sources_[uri];
try { try {
if (src && src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) { if (src && src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) {
...@@ -270,6 +281,8 @@ void Streamer::_schedule() { ...@@ -270,6 +281,8 @@ void Streamer::_schedule() {
//LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb"; //LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb";
{
UNIQUE_LOCK(src->mutex,lk);
auto i = src->clients[0].begin(); auto i = src->clients[0].begin();
while (i != src->clients[0].end()) { while (i != src->clients[0].end()) {
try { try {
...@@ -289,6 +302,7 @@ void Streamer::_schedule() { ...@@ -289,6 +302,7 @@ void Streamer::_schedule() {
} }
} }
} }
}
} catch(...) { } catch(...) {
LOG(ERROR) << "Error in transmission loop"; LOG(ERROR) << "Error in transmission loop";
} }
...@@ -298,16 +312,17 @@ void Streamer::_schedule() { ...@@ -298,16 +312,17 @@ void Streamer::_schedule() {
LOG(INFO) << "Stream Elapsed: " << elapsed.count();*/ LOG(INFO) << "Stream Elapsed: " << elapsed.count();*/
// CHECK (Nick) Could state be an atomic? // CHECK (Nick) Could state be an atomic?
UNIQUE_LOCK(src->mutex,lk); //UNIQUE_LOCK(src->mutex,lk);
//LOG(INFO) << "Tx Frame: " << uri; //LOG(INFO) << "Tx Frame: " << uri;
src->state |= ftl::rgbd::detail::kTransmitted; src->state |= ftl::rgbd::detail::kTransmitted;
_swap(*src); _swap(*src);
lk.unlock(); //lk.unlock();
// Mark job as finished // Mark job as finished
UNIQUE_LOCK(job_mtx_,ulk); //UNIQUE_LOCK(job_mtx_,ulk);
jobs_--; //jobs_--;
ulk.unlock(); //ulk.unlock();
--jobs_;
job_cv_.notify_one(); job_cv_.notify_one();
}); });
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment