Skip to content
Snippets Groups Projects
Commit c69312c4 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Resolves #140 with fps adjustment

parent b3cb4352
No related branches found
No related tags found
No related merge requests found
...@@ -138,9 +138,7 @@ int Universe::_setDescriptors() { ...@@ -138,9 +138,7 @@ int Universe::_setDescriptors() {
n = s->_socket(); n = s->_socket();
} }
//if (s->isWaiting()) { FD_SET(s->_socket(), &sfdread_);
FD_SET(s->_socket(), &sfdread_);
//}
FD_SET(s->_socket(), &sfderror_); FD_SET(s->_socket(), &sfderror_);
} }
} }
...@@ -154,17 +152,7 @@ void Universe::_installBindings(Peer *p) { ...@@ -154,17 +152,7 @@ void Universe::_installBindings(Peer *p) {
} }
void Universe::_installBindings() { void Universe::_installBindings() {
/*bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
LOG(INFO) << "Subscription to " << uri << " by " << id.to_string();
unique_lock<shared_mutex> lk(net_mutex_);
subscribers_[ftl::URI(uri).to_string()].push_back(id);
return true;
});
bind("__owner__", [this](const std::string &res) -> optional<UUID> {
if (owned_.count(res) > 0) return this_peer;
else return {};
});*/
} }
// Note: should be called inside a net lock // Note: should be called inside a net lock
...@@ -173,6 +161,8 @@ void Universe::_cleanupPeers() { ...@@ -173,6 +161,8 @@ void Universe::_cleanupPeers() {
if (ftl::pool.n_idle() == ftl::pool.size()) { if (ftl::pool.n_idle() == ftl::pool.size()) {
if (garbage_.size() > 0) LOG(INFO) << "Garbage collection"; if (garbage_.size() > 0) LOG(INFO) << "Garbage collection";
while (garbage_.size() > 0) { while (garbage_.size() > 0) {
// FIXME: There is possibly still something with a peer pointer
// that is causing this throw an exception sometimes?
delete garbage_.front(); delete garbage_.front();
garbage_.pop_front(); garbage_.pop_front();
} }
...@@ -287,8 +277,8 @@ void Universe::_run() { ...@@ -287,8 +277,8 @@ void Universe::_run() {
continue; continue;
} }
// CHECK Could this mutex be the problem!?
{ {
// TODO:(Nick) Shared lock unless connection is made
UNIQUE_LOCK(net_mutex_,lk); UNIQUE_LOCK(net_mutex_,lk);
//If connection request is waiting //If connection request is waiting
...@@ -304,7 +294,7 @@ void Universe::_run() { ...@@ -304,7 +294,7 @@ void Universe::_run() {
if (csock != INVALID_SOCKET) { if (csock != INVALID_SOCKET) {
auto p = new Peer(csock, this, &disp_); auto p = new Peer(csock, this, &disp_);
peers_.push_back(p); peers_.push_back(p);
_installBindings(p); //_installBindings(p);
} }
} }
} }
......
...@@ -29,7 +29,9 @@ struct StreamClient { ...@@ -29,7 +29,9 @@ struct StreamClient {
static const unsigned int kGrabbed = 0x1; static const unsigned int kGrabbed = 0x1;
static const unsigned int kRGB = 0x2; static const unsigned int kRGB = 0x2;
static const unsigned int kDepth = 0x4; static const unsigned int kDepth = 0x4;
static const unsigned int kFrameDropLimit = 5;
struct StreamSource { struct StreamSource {
ftl::rgbd::Source *src; ftl::rgbd::Source *src;
...@@ -119,12 +121,18 @@ class Streamer : public ftl::Configurable { ...@@ -119,12 +121,18 @@ class Streamer : public ftl::Configurable {
int64_t last_frame_; int64_t last_frame_;
int64_t frame_no_; int64_t frame_no_;
int64_t mspf_;
float actual_fps_;
//int64_t last_dropped_;
//int drop_count_;
void _schedule(); void _schedule();
void _swap(detail::StreamSource *); void _swap(detail::StreamSource *);
void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest); void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest);
void _encodeAndTransmit(detail::StreamSource *src, int chunk); void _encodeAndTransmit(detail::StreamSource *src, int chunk);
void _encodeChannel1(const cv::Mat &in, std::vector<unsigned char> &out, unsigned int b); void _encodeChannel1(const cv::Mat &in, std::vector<unsigned char> &out, unsigned int b);
bool _encodeChannel2(const cv::Mat &in, std::vector<unsigned char> &out, ftl::rgbd::channel_t c, unsigned int b); bool _encodeChannel2(const cv::Mat &in, std::vector<unsigned char> &out, ftl::rgbd::channel_t c, unsigned int b);
void _decideFrameRate(int64_t framesdropped, int64_t msremainder);
}; };
} }
......
...@@ -161,7 +161,7 @@ void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<un ...@@ -161,7 +161,7 @@ void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<un
depth_ = d_depth_; depth_ = d_depth_;
d_depth_ = tmp; d_depth_ = tmp;
timestamp_ = current_frame_*40; // FIXME: Don't hardcode 40ms timestamp_ = current_frame_;
current_frame_ = frame; current_frame_ = frame;
} }
...@@ -277,13 +277,6 @@ void NetSource::_updateURI() { ...@@ -277,13 +277,6 @@ void NetSource::_updateURI() {
N_ = 0; N_ = 0;
// Initiate stream with request for first 10 frames
//try {
// host_->getNet()->send(peer_, "get_stream", *uri, N_, 0, host_->getNet()->id(), *uri);
//} catch(...) {
// LOG(ERROR) << "Could not connect to stream " << *uri;
//}
// Update chunk details // Update chunk details
chunks_dim_ = ftl::rgbd::kChunkDim; chunks_dim_ = ftl::rgbd::kChunkDim;
chunk_width_ = params_.width / chunks_dim_; chunk_width_ = params_.width / chunks_dim_;
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <thread> #include <thread>
#include <chrono> #include <chrono>
#include <tuple> #include <tuple>
#include <algorithm>
#include "bitrate_settings.hpp" #include "bitrate_settings.hpp"
...@@ -31,6 +32,9 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) ...@@ -31,6 +32,9 @@ Streamer::Streamer(nlohmann::json &config, Universe *net)
net_ = net; net_ = net;
time_peer_ = ftl::UUID(0); time_peer_ = ftl::UUID(0);
clock_adjust_ = 0; clock_adjust_ = 0;
mspf_ = 1000 / value("fps", 25);
//last_dropped_ = 0;
//drop_count_ = 0;
compress_level_ = value("compression", 1); compress_level_ = value("compression", 1);
...@@ -199,42 +203,43 @@ void Streamer::remove(const std::string &) { ...@@ -199,42 +203,43 @@ void Streamer::remove(const std::string &) {
} }
void Streamer::_decideFrameRate(int64_t framesdropped, int64_t msremainder) {
actual_fps_ = 1000.0f / (float)((framesdropped+1)*mspf_+(msremainder));
LOG(INFO) << "Actual FPS = " << actual_fps_;
/*if (framesdropped > 0) {
// If N consecutive frames are dropped, work out new rate
if (last_dropped_/mspf_ >= last_frame_/mspf_ - 2*framesdropped) drop_count_++;
else drop_count_ = 0;
last_dropped_ = last_frame_+mspf_;
if (drop_count_ >= ftl::rgbd::detail::kFrameDropLimit) {
drop_count_ = 0;
const int64_t actualmspf = std::min((int64_t)1000, framesdropped*mspf_+(mspf_ - msremainder));
LOG(WARNING) << "Suggest FPS @ " << (1000 / actualmspf);
//mspf_ = actualmspf;
// Also notify all clients of change
}
} else {
// Perhaps we can boost framerate?
const int64_t actualmspf = std::min((int64_t)1000, framesdropped*mspf_+(mspf_ - msremainder));
LOG(INFO) << "Boost framerate: " << (1000 / actualmspf);
//mspf_ = actualmspf;
}*/
}
void Streamer::stop() { void Streamer::stop() {
active_ = false; active_ = false;
wait(); wait();
} }
void Streamer::poll() { void Streamer::poll() {
//double wait = 1.0f / 25.0f; // TODO:(Nick) Should be in config
//auto start = std::chrono::high_resolution_clock::now();
//int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
//int64_t msdelay = 40 - (now % 40);
//while (msdelay >= 20) {
// sleep_for(milliseconds(10));
// now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
// msdelay = 40 - (now % 40);
//}
//LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay;
// Create frame jobs at correct FPS interval // Create frame jobs at correct FPS interval
_schedule(); _schedule();
//std::function<void(int)> j = ftl::pool.pop();
//if (j) j(-1);
//std::chrono::duration<double> elapsed =
// std::chrono::high_resolution_clock::now() - start;
//if (elapsed.count() >= wait) {
//LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
//} else {
//LOG(INFO) << "Frame rate @ " << (1.0f / elapsed.count());
// Otherwise, wait until next frame should start.
// FIXME:(Nick) Is this accurate enough? Almost certainly not
// TODO:(Nick) Synchronise by time corrections and use of fixed time points
// but this only works if framerate can be achieved.
//sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
//}
} }
void Streamer::run(bool block) { void Streamer::run(bool block) {
...@@ -306,12 +311,6 @@ void Streamer::wait() { ...@@ -306,12 +311,6 @@ void Streamer::wait() {
void Streamer::_schedule() { void Streamer::_schedule() {
wait(); wait();
//std::mutex job_mtx;
//std::condition_variable job_cv;
//int jobs = 0;
//auto now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
//LOG(INFO) << "Frame time = " << (now-(last_frame_*40)) << "ms";
// Prevent new clients during processing. // Prevent new clients during processing.
SHARED_LOCK(mutex_,slk); SHARED_LOCK(mutex_,slk);
...@@ -339,28 +338,30 @@ void Streamer::_schedule() { ...@@ -339,28 +338,30 @@ void Streamer::_schedule() {
auto start = std::chrono::high_resolution_clock::now(); auto start = std::chrono::high_resolution_clock::now();
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_; int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
int64_t target = now / 40; int64_t target = now / mspf_;
// TODO:(Nick) A now%40 == 0 should be accepted // TODO:(Nick) A now%mspf_ == 0 should be accepted
if (target != last_frame_) LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%40) << "ms"; if (target != last_frame_) {
LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%mspf_) << "ms";
}
//_decideFrameRate(target-last_frame_, now%mspf_);
// Use sleep_for for larger delays // Use sleep_for for larger delays
int64_t msdelay = 40 - (now % 40); int64_t msdelay = mspf_ - (now % mspf_);
//LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay; //LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay;
while (msdelay >= 20) { while (msdelay >= 20) {
sleep_for(milliseconds(10)); sleep_for(milliseconds(10));
now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
msdelay = 40 - (now % 40); msdelay = mspf_ - (now % mspf_);
} }
// Spin loop until exact grab time // Spin loop until exact grab time
//LOG(INFO) << "Spin Delay: " << (now / 40) << " = " << (40 - (now%40)); target = now / mspf_;
target = now / 40; while ((now/mspf_) == target) {
while ((now/40) == target) {
_mm_pause(); // SSE2 nano pause intrinsic _mm_pause(); // SSE2 nano pause intrinsic
now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
}; };
last_frame_ = now/40; last_frame_ = now/mspf_;
try { try {
src->src->capture(); src->src->capture();
...@@ -372,20 +373,13 @@ void Streamer::_schedule() { ...@@ -372,20 +373,13 @@ void Streamer::_schedule() {
LOG(ERROR) << "Unknown exception when grabbing frame"; LOG(ERROR) << "Unknown exception when grabbing frame";
} }
//now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
//if (now%40 > 0) LOG(INFO) << "Grab in: " << (now%40) << "ms";
//std::chrono::duration<double> elapsed =
// std::chrono::high_resolution_clock::now() - start;
//LOG(INFO) << "Grab in " << elapsed.count() << "s";
src->jobs--; src->jobs--;
_swap(src); _swap(src);
// Mark job as finished // Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_); std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_; --jobs_;
job_cv_.notify_one(); if (jobs_ == 0) job_cv_.notify_one();
}); });
// Compute job // Compute job
...@@ -426,7 +420,7 @@ void Streamer::_schedule() { ...@@ -426,7 +420,7 @@ void Streamer::_schedule() {
_swap(src); _swap(src);
std::unique_lock<std::mutex> lk(job_mtx_); std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_; --jobs_;
job_cv_.notify_one(); if (jobs_ == 0) job_cv_.notify_one();
}); });
} }
} }
...@@ -490,8 +484,8 @@ void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) { ...@@ -490,8 +484,8 @@ void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) {
auto c = src->clients[b].begin(); auto c = src->clients[b].begin();
while (c != src->clients[b].end()) { while (c != src->clients[b].end()) {
try { try {
// TODO:(Nick) Send pose and timestamp // TODO:(Nick) Send pose
if (!net_->send((*c).peerid, (*c).uri, frame_no_, chunk, delta, rgb_buf, d_buf)) { if (!net_->send((*c).peerid, (*c).uri, frame_no_*mspf_, chunk, delta, rgb_buf, d_buf)) {
// Send failed so mark as client stream completed // Send failed so mark as client stream completed
(*c).txcount = (*c).txmax; (*c).txcount = (*c).txmax;
} else { } else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment