Skip to content
Snippets Groups Projects
Commit 7ac88821 authored by Nicolas Pope
Browse files

Merge branch 'feature/50/sync' into 'master'

Resolves #50 stream sync

Closes #50

See merge request nicolas.pope/ftl!67
parents d0ac179e b6654659
No related branches found
No related tags found
1 merge request: !67 Resolves #50 stream sync
Checking pipeline status
......@@ -126,9 +126,10 @@ static void run(ftl::Configurable *root) {
active = scene->upload();
// Make sure previous virtual camera frame has finished rendering
stream->wait();
//stream->wait();
cudaSafeCall(cudaStreamSynchronize(scene->getIntegrationStream()));
LOG(INFO) << "Heap: " << scene->getHeapFreeCount();
//LOG(INFO) << "Heap: " << scene->getHeapFreeCount();
// Merge new frames into the voxel structure
scene->integrate();
......
......@@ -195,8 +195,9 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals
LOG(INFO) << "Peer elected to disconnect: " << id().to_string();
});
bind("__ping__", [this](unsigned long long timestamp) {
return timestamp;
bind("__ping__", [this]() {
auto now = std::chrono::high_resolution_clock::now();
return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
});
send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
......@@ -269,8 +270,9 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true),
LOG(INFO) << "Peer elected to disconnect: " << id().to_string();
});
bind("__ping__", [this](unsigned long long timestamp) {
return timestamp;
bind("__ping__", [this]() {
auto now = std::chrono::high_resolution_clock::now();
return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
});
}
}
......
......@@ -24,7 +24,7 @@ class Source {
friend class ftl::rgbd::Source;
public:
explicit Source(ftl::rgbd::Source *host) : capabilities_(0), host_(host), params_({0}) { }
explicit Source(ftl::rgbd::Source *host) : capabilities_(0), host_(host), params_({0}), timestamp_(0) { }
virtual ~Source() {}
/**
......@@ -41,6 +41,7 @@ class Source {
ftl::rgbd::Camera params_;
cv::Mat rgb_;
cv::Mat depth_;
int64_t timestamp_;
//Eigen::Matrix4f pose_;
};
......
......@@ -118,6 +118,8 @@ class Source : public ftl::Configurable {
void writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uint> &depth, cudaStream_t stream);
void writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<float> &depth, cudaStream_t stream);
int64_t timestamp() const { return (impl_) ? impl_->timestamp_ : 0; }
/**
* Directly upload source RGB and Depth to GPU.
*/
......@@ -201,6 +203,7 @@ class Source : public ftl::Configurable {
bool paused_;
bool bullet_;
channel_t channel_;
cudaStream_t stream_;
detail::Source *_createImplementation();
detail::Source *_createFileImpl(const ftl::URI &uri);
......
......@@ -114,6 +114,10 @@ class Streamer : public ftl::Configurable {
std::condition_variable job_cv_;
std::atomic<int> jobs_;
int compress_level_;
int64_t clock_adjust_;
ftl::UUID time_peer_;
int64_t last_frame_;
int64_t frame_no_;
void _schedule();
void _swap(detail::StreamSource *);
......
......@@ -59,7 +59,7 @@ bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &s
}
NetSource::NetSource(ftl::rgbd::Source *host)
: ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1) {
: ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), current_frame_(0) {
gamma_ = host->value("gamma", 1.0f);
temperature_ = host->value("temperature", 6500);
......@@ -105,11 +105,31 @@ NetSource::~NetSource() {
host_->getNet()->removeCallback(h_);
}
void NetSource::_recvChunk(int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
cv::Mat tmp_rgb, tmp_depth;
if (!active_) return;
// A new frame has been started... finish the last one
if (frame > current_frame_) {
// Lock host to prevent grab
UNIQUE_LOCK(host_->mutex(),lk);
// Swap the double buffers
cv::Mat tmp;
tmp = rgb_;
rgb_ = d_rgb_;
d_rgb_ = tmp;
tmp = depth_;
depth_ = d_depth_;
d_depth_ = tmp;
timestamp_ = current_frame_*40; // FIXME: Don't hardcode 40ms
current_frame_ = frame;
} else if (frame < current_frame_) {
LOG(WARNING) << "Chunk dropped";
}
// Decode in temporary buffers to prevent long locks
cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb);
if (d.size() > 0) cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth);
......@@ -123,10 +143,12 @@ void NetSource::_recvChunk(int frame, int chunk, bool delta, const vector<unsign
// Lock host to prevent grab
UNIQUE_LOCK(host_->mutex(),lk);
// TODO:(Nick) Decode directly into double buffer if no scaling
cv::Rect roi(cx,cy,chunk_width_,chunk_height_);
cv::Mat chunkRGB = rgb_(roi);
cv::Mat chunkDepth = depth_(roi);
cv::Mat chunkRGB = d_rgb_(roi);
cv::Mat chunkDepth = d_depth_(roi);
// Original size so just copy
if (tmp_rgb.cols == chunkRGB.cols) {
......@@ -191,7 +213,7 @@ void NetSource::_updateURI() {
has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_);
host_->getNet()->bind(*uri, [this](int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
host_->getNet()->bind(*uri, [this](int64_t frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
_recvChunk(frame, chunk, delta, jpg, d);
});
......@@ -210,6 +232,8 @@ void NetSource::_updateURI() {
chunk_height_ = params_.height / chunks_dim_;
rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0));
depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f);
d_rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0));
d_depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f);
uri_ = *uri;
active_ = true;
......
......@@ -47,10 +47,15 @@ class NetSource : public detail::Source {
int maxN_;
int default_quality_;
ftl::rgbd::channel_t prev_chan_;
int64_t current_frame_;
// Double buffering
cv::Mat d_depth_;
cv::Mat d_rgb_;
bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p);
void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _recvChunk(int frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _recvChunk(int64_t frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _updateURI();
};
......
......@@ -29,6 +29,7 @@ using ftl::rgbd::capability_t;
Source::Source(ftl::config::json_t &cfg) : Configurable(cfg), pose_(Eigen::Matrix4d::Identity()), net_(nullptr) {
impl_ = nullptr;
params_ = {0};
stream_ = 0;
reset();
on("uri", [this](const ftl::config::Event &e) {
......@@ -40,6 +41,7 @@ Source::Source(ftl::config::json_t &cfg) : Configurable(cfg), pose_(Eigen::Matri
Source::Source(ftl::config::json_t &cfg, ftl::net::Universe *net) : Configurable(cfg), pose_(Eigen::Matrix4d::Identity()), net_(net) {
impl_ = nullptr;
params_ = {0};
stream_ = 0;
reset();
on("uri", [this](const ftl::config::Event &e) {
......@@ -220,7 +222,12 @@ void Source::reset() {
bool Source::grab() {
UNIQUE_LOCK(mutex_,lk);
if (impl_ && impl_->grab(-1,-1)) {
if (!impl_ && stream_ != 0) {
cudaSafeCall(cudaStreamSynchronize(stream_));
if (depth_.type() == CV_32SC1) depth_.convertTo(depth_, CV_32F, 1.0f / 1000.0f);
stream_ = 0;
return true;
} else if (impl_ && impl_->grab(-1,-1)) {
impl_->rgb_.copyTo(rgb_);
impl_->depth_.copyTo(depth_);
return true;
......@@ -243,9 +250,9 @@ void Source::writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl:
cudaSafeCall(cudaMemcpy2DAsync(rgb_.data, rgb_.step, rgb.devicePtr(), rgb.pitch(), rgb_.cols * sizeof(uchar4), rgb_.rows, cudaMemcpyDeviceToHost, stream));
depth_.create(depth.height(), depth.width(), CV_32SC1);
cudaSafeCall(cudaMemcpy2DAsync(depth_.data, depth_.step, depth.devicePtr(), depth.pitch(), depth_.cols * sizeof(uint), depth_.rows, cudaMemcpyDeviceToHost, stream));
cudaSafeCall(cudaStreamSynchronize(stream)); // TODO:(Nick) Don't wait here.
depth_.convertTo(depth_, CV_32F, 1.0f / 1000.0f);
//cudaSafeCall(cudaStreamSynchronize(stream)); // TODO:(Nick) Don't wait here.
stream_ = stream;
//depth_.convertTo(depth_, CV_32F, 1.0f / 1000.0f);
} else {
LOG(ERROR) << "writeFrames cannot be done on this source: " << getURI();
}
......@@ -258,12 +265,17 @@ void Source::writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl:
cudaSafeCall(cudaMemcpy2DAsync(rgb_.data, rgb_.step, rgb.devicePtr(), rgb.pitch(), rgb_.cols * sizeof(uchar4), rgb_.rows, cudaMemcpyDeviceToHost, stream));
depth_.create(depth.height(), depth.width(), CV_32FC1);
cudaSafeCall(cudaMemcpy2DAsync(depth_.data, depth_.step, depth.devicePtr(), depth.pitch(), depth_.cols * sizeof(float), depth_.rows, cudaMemcpyDeviceToHost, stream));
cudaSafeCall(cudaStreamSynchronize(stream)); // TODO:(Nick) Don't wait here.
stream_ = stream;
}
}
bool Source::thumbnail(cv::Mat &t) {
if (impl_) {
if (!impl_ && stream_ != 0) {
cudaSafeCall(cudaStreamSynchronize(stream_));
if (depth_.type() == CV_32SC1) depth_.convertTo(depth_, CV_32F, 1.0f / 1000.0f);
stream_ = 0;
return true;
} else if (impl_) {
UNIQUE_LOCK(mutex_,lk);
impl_->grab(1, 9);
impl_->rgb_.copyTo(rgb_);
......
......@@ -135,19 +135,19 @@ bool StereoVideoSource::grab(int n, int b) {
//rgb_ = lsrc_->cachedLeft();
depth_tmp_.download(depth_, stream_);
stream_.waitForCompletion();
stream_.waitForCompletion(); // TODO:(Nick) Move to getFrames
} else if (chan == ftl::rgbd::kChanRight) {
lsrc_->get(left_, right_, stream_);
calib_->rectifyStereo(left_, right_, stream_);
left_.download(rgb_, stream_);
right_.download(depth_, stream_);
stream_.waitForCompletion();
stream_.waitForCompletion(); // TODO:(Nick) Move to getFrames
} else {
lsrc_->get(left_, right_, stream_);
calib_->rectifyStereo(left_, right_, stream_);
//rgb_ = lsrc_->cachedLeft();
left_.download(rgb_, stream_);
stream_.waitForCompletion();
stream_.waitForCompletion(); // TODO:(Nick) Move to getFrames
}
return true;
}
......
......@@ -29,6 +29,8 @@ Streamer::Streamer(nlohmann::json &config, Universe *net)
active_ = false;
net_ = net;
time_peer_ = ftl::UUID(0);
clock_adjust_ = 0;
compress_level_ = value("compression", 1);
......@@ -99,13 +101,13 @@ Streamer::Streamer(nlohmann::json &config, Universe *net)
}
});
net->bind("sync_streams", [this](unsigned long long time) {
//net->bind("sync_streams", [this](unsigned long long time) {
// Calc timestamp delta
});
//});
net->bind("ping_streamer", [this](unsigned long long time) -> unsigned long long {
return time;
});
//net->bind("ping_streamer", [this](unsigned long long time) -> unsigned long long {
// return time;
//});
}
Streamer::~Streamer() {
......@@ -119,8 +121,6 @@ Streamer::~Streamer() {
}
void Streamer::add(Source *src) {
StreamSource *s = nullptr;
{
UNIQUE_LOCK(mutex_,ulk);
if (sources_.find(src->getID()) != sources_.end()) return;
......@@ -148,9 +148,24 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
if (rate < 0 || rate >= 10) return;
if (N < 0 || N > ftl::rgbd::kMaxFrames) return;
LOG(INFO) << "Adding Stream Peer: " << peer.to_string() << " rate=" << rate << " N=" << N;
DLOG(INFO) << "Adding Stream Peer: " << peer.to_string() << " rate=" << rate << " N=" << N;
s = sources_[source];
// Set a time peer for clock sync
if (time_peer_ == ftl::UUID(0)) {
time_peer_ = peer;
// Also do a time sync (but should be repeated periodically)
auto start = std::chrono::high_resolution_clock::now();
int64_t mastertime = net_->call<int64_t>(peer, "__ping__");
auto elapsed = std::chrono::high_resolution_clock::now() - start;
int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
clock_adjust_ = mastertime - (std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() + (latency/2));
LOG(INFO) << "Clock adjustment: " << clock_adjust_;
LOG(INFO) << "Latency: " << (latency / 2);
LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime;
}
}
if (!s) return; // No matching stream
......@@ -189,24 +204,36 @@ void Streamer::stop() {
}
void Streamer::poll() {
double wait = 1.0f / 25.0f; // TODO:(Nick) Should be in config
auto start = std::chrono::high_resolution_clock::now();
//double wait = 1.0f / 25.0f; // TODO:(Nick) Should be in config
//auto start = std::chrono::high_resolution_clock::now();
//int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
//int64_t msdelay = 40 - (now % 40);
//while (msdelay >= 20) {
// sleep_for(milliseconds(10));
// now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
// msdelay = 40 - (now % 40);
//}
//LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay;
// Create frame jobs at correct FPS interval
_schedule();
//std::function<void(int)> j = ftl::pool.pop();
//if (j) j(-1);
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
//std::chrono::duration<double> elapsed =
// std::chrono::high_resolution_clock::now() - start;
if (elapsed.count() >= wait) {
LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
} else {
//if (elapsed.count() >= wait) {
//LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
//} else {
//LOG(INFO) << "Frame rate @ " << (1.0f / elapsed.count());
// Otherwise, wait until next frame should start.
// FIXME:(Nick) Is this accurate enough? Almost certainly not
// TODO:(Nick) Synchronise by time corrections and use of fixed time points
// but this only works if framerate can be achieved.
sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
}
//sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
//}
}
void Streamer::run(bool block) {
......@@ -246,6 +273,7 @@ void Streamer::_swap(StreamSource *src) {
}
src->src->getFrames(src->rgb, src->depth);
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
......@@ -269,6 +297,9 @@ void Streamer::wait() {
if (jobs_ != 0) {
LOG(FATAL) << "Deadlock detected";
}
// Capture frame number?
frame_no_ = last_frame_;
}
void Streamer::_schedule() {
......@@ -277,6 +308,9 @@ void Streamer::_schedule() {
//std::condition_variable job_cv;
//int jobs = 0;
//auto now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
//LOG(INFO) << "Frame time = " << (now-(last_frame_*40)) << "ms";
// Prevent new clients during processing.
SHARED_LOCK(mutex_,slk);
......@@ -301,6 +335,31 @@ void Streamer::_schedule() {
ftl::pool.push([this,src](int id) {
//auto start = std::chrono::high_resolution_clock::now();
auto start = std::chrono::high_resolution_clock::now();
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
int64_t target = now / 40;
// TODO:(Nick) A now%40 == 0 should be accepted
if (target != last_frame_) LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%40) << "ms";
// Use sleep_for for larger delays
int64_t msdelay = 40 - (now % 40);
//LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay;
while (msdelay >= 20) {
sleep_for(milliseconds(10));
now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
msdelay = 40 - (now % 40);
}
// Spin loop until exact grab time
//LOG(INFO) << "Spin Delay: " << (now / 40) << " = " << (40 - (now%40));
target = now / 40;
while ((now/40) == target) {
_mm_pause(); // SSE2 nano pause intrinsic
now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
};
last_frame_ = now/40;
try {
src->src->grab();
} catch (std::exception &ex) {
......@@ -311,6 +370,9 @@ void Streamer::_schedule() {
LOG(ERROR) << "Unknown exception when grabbing frame";
}
//now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
//if (now%40 > 0) LOG(INFO) << "Grab in: " << (now%40) << "ms";
//std::chrono::duration<double> elapsed =
// std::chrono::high_resolution_clock::now() - start;
//LOG(INFO) << "Grab in " << elapsed.count() << "s";
......@@ -405,7 +467,7 @@ void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) {
while (c != src->clients[b].end()) {
try {
// TODO:(Nick) Send pose and timestamp
if (!net_->send((*c).peerid, (*c).uri, 0, chunk, delta, rgb_buf, d_buf)) {
if (!net_->send((*c).peerid, (*c).uri, frame_no_, chunk, delta, rgb_buf, d_buf)) {
// Send failed so mark as client stream completed
(*c).txcount = (*c).txmax;
} else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment