Skip to content
Snippets Groups Projects
Commit 90aaa7fa authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/116/deadlocks' into 'master'

Resolves #116 deadlocks

Closes #116

See merge request nicolas.pope/ftl!63
parents 5054f48b 05811158
No related branches found
No related tags found
1 merge request!63Resolves #116 deadlocks
Pipeline #11983 passed
Showing
with 58 additions and 67 deletions
......@@ -36,7 +36,7 @@ class SourceWindow : public nanogui::Window {
std::vector<GLTexture> thumbs_;
bool refresh_thumbs_;
nanogui::Widget *ipanel_;
std::mutex mutex_;
MUTEX mutex_;
void _updateCameras();
......
......@@ -7,8 +7,6 @@
#include <loguru.hpp>
using ftl::rgbd::VirtualSource;
using std::mutex;
using std::unique_lock;
VirtualSource::VirtualSource(ftl::rgbd::Source *host)
: ftl::rgbd::detail::Source(host) {
......
......@@ -15,8 +15,6 @@
#include <vector>
#include <fstream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <opencv2/opencv.hpp>
#include <ftl/rgbd.hpp>
......@@ -47,8 +45,6 @@ using std::map;
using std::condition_variable;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
using std::mutex;
using std::unique_lock;
using cv::Mat;
using json = nlohmann::json;
......
......@@ -7,24 +7,28 @@
#define POOL_SIZE 10
#if defined _DEBUG && DEBUG_MUTEX
// #define DEBUG_MUTEX
#if defined DEBUG_MUTEX
#include <loguru.hpp>
#include <chrono>
#include <type_traits>
#define UNIQUE_LOCK(M,L) \
auto start_##L = std::chrono::high_resolution_clock::now(); \
std::unique_lock<std::remove_reference<decltype(M)>::type> L(M); \
LOG(INFO) << "LOCK(" << #M "," #L << ") in " << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - start_##L).count();
#define MUTEX std::timed_mutex
#define RECURSIVE_MUTEX std::recursive_timed_mutex
#define SHARED_MUTEX std::shared_timed_mutex
#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::seconds(5)); if (!L) LOG(FATAL) << "Mutex deadlock";
#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::seconds(5)); if (!L) LOG(FATAL) << "Mutex deadlock";
#define SHARED_LOCK(M,L) \
auto start_##L = std::chrono::high_resolution_clock::now(); \
std::shared_lock<std::remove_reference<decltype(M)>::type> L(M); \
LOG(INFO) << "LOCK(" << #M "," #L << ") in " << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - start_##L).count();
#else
#define MUTEX std::mutex
#define RECURSIVE_MUTEX std::recursive_mutex
#define SHARED_MUTEX std::shared_mutex
#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M);
#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M);
#endif // _DEBUG && DEBUG_MUTEX
#endif // DEBUG_MUTEX
namespace ftl {
extern ctpl::thread_pool pool;
......
......@@ -4,7 +4,7 @@
#include <ftl/net/universe.hpp>
#include <ftl/configurable.hpp>
#include <loguru.hpp>
#include <mutex>
#include <ftl/threads.hpp>
namespace ftl {
namespace ctrl {
......@@ -38,7 +38,7 @@ class Slave {
private:
std::vector<ftl::UUID> log_peers_;
ftl::net::Universe *net_;
std::recursive_mutex mutex_;
RECURSIVE_MUTEX mutex_;
bool in_log_;
bool active_;
SystemState state_;
......
......@@ -242,14 +242,14 @@ class Peer {
// Receive buffers
bool is_waiting_;
msgpack::unpacker recv_buf_;
std::recursive_mutex recv_mtx_;
RECURSIVE_MUTEX recv_mtx_;
bool ws_read_header_;
// Send buffers
msgpack::vrefbuffer send_buf_;
std::recursive_mutex send_mtx_;
RECURSIVE_MUTEX send_mtx_;
std::recursive_mutex cb_mtx_;
RECURSIVE_MUTEX cb_mtx_;
std::string uri_; // Original connection URI, or assumed URI
ftl::UUID peerid_; // Received in handshake or allocated
......@@ -292,7 +292,8 @@ R Peer::call(const std::string &name, ARGS... args) {
R result;
int id = asyncCall<R>(name, [&](const R &r) {
UNIQUE_LOCK(m,lk);
//UNIQUE_LOCK(m,lk);
std::unique_lock<std::mutex> lk(m);
hasreturned = true;
result = r;
lk.unlock();
......@@ -300,7 +301,8 @@ R Peer::call(const std::string &name, ARGS... args) {
}, std::forward<ARGS>(args)...);
{ // Block thread until async callback notifies us
UNIQUE_LOCK(m,lk);
//UNIQUE_LOCK(m,lk);
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;});
}
......
......@@ -203,8 +203,8 @@ class Universe : public ftl::Configurable {
private:
bool active_;
ftl::UUID this_peer;
std::shared_mutex net_mutex_;
std::recursive_mutex handler_mutex_;
SHARED_MUTEX net_mutex_;
RECURSIVE_MUTEX handler_mutex_;
fd_set sfderror_;
fd_set sfdread_;
std::vector<ftl::net::Listener*> listeners_;
......@@ -275,7 +275,8 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
std::optional<R> result;
auto handler = [&](const std::optional<R> &r) {
UNIQUE_LOCK(m,lk);
//UNIQUE_LOCK(m,lk);
std::unique_lock<std::mutex> lk(m);
if (hasreturned || !r) return;
hasreturned = true;
result = r;
......@@ -292,7 +293,8 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
lk.unlock();
{ // Block thread until async callback notifies us
UNIQUE_LOCK(m,llk);
//UNIQUE_LOCK(m,llk);
std::unique_lock<std::mutex> llk(m);
cv.wait_for(llk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;});
// Cancel any further results
......@@ -318,7 +320,8 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
std::vector<R> results;
auto handler = [&](const std::vector<R> &r) {
UNIQUE_LOCK(m,lk);
//UNIQUE_LOCK(m,lk);
std::unique_lock<std::mutex> lk(m);
returncount++;
results.insert(results.end(), r.begin(), r.end());
lk.unlock();
......@@ -337,7 +340,8 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
lk.unlock();
{ // Block thread until async callback notifies us
UNIQUE_LOCK(m,llk);
//UNIQUE_LOCK(m,llk);
std::unique_lock<std::mutex> llk(m);
cv.wait_for(llk, std::chrono::seconds(1), [&returncount,&sentcount]{return returncount == sentcount;});
// Cancel any further results
......
......@@ -41,10 +41,7 @@ using ftl::net::Dispatcher;
using std::chrono::seconds;
using ftl::net::Universe;
using ftl::net::callback_t;
using std::mutex;
using std::vector;
using std::recursive_mutex;
using std::unique_lock;
/*static std::string hexStr(const std::string &s)
{
......@@ -535,7 +532,8 @@ bool Peer::waitConnection() {
if (status_ == kConnected) return true;
std::mutex m;
UNIQUE_LOCK(m,lk);
//UNIQUE_LOCK(m,lk);
std::unique_lock<std::mutex> lk(m);
std::condition_variable cv;
callback_t h = universe_->onConnect([this,&cv](Peer *p) {
......
......@@ -19,10 +19,6 @@ using ftl::net::Universe;
using nlohmann::json;
using ftl::UUID;
using std::optional;
using std::unique_lock;
using std::shared_lock;
using std::mutex;
using std::shared_mutex;
using ftl::config::json_t;
using ftl::net::callback_t;
......
......@@ -3,12 +3,12 @@
#include <ftl/configuration.hpp>
#include <ftl/rgbd/camera.hpp>
#include <ftl/threads.hpp>
//#include <ftl/net/universe.hpp>
#include <ftl/uri.hpp>
#include <ftl/rgbd/detail/source.hpp>
#include <opencv2/opencv.hpp>
#include <Eigen/Eigen>
#include <shared_mutex>
#include <string>
namespace ftl {
......@@ -167,7 +167,7 @@ class Source : public ftl::Configurable {
void customImplementation(detail::Source *);
std::shared_mutex &mutex() { return mutex_; }
SHARED_MUTEX &mutex() { return mutex_; }
private:
detail::Source *impl_;
......@@ -177,7 +177,7 @@ class Source : public ftl::Configurable {
Camera params_; // TODO Find better solution
Eigen::Matrix4d pose_;
ftl::net::Universe *net_;
std::shared_mutex mutex_;
SHARED_MUTEX mutex_;
bool paused_;
bool bullet_;
......
......@@ -10,7 +10,6 @@
#include <string>
#include <vector>
#include <map>
#include <shared_mutex>
#include <atomic>
namespace ftl {
......@@ -41,7 +40,7 @@ struct StreamSource {
cv::Mat prev_rgb;
cv::Mat prev_depth;
std::list<detail::StreamClient> clients[10]; // One list per bitrate
std::shared_mutex mutex;
SHARED_MUTEX mutex;
unsigned long long frame;
};
......@@ -107,7 +106,7 @@ class Streamer : public ftl::Configurable {
private:
std::map<std::string, detail::StreamSource*> sources_;
//ctpl::thread_pool pool_;
std::shared_mutex mutex_;
SHARED_MUTEX mutex_;
bool active_;
ftl::net::Universe *net_;
bool late_;
......
......@@ -4,11 +4,11 @@
#include <ftl/config.h>
#include <ftl/configurable.hpp>
#include <ftl/threads.hpp>
#include <ftl/camera_params.hpp>
#include <ftl/net/universe.hpp>
#include <opencv2/opencv.hpp>
#include <Eigen/Eigen>
#include <mutex>
namespace ftl {
namespace rgbd {
......@@ -77,7 +77,7 @@ class RGBDSource : public ftl::Configurable {
protected:
CameraParameters params_;
ftl::net::Universe *net_;
std::mutex mutex_;
MUTEX mutex_;
cv::Mat rgb_;
cv::Mat depth_;
......
......@@ -2,7 +2,6 @@
#include <vector>
#include <thread>
#include <chrono>
#include <shared_mutex>
#include <tuple>
#include "colour.hpp"
......@@ -14,8 +13,6 @@ using ftl::net::Universe;
using ftl::UUID;
using std::string;
using ftl::rgbd::Camera;
using std::shared_mutex;
using std::unique_lock;
using std::vector;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
......
......@@ -4,8 +4,8 @@
#include <ftl/net/universe.hpp>
#include <ftl/rgbd/source.hpp>
#include <ftl/threads.hpp>
#include <string>
#include <mutex>
namespace ftl {
namespace rgbd {
......@@ -36,7 +36,7 @@ class NetSource : public detail::Source {
bool active_;
std::string uri_;
ftl::net::callback_t h_;
std::mutex mutex_;
MUTEX mutex_;
int chunks_dim_;
int chunk_width_;
int chunk_height_;
......
......@@ -20,9 +20,6 @@ using ftl::rgbd::detail::RealsenseSource;
using ftl::rgbd::Source;
using ftl::Configurable;
using std::string;
using std::shared_mutex;
using std::unique_lock;
using std::shared_lock;
using ftl::rgbd::detail::StereoVideoSource;
using ftl::rgbd::detail::NetSource;
using ftl::rgbd::detail::ImageSource;
......
#include <loguru.hpp>
#include "stereovideo.hpp"
#include <ftl/configuration.hpp>
#include <ftl/threads.hpp>
#include "calibrate.hpp"
#include "local.hpp"
#include "disparity.hpp"
#include <mutex>
using ftl::rgbd::detail::Calibrate;
using ftl::rgbd::detail::LocalSource;
using ftl::rgbd::detail::StereoVideoSource;
using std::string;
using std::mutex;
using std::unique_lock;
StereoVideoSource::StereoVideoSource(ftl::rgbd::Source *host)
: ftl::rgbd::detail::Source(host), ready_(false) {
......@@ -83,14 +81,14 @@ void StereoVideoSource::init(const string &file) {
// Add event handlers to allow calibration changes...
host_->on("baseline", [this](const ftl::config::Event &e) {
params_.baseline = host_->value("baseline", params_.baseline);
std::unique_lock<std::shared_mutex> lk(host_->mutex());
UNIQUE_LOCK(host_->mutex(), lk);
calib_->updateCalibration(params_);
});
host_->on("focal", [this](const ftl::config::Event &e) {
params_.fx = host_->value("focal", params_.fx);
params_.fy = params_.fx;
std::unique_lock<std::shared_mutex> lk(host_->mutex());
UNIQUE_LOCK(host_->mutex(), lk);
calib_->updateCalibration(params_);
});
......
......@@ -18,10 +18,6 @@ using std::list;
using std::map;
using std::optional;
using std::vector;
using std::mutex;
using std::shared_mutex;
using std::unique_lock;
using std::shared_lock;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
using std::tuple;
......@@ -259,8 +255,12 @@ void Streamer::wait() {
}
// Wait for all jobs to complete before finishing frame
UNIQUE_LOCK(job_mtx_, lk);
job_cv_.wait(lk, [this]{ return jobs_ == 0; });
//UNIQUE_LOCK(job_mtx_, lk);
std::unique_lock<std::mutex> lk(job_mtx_);
job_cv_.wait_for(lk, std::chrono::seconds(20), [this]{ return jobs_ == 0; });
if (jobs_ != 0) {
LOG(FATAL) << "Deadlock detected";
}
}
void Streamer::_schedule() {
......@@ -282,12 +282,12 @@ void Streamer::_schedule() {
// There will be two jobs for this source...
//UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 1 + kChunkDim*kChunkDim;
jobs_ += 1 + kChunkCount;
//lk.unlock();
StreamSource *src = sources_[uri];
if (src == nullptr || src->jobs != 0) continue;
src->jobs = 1 + kChunkDim*kChunkDim;
src->jobs = 1 + kChunkCount;
// Grab job
ftl::pool.push([this,src](int id) {
......@@ -310,6 +310,7 @@ void Streamer::_schedule() {
_swap(src);
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
});
......@@ -388,6 +389,7 @@ void Streamer::_schedule() {
src->jobs--;
_swap(src);
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
}, i);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment