Skip to content
Snippets Groups Projects
Commit 170db52a authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Should improve #55 stability and fps stability

parent 2f2120f8
No related branches found
No related tags found
No related merge requests found
Pipeline #11215 passed
......@@ -96,16 +96,16 @@ static void run(ftl::Configurable *root) {
stream->add(source);
stream->run();
while (display->active()) {
while (ftl::running && display->active()) {
cv::Mat rgb, depth;
source->getRGBD(rgb, depth);
if (!rgb.empty()) display->render(rgb, depth, source->getParameters());
display->wait(10);
}
LOG(INFO) << "Stopping...";
stream->stop();
LOG(INFO) << "Finished.";
delete stream;
delete display;
delete source;
......@@ -125,5 +125,7 @@ int main(int argc, char **argv) {
//} else {
// ftl::middlebury::test(config);
//}
return ftl::exit_code;
}
......@@ -12,6 +12,9 @@
namespace ftl {
extern bool running;
extern int exit_code;
class Configurable;
bool is_directory(const std::string &path);
......
......@@ -47,6 +47,9 @@ using ftl::config::config;
static Configurable *rootCFG = nullptr;
bool ftl::running = true;
int ftl::exit_code = 0;
bool ftl::is_directory(const std::string &path) {
#ifdef WIN32
DWORD attrib = GetFileAttributesA(path.c_str());
......@@ -413,7 +416,7 @@ static void signalIntHandler( int signum ) {
// cleanup and close up stuff here
// terminate program
exit(0);
ftl::running = false;
}
Configurable *ftl::config::configure(int argc, char **argv, const std::string &root) {
......
......@@ -7,7 +7,7 @@ using ftl::ctrl::Slave;
static void netLog(void* user_data, const loguru::Message& message) {
Universe *net = (Universe*)user_data;
net->publish("log", message.preamble, message.message);
//net->publish("log", message.preamble, message.message);
}
Slave::Slave(Universe *net, ftl::Configurable *root) {
......
......@@ -108,7 +108,7 @@ class Universe : public ftl::Configurable {
R call(const UUID &pid, const std::string &name, ARGS... args);
template <typename... ARGS>
void send(const UUID &pid, const std::string &name, ARGS... args);
bool send(const UUID &pid, const std::string &name, ARGS... args);
template <typename R, typename... ARGS>
std::optional<R> findOne(const std::string &name, ARGS... args);
......@@ -316,13 +316,13 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
}
template <typename... ARGS>
void Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) {
bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) {
Peer *p = getPeer(pid);
if (p == nullptr) {
LOG(ERROR) << "Attempting to call an unknown peer : " << pid.to_string();
throw -1;
}
p->send(name, args...);
return p->send(name, args...) > 0;
}
template <typename... ARGS>
......
......@@ -253,6 +253,7 @@ void Universe::_run() {
int n = _setDescriptors();
int selres = 1;
// It is an error to use "select" with no sockets ... so just sleep
if (n == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
continue;
......@@ -294,6 +295,8 @@ void Universe::_run() {
_installBindings(p);
p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p;
// Note, called in another thread so above lock
// does not apply.
_notifyConnect(&p);
});
}
......@@ -314,6 +317,7 @@ void Universe::_run() {
}
}
}
// TODO(Nick) Don't always need to call this
_cleanupPeers();
}
}
......@@ -381,8 +385,8 @@ void Universe::_notifyConnect(Peer *p) {
}
void Universe::_notifyDisconnect(Peer *p) {
// In all cases, should already be locked outside this function call
//unique_lock<mutex> lk(net_mutex_);
LOG(INFO) << "NOTIFY DISCONNECT";
for (auto &i : on_disconnect_) {
try {
i.h(p);
......@@ -393,5 +397,5 @@ void Universe::_notifyDisconnect(Peer *p) {
}
void Universe::_notifyError(Peer *p, const ftl::net::Error &e) {
// TODO(Nick)
}
......@@ -113,6 +113,7 @@ void Streamer::remove(const std::string &) {
void Streamer::stop() {
active_ = false;
pool_.stop();
}
void Streamer::run(bool block) {
......@@ -128,7 +129,11 @@ void Streamer::run(bool block) {
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
if (elapsed.count() >= wait) {
LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
} else {
sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
}
}
} else {
// Create thread job for frame ticking
......@@ -142,7 +147,11 @@ void Streamer::run(bool block) {
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
if (elapsed.count() >= wait) {
LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
} else {
sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
}
}
});
}
......@@ -156,12 +165,16 @@ void Streamer::_swap(StreamSource &src) {
}
void Streamer::_schedule() {
std::mutex job_mtx;
std::condition_variable job_cv;
int jobs = 0;
for (auto s : sources_) {
string uri = s.first;
shared_lock<shared_mutex> slk(s.second->mutex);
if (s.second->state != 0) {
LOG(WARNING) << "Stream not ready to schedule on time: " << uri;
LOG(WARNING) << "Stream not ready to schedule on time: " << uri << " (" << s.second->state << ")";
continue;
}
if (s.second->clients[0].size() == 0) {
......@@ -170,19 +183,34 @@ void Streamer::_schedule() {
}
slk.unlock();
unique_lock<mutex> lk(job_mtx);
jobs += 2;
lk.unlock();
// Grab job
pool_.push([this,uri](int id) {
pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) {
StreamSource *src = sources_[uri];
src->src->grab();
try {
src->src->grab();
} catch(...) {
LOG(ERROR) << "Grab Exception for: " << uri;
}
unique_lock<shared_mutex> lk(src->mutex);
src->state |= ftl::rgbd::detail::kGrabbed;
_swap(*src);
lk.unlock();
unique_lock<mutex> ulk(job_mtx);
jobs--;
ulk.unlock();
job_cv.notify_one();
});
// Transmit job
// TODO, could do one for each bitrate...
pool_.push([this,uri](int id) {
pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) {
StreamSource *src = sources_[uri];
if (src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) {
......@@ -197,13 +225,16 @@ void Streamer::_schedule() {
auto i = src->clients[0].begin();
while (i != src->clients[0].end()) {
try {
net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf);
if (!net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf)) {
(*i).txcount = (*i).txmax;
}
} catch(...) {
(*i).txcount = (*i).txmax;
}
(*i).txcount++;
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client";
DLOG(2) << "Remove client";
unique_lock<shared_mutex> lk(src->mutex);
i = src->clients[0].erase(i);
} else {
i++;
......@@ -212,11 +243,21 @@ void Streamer::_schedule() {
}
unique_lock<shared_mutex> lk(src->mutex);
LOG(INFO) << "Tx Frame: " << uri;
DLOG(1) << "Tx Frame: " << uri;
src->state |= ftl::rgbd::detail::kTransmitted;
_swap(*src);
lk.unlock();
unique_lock<mutex> ulk(job_mtx);
jobs--;
ulk.unlock();
job_cv.notify_one();
});
}
// TODO Wait until all jobs completed...
unique_lock<mutex> lk(job_mtx);
job_cv.wait(lk, [&jobs]{ return jobs == 0; });
}
RGBDSource *Streamer::get(const std::string &uri) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment