diff --git a/components/common/cpp/include/ftl/timer.hpp b/components/common/cpp/include/ftl/timer.hpp index 045f29375d26acc4f3d1406a65ca31febce1f319..6af48cd1cd4b053e69586af88e90be3c4ded0c35 100644 --- a/components/common/cpp/include/ftl/timer.hpp +++ b/components/common/cpp/include/ftl/timer.hpp @@ -60,6 +60,8 @@ struct TimerHandle { */ void setInterval(int ms); +int getInterval(); + /** * Add the specified number of milliseconds to the clock when generating * timestamps. This is used to synchronise clocks on multiple machines as it diff --git a/components/common/cpp/src/configuration.cpp b/components/common/cpp/src/configuration.cpp index 3343832ac133e5128e135ba1199b26154e0eb641..786c07e5f55750fde8a1f543b5549f1c1e980901 100644 --- a/components/common/cpp/src/configuration.cpp +++ b/components/common/cpp/src/configuration.cpp @@ -19,6 +19,7 @@ #include <ftl/configurable.hpp> #include <ftl/uri.hpp> #include <ftl/threads.hpp> +#include <ftl/timer.hpp> #include <fstream> #include <string> @@ -515,6 +516,13 @@ Configurable *ftl::config::configure(int argc, char **argv, const std::string &r } }); + // Some global settings + ftl::timer::setInterval(rootcfg->value("fps",20)); + + int pool_size = rootcfg->value("thread_pool_factor", 2.0f)*std::thread::hardware_concurrency(); + if (pool_size != ftl::pool.size()) ftl::pool.resize(pool_size); + + //LOG(INFO) << "CONFIG: " << config["vision_default"]; //CHECK_EQ( &config, config_index["ftl://utu.fi"] ); diff --git a/components/common/cpp/src/timer.cpp b/components/common/cpp/src/timer.cpp index eb6ea802472d848f3cd8ef029a4f8aff092d377c..5f96f9045aac2cf506f28e9440f787b5d3f6a8d9 100644 --- a/components/common/cpp/src/timer.cpp +++ b/components/common/cpp/src/timer.cpp @@ -77,6 +77,10 @@ void ftl::timer::setInterval(int ms) { mspf = ms; } +int ftl::timer::getInterval() { + return mspf; +} + void ftl::timer::setClockAdjustment(int64_t ms) { clock_adjust = ms; } diff --git a/components/common/cpp/test/configurable_unit.cpp b/components/common/cpp/test/configurable_unit.cpp index 428208f7465a85421eb1c7864f384a9dcc3f7a98..af44e026a552279da53986dde38079e7695a889b 100644 --- a/components/common/cpp/test/configurable_unit.cpp +++ b/components/common/cpp/test/configurable_unit.cpp @@ -6,6 +6,12 @@ using ftl::Configurable; using std::string; +namespace ftl { +namespace timer { +void setInterval(int i) {} +} +} + SCENARIO( "Configurable::get()" ) { GIVEN( "a non-existent property" ) { // cppcheck-suppress constStatement diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 24384b39af91a9c655a27217b99741276d08a627..6394e26e36aed185c3b57af30e55eb97c5ef5876 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -188,6 +188,9 @@ class Universe : public ftl::Configurable { ftl::net::callback_t onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)>&); void removeCallback(ftl::net::callback_t cbid); + + size_t getSendBufferSize() const { return send_size_; } + size_t getRecvBufferSize() const { return recv_size_; } private: void _run(); @@ -220,6 +223,13 @@ class Universe : public ftl::Configurable { std::list<ReconnectInfo> reconnects_; size_t phase_; std::list<ftl::net::Peer*> garbage_; + + size_t send_size_; + size_t recv_size_; + double periodic_time_; + int reconnect_attempts_; + + // NOTE: Must always be last member std::thread thread_; struct ConnHandler { diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 517273c7d41f6bd9261651944dd6d466153c68c4..6738470fe013e533d769a1cb3c860537a0cfa4ff 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -48,9 +48,6 @@ using ftl::net::Universe; using ftl::net::callback_t; using std::vector; -#define TCP_SEND_BUFFER_SIZE (1024*1024*1) -#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1) - /*static std::string hexStr(const std::string &s) { const char *data = s.data(); @@ -70,7 +67,7 @@ ftl::UUID ftl::net::this_peer; //static ctpl::thread_pool pool(5); // TODO:(nick) Move to tcp_internal.cpp -static SOCKET tcpConnect(URI &uri) { +static SOCKET tcpConnect(URI &uri, int ssize, int rsize) { int rc; //sockaddr_in destAddr; @@ -93,11 +90,11 @@ static SOCKET tcpConnect(URI &uri) { int flags =1; if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; - int a = TCP_RECEIVE_BUFFER_SIZE; + int a = rsize; if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } - a = TCP_SEND_BUFFER_SIZE; + a = ssize; if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } @@ -185,11 +182,11 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals int flags =1; if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; - int a = TCP_RECEIVE_BUFFER_SIZE; + int a = u->getRecvBufferSize(); if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } - a = TCP_SEND_BUFFER_SIZE; + a = u->getSendBufferSize(); if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } @@ -242,12 +239,12 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) status_ = kConnecting; else status_ = kReconnecting; } else if (uri.getProtocol() == URI::SCHEME_WS) { LOG(INFO) << "Websocket connect " << uri.getPath(); - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) { if (!ws_connect(sock_, uri)) { LOG(ERROR) << "Websocket connection failed"; @@ -310,7 +307,7 @@ bool Peer::reconnect() { LOG(INFO) << "Reconnecting to " << uri_ << " ..."; if (scheme_ == URI::SCHEME_TCP) { - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) { status_ = kConnecting; is_waiting_ = true; @@ -319,7 +316,7 @@ bool Peer::reconnect() { return false; } } else if (scheme_ == URI::SCHEME_WS) { - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) { if (!ws_connect(sock_, uri)) { return false; @@ -441,7 +438,7 @@ void Peer::data() { auto buf = recv_buf_.buffer(); lk.unlock(); - #ifndef WIN32 + /*#ifndef WIN32 int n; unsigned int m = sizeof(n); getsockopt(sock_,SOL_SOCKET,SO_RCVBUF,(void *)&n, &m); @@ -449,7 +446,7 @@ void Peer::data() { int pending; ioctl(sock_, SIOCINQ, &pending); if (pending > 100000) LOG(INFO) << "Buffer usage: " << float(pending) / float(n); - #endif + #endif*/ rc = ftl::net::internal::recv(sock_, buf, cap, 0); if (rc >= cap-1) { diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index ee6f8911ba186b6ad43670a993f89fb16cfbb1a4..a70e42141dfe096b1cb7f71cac27bbda2ed8671d 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -22,16 +22,38 @@ using std::optional; using ftl::config::json_t; using ftl::net::callback_t; +#define TCP_SEND_BUFFER_SIZE (512*1024) +#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1) + callback_t ftl::net::Universe::cbid__ = 0; -Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { +Universe::Universe() : + Configurable(), + active_(true), + this_peer(ftl::net::this_peer), + phase_(0), + send_size_(TCP_SEND_BUFFER_SIZE), + recv_size_(TCP_RECEIVE_BUFFER_SIZE), + periodic_time_(1.0), + reconnect_attempts_(50), + thread_(Universe::__start, this) { _installBindings(); } Universe::Universe(nlohmann::json &config) : - Configurable(config), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { + Configurable(config), + active_(true), + this_peer(ftl::net::this_peer), + phase_(0), + send_size_(value("tcp_send_buffer",TCP_SEND_BUFFER_SIZE)), + recv_size_(value("tcp_recv_buffer",TCP_RECEIVE_BUFFER_SIZE)), + periodic_time_(value("periodics", 1.0)), + reconnect_attempts_(value("reconnect_attempts",50)), + thread_(Universe::__start, this) { _installBindings(); + + LOG(INFO) << "SEND BUFFER SIZE = " << send_size_; } Universe::~Universe() { @@ -186,7 +208,7 @@ void Universe::_cleanupPeers() { i = peers_.erase(i); if (p->status() == ftl::net::Peer::kReconnecting) { - reconnects_.push_back({50, 1.0f, p}); + reconnects_.push_back({reconnect_attempts_, 1.0f, p}); } else { //delete p; garbage_.push_back(p); @@ -250,14 +272,13 @@ void Universe::_run() { // Do periodics auto now = std::chrono::high_resolution_clock::now(); std::chrono::duration<double> elapsed = now - start; - if (elapsed.count() >= 1.0) { + if (elapsed.count() >= periodic_time_) { start = now; _periodic(); } // It is an error to use "select" with no sockets ... so just sleep if (n == 0) { - LOG(ERROR) << "NO SOCKETS"; std::this_thread::sleep_for(std::chrono::milliseconds(300)); continue; } diff --git a/components/net/cpp/test/peer_unit.cpp b/components/net/cpp/test/peer_unit.cpp index 09eb4bef9e70171b84e57f1e954e833aa9768566..c4ace71797063eef78bafee6e3784f0f5e4fa124 100644 --- a/components/net/cpp/test/peer_unit.cpp +++ b/components/net/cpp/test/peer_unit.cpp @@ -50,6 +50,9 @@ class Universe { callback_t onConnect(const std::function<void(Peer*)> &f) { return 0; } callback_t onDisconnect(const std::function<void(Peer*)> &f) { return 0; } + + size_t getSendBufferSize() const { return 10*1024; } + size_t getRecvBufferSize() const { return 10*1024; } }; } } diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index e7c4b85819ac8b5da82d19c6cf85661d170afbe2..3ca7b8574c972e829873c492677253702399bd05 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -16,8 +16,8 @@ namespace ftl { namespace rgbd { -static const int kChunkDim = 4; -static constexpr int kChunkCount = kChunkDim * kChunkDim; +//static const int kChunkDim = 4; +//static constexpr int kChunkCount = kChunkDim * kChunkDim; namespace detail { @@ -122,6 +122,8 @@ class Streamer : public ftl::Configurable { ftl::UUID time_peer_; int64_t last_frame_; int64_t frame_no_; + size_t chunk_count_; + size_t chunk_dim_; int64_t mspf_; float actual_fps_; diff --git a/components/rgbd-sources/src/group.cpp b/components/rgbd-sources/src/group.cpp index 3b3d9afffede25362bf3d44681e06ac2e219fb45..1d700e10856192dca7eca6a2bc9544e3b887af16 100644 --- a/components/rgbd-sources/src/group.cpp +++ b/components/rgbd-sources/src/group.cpp @@ -15,7 +15,10 @@ Group::Group() : framesets_(kFrameBufferSize), head_(0) { framesets_[0].timestamp = -1; jobs_ = 0; skip_ = false; - setFPS(20); + //setFPS(20); + + mspf_ = ftl::timer::getInterval(); + setLatency(5); } @@ -188,7 +191,7 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { // The buffers are invalid after callback so mark stale fs->stale = true; } else { - DLOG(INFO) << "NO FRAME FOUND: " << last_ts_ - latency_*mspf_; + //LOG(INFO) << "NO FRAME FOUND: " << last_ts_ - latency_*mspf_; } } @@ -229,11 +232,12 @@ ftl::rgbd::FrameSet *Group::_getFrameset(int f) { } void Group::_addFrameset(int64_t timestamp) { - int count = (framesets_[head_].timestamp == -1) ? 1 : (timestamp - framesets_[head_].timestamp) / mspf_; + int count = (framesets_[head_].timestamp == -1) ? 200 : (timestamp - framesets_[head_].timestamp) / mspf_; + //LOG(INFO) << "Massive timestamp difference: " << count; // Allow for massive timestamp changes (Windows clock adjust) // Only add a single frameset for large changes - if (count < -kFrameBufferSize || count >= kFrameBufferSize-1) { + if (count < -int(kFrameBufferSize) || count >= kFrameBufferSize-1) { head_ = (head_+1) % kFrameBufferSize; if (!framesets_[head_].mtx.try_lock()) { diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 86fa08a2b9adbddce2529e992bccc0f5307cd012..3cab6c86d5e6cf45dc1a22a0643e2e58c4f3cd73 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -147,6 +147,9 @@ NetSource::NetSource(ftl::rgbd::Source *host) default_quality_ = host->value("quality", 0); }); + chunks_dim_ = host->value("chunking",4); + chunk_count_ = chunks_dim_*chunks_dim_; + _updateURI(); h_ = host_->getNet()->onConnect([this](ftl::net::Peer *p) { @@ -182,7 +185,7 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig auto start = std::chrono::high_resolution_clock::now(); int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count(); - //if (now-ts < 10) LOG(INFO) << ts << " - Chunk Latency = " << (now - ts) << " - " << ftl::pool.q_size(); + //LOG(INFO) << ts << " - Chunk Latency (" << chunk_count_ << ") = " << (now - ts) << " - " << ftl::pool.q_size(); //if (now - ts > 160) { // LOG(INFO) << "OLD PACKET: " << host_->getURI() << " (" << chunk << ") - " << ts << " (" << (now - ts) << ")"; //} @@ -228,14 +231,14 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig ++frame.chunk_count; - if (frame.chunk_count > kChunkCount) LOG(FATAL) << "TOO MANY CHUNKS"; + if (frame.chunk_count > chunk_count_) LOG(FATAL) << "TOO MANY CHUNKS"; - if (frame.chunk_count == kChunkCount) { + if (frame.chunk_count == chunk_count_) { UNIQUE_LOCK(frame.mtx, flk); timestamp_ = frame.timestamp; - if (frame.timestamp >= 0 && frame.chunk_count == kChunkCount) { - //LOG(INFO) << "Frame finished"; + if (frame.timestamp >= 0 && frame.chunk_count == chunk_count_) { + //LOG(INFO) << "Frame finished: " << frame.timestamp; auto cb = host_->callback(); if (cb) { cb(frame.timestamp, frame.channel1, frame.channel2); @@ -308,7 +311,7 @@ void NetSource::_updateURI() { N_ = 0; // Update chunk details - chunks_dim_ = ftl::rgbd::kChunkDim; + //chunks_dim_ = ftl::rgbd::kChunkDim; chunk_width_ = params_.width / chunks_dim_; chunk_height_ = params_.height / chunks_dim_; //chunk_count_ = 0; diff --git a/components/rgbd-sources/src/net.hpp b/components/rgbd-sources/src/net.hpp index 52852320a89db79a3b8534e63cdd1a07967f9a32..23206896c844cf4a29aa844771d66347010305d6 100644 --- a/components/rgbd-sources/src/net.hpp +++ b/components/rgbd-sources/src/net.hpp @@ -79,6 +79,7 @@ class NetSource : public detail::Source { int minB_; int maxN_; int default_quality_; + int chunk_count_; ftl::rgbd::channel_t prev_chan_; //volatile int64_t current_frame_; //std::atomic<int> chunk_count_; diff --git a/components/rgbd-sources/src/stereovideo.cpp b/components/rgbd-sources/src/stereovideo.cpp index 84cf30c0013c9bc268ab715d3e23f3606aa3313a..57e57fbbd245ee9e8f4e7038d4b24b85b4259145 100644 --- a/components/rgbd-sources/src/stereovideo.cpp +++ b/components/rgbd-sources/src/stereovideo.cpp @@ -206,8 +206,6 @@ bool StereoVideoSource::compute(int n, int b) { stream_.waitForCompletion(); // TODO:(Nick) Move to getFrames } - LOG(INFO) << "STEREO VIDEO COMPUTE: " << timestamp_; - auto cb = host_->callback(); if (cb) cb(timestamp_, rgb_, depth_); return true; diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 35339ecf0696b94248eb8aecc6394a7817330182..7a47a207f7e9d3b3f5d0617b39818adeceed2c42 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -33,11 +33,16 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) net_ = net; time_peer_ = ftl::UUID(0); clock_adjust_ = 0; - mspf_ = 1000 / value("fps", 20); + mspf_ = ftl::timer::getInterval(); //1000 / value("fps", 20); //last_dropped_ = 0; //drop_count_ = 0; - group_.setFPS(value("fps", 20)); + chunk_dim_ = value("chunking",4); + chunk_count_ = chunk_dim_*chunk_dim_; + + LOG(INFO) << "CHUNK COUNT = " << chunk_count_; + + //group_.setFPS(value("fps", 20)); group_.setLatency(10); compress_level_ = value("compression", 1); @@ -105,8 +110,6 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) net->bind("set_channel", [this](const string &uri, unsigned int chan) { SHARED_LOCK(mutex_,slk); - LOG(INFO) << "SET CHANNEL " << chan; - if (sources_.find(uri) != sources_.end()) { sources_[uri]->src->setChannel((ftl::rgbd::channel_t)chan); } @@ -189,7 +192,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID for (auto &client : s->clients[rate]) { // If already listening, just update chunk counters if (client.peerid == peer) { - client.txmax = N * kChunkCount; + client.txmax = N * chunk_count_; client.txcount = 0; return; } @@ -200,7 +203,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID c.peerid = peer; c.uri = dest; c.txcount = 0; - c.txmax = N * kChunkCount; + c.txmax = N * chunk_count_; ++s->clientCount; } @@ -279,7 +282,7 @@ void Streamer::_transmit(ftl::rgbd::FrameSet &fs) { totalclients += src->clientCount; // Create jobs for each chunk - for (int i=0; i<kChunkCount; ++i) { + for (int i=0; i<chunk_count_; ++i) { // Add chunk job to thread pool ftl::pool.push([this,&fs,j,i,src](int id) { int chunk = i; @@ -296,7 +299,7 @@ void Streamer::_transmit(ftl::rgbd::FrameSet &fs) { }); } - jobs_ += kChunkCount; + jobs_ += chunk_count_; } std::unique_lock<std::mutex> lk(job_mtx_); @@ -314,12 +317,12 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c bool hasChan2 = (!depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone); bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not - int chunk_width = rgb.cols / kChunkDim; - int chunk_height = rgb.rows / kChunkDim; + int chunk_width = rgb.cols / chunk_dim_; + int chunk_height = rgb.rows / chunk_dim_; // Build chunk heads - int cx = (chunk % kChunkDim) * chunk_width; - int cy = (chunk / kChunkDim) * chunk_height; + int cx = (chunk % chunk_dim_) * chunk_width; + int cy = (chunk / chunk_dim_) * chunk_height; cv::Rect roi(cx,cy,chunk_width,chunk_height); vector<unsigned char> rgb_buf; cv::Mat chunkRGB = rgb(roi); @@ -354,8 +357,8 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c // TODO:(Nick) could reuse downscales } else { cv::Mat downrgb, downdepth; - cv::resize(chunkRGB, downrgb, cv::Size(bitrate_settings[b].width / kChunkDim, bitrate_settings[b].height / kChunkDim)); - if (hasChan2) cv::resize(d2, downdepth, cv::Size(bitrate_settings[b].width / kChunkDim, bitrate_settings[b].height / kChunkDim)); + cv::resize(chunkRGB, downrgb, cv::Size(bitrate_settings[b].width / chunk_dim_, bitrate_settings[b].height / chunk_dim_)); + if (hasChan2) cv::resize(d2, downdepth, cv::Size(bitrate_settings[b].width / chunk_dim_, bitrate_settings[b].height / chunk_dim_)); _encodeChannel1(downrgb, rgb_buf, b); if (hasChan2) _encodeChannel2(downdepth, d_buf, src->src->getChannel(), b); @@ -374,7 +377,7 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c (*c).txcount = (*c).txmax; } else { ++(*c).txcount; - //LOG(INFO) << "SENT CHUNK : " << frame_no_*mspf_ << "-" << chunk; + //LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk; } } catch(...) { (*c).txcount = (*c).txmax;