Skip to content
Snippets Groups Projects
Commit 4bcf9caa authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/50/sync' into 'master'

Partial improvement to sync

See merge request nicolas.pope/ftl!35
parents e41f87ac 57e75589
No related branches found
No related tags found
1 merge request!35Partial improvement to sync
Pipeline #11529 passed
...@@ -194,6 +194,11 @@ static void run(ftl::Configurable *root) { ...@@ -194,6 +194,11 @@ static void run(ftl::Configurable *root) {
//net.broadcast("grab"); // To sync cameras //net.broadcast("grab"); // To sync cameras
scene->nextFrame(); scene->nextFrame();
// TODO(Nick) Improve sync further...
for (size_t i = 0; i < inputs.size(); i++) {
if (inputs[i].source->isReady()) inputs[i].source->grab();
}
for (size_t i = 0; i < inputs.size(); i++) { for (size_t i = 0; i < inputs.size(); i++) {
if (!inputs[i].source->isReady()) { if (!inputs[i].source->isReady()) {
inputs[i].params.m_imageWidth = 0; inputs[i].params.m_imageWidth = 0;
...@@ -220,7 +225,6 @@ static void run(ftl::Configurable *root) { ...@@ -220,7 +225,6 @@ static void run(ftl::Configurable *root) {
// Get the RGB-Depth frame from input // Get the RGB-Depth frame from input
Source *input = inputs[i].source; Source *input = inputs[i].source;
Mat rgb, depth; Mat rgb, depth;
input->grab();
input->getFrames(rgb,depth); input->getFrames(rgb,depth);
active += 1; active += 1;
......
...@@ -389,6 +389,8 @@ void Peer::error(int e) { ...@@ -389,6 +389,8 @@ void Peer::error(int e) {
} }
void Peer::data() { void Peer::data() {
// TODO(Nick) Should not enter here twice if recv call has yet to be
// processed.
//if (!is_waiting_) return; //if (!is_waiting_) return;
//is_waiting_ = false; //is_waiting_ = false;
std::unique_lock<std::recursive_mutex> lk(recv_mtx_); std::unique_lock<std::recursive_mutex> lk(recv_mtx_);
......
...@@ -307,6 +307,9 @@ void Universe::_run() { ...@@ -307,6 +307,9 @@ void Universe::_run() {
block.tv_usec = 10000; block.tv_usec = 10000;
selres = select(n+1, &sfdread_, 0, &sfderror_, &block); selres = select(n+1, &sfdread_, 0, &sfderror_, &block);
// NOTE Nick: Is it possible that not all the recvs have been called before I
// again reach a select call!? What are the consequences of this? A double recv attempt?
//Some kind of error occured, it is usually possible to recover from this. //Some kind of error occured, it is usually possible to recover from this.
if (selres < 0) { if (selres < 0) {
switch (errno) { switch (errno) {
......
...@@ -70,12 +70,6 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch ...@@ -70,12 +70,6 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch
depth_.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f)); depth_.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f));
N_--; N_--;
if (N_ == 0) {
N_ += 10;
if (!host_->getNet()->send(peer_, "get_stream", *host_->get<string>("uri"), 10, 0, host_->getNet()->id(), *host_->get<string>("uri"))) {
active_ = false;
}
}
} }
void NetSource::setPose(const Eigen::Matrix4f &pose) { void NetSource::setPose(const Eigen::Matrix4f &pose) {
...@@ -134,7 +128,12 @@ void NetSource::_updateURI() { ...@@ -134,7 +128,12 @@ void NetSource::_updateURI() {
} }
bool NetSource::grab() { bool NetSource::grab() {
// net_.broadcast("grab"); if (N_ == 0) {
N_ += 10;
if (!host_->getNet()->send(peer_, "get_stream", *host_->get<string>("uri"), 10, 0, host_->getNet()->id(), *host_->get<string>("uri"))) {
active_ = false;
}
}
return true; return true;
} }
......
...@@ -139,7 +139,7 @@ void Streamer::stop() { ...@@ -139,7 +139,7 @@ void Streamer::stop() {
} }
void Streamer::poll() { void Streamer::poll() {
double wait = 1.0f / 25.0f; double wait = 1.0f / 25.0f; // TODO(Nick) Should be in config
auto start = std::chrono::high_resolution_clock::now(); auto start = std::chrono::high_resolution_clock::now();
// Create frame jobs at correct FPS interval // Create frame jobs at correct FPS interval
_schedule(); _schedule();
...@@ -194,22 +194,10 @@ void Streamer::_schedule() { ...@@ -194,22 +194,10 @@ void Streamer::_schedule() {
for (auto s : sources_) { for (auto s : sources_) {
string uri = s.first; string uri = s.first;
//shared_lock<shared_mutex> slk(s.second->mutex);
// CHECK Should never be true now
/*if (s.second->state != 0) {
if (!late_) LOG(WARNING) << "Stream not ready to schedule on time: " << uri;
late_ = true;
continue;
} else {
late_ = false;
}*/
// No point in doing work if no clients // No point in doing work if no clients
if (s.second->clients[0].size() == 0) { if (s.second->clients[0].size() == 0) {
//LOG(ERROR) << "Stream has no clients: " << uri;
continue; continue;
} }
//slk.unlock();
// There will be two jobs for this source... // There will be two jobs for this source...
unique_lock<mutex> lk(job_mtx); unique_lock<mutex> lk(job_mtx);
...@@ -220,15 +208,7 @@ void Streamer::_schedule() { ...@@ -220,15 +208,7 @@ void Streamer::_schedule() {
pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) {
StreamSource *src = sources_[uri]; StreamSource *src = sources_[uri];
//auto start = std::chrono::high_resolution_clock::now();
//try {
src->src->grab(); src->src->grab();
//} catch(...) {
// LOG(ERROR) << "Grab Exception for: " << uri;
//}
/*std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
LOG(INFO) << "GRAB Elapsed: " << elapsed.count();*/
// CHECK (Nick) Can state be an atomic instead? // CHECK (Nick) Can state be an atomic instead?
unique_lock<shared_mutex> lk(src->mutex); unique_lock<shared_mutex> lk(src->mutex);
...@@ -259,7 +239,15 @@ void Streamer::_schedule() { ...@@ -259,7 +239,15 @@ void Streamer::_schedule() {
cv::Mat d2; cv::Mat d2;
src->depth.convertTo(d2, CV_16UC1, 16*100); src->depth.convertTo(d2, CV_16UC1, 16*100);
vector<unsigned char> d_buf; vector<unsigned char> d_buf;
cv::imencode(".png", d2, d_buf);
// Setting 1 = fast but large
// Setting 9 = small but slow
// Anything up to 8 causes minimal if any impact on frame rate
// on my (Nicks) laptop, but 9 halves the frame rate.
vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 5}; // Default is 1 for fast, 9 = small but slow.
cv::imencode(".png", d2, d_buf, pngparams);
//LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb";
auto i = src->clients[0].begin(); auto i = src->clients[0].begin();
while (i != src->clients[0].end()) { while (i != src->clients[0].end()) {
...@@ -274,7 +262,6 @@ void Streamer::_schedule() { ...@@ -274,7 +262,6 @@ void Streamer::_schedule() {
(*i).txcount++; (*i).txcount++;
if ((*i).txcount >= (*i).txmax) { if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri; LOG(INFO) << "Remove client: " << (*i).uri;
//unique_lock<shared_mutex> lk(src->mutex);
i = src->clients[0].erase(i); i = src->clients[0].erase(i);
} else { } else {
i++; i++;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment