Skip to content
Snippets Groups Projects
Commit 576fa0e5 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/156/configopt' into 'master'

Implements #156 expose config options

Closes #156

See merge request nicolas.pope/ftl!95
parents 8a0b7a60 6b701a10
No related branches found
No related tags found
1 merge request!95Implements #156 expose config options
Pipeline #12944 failed
Showing
with 110 additions and 48 deletions
......@@ -60,6 +60,8 @@ struct TimerHandle {
*/
void setInterval(int ms);
int getInterval();
/**
* Add the specified number of milliseconds to the clock when generating
* timestamps. This is used to synchronise clocks on multiple machines as it
......
......@@ -19,6 +19,7 @@
#include <ftl/configurable.hpp>
#include <ftl/uri.hpp>
#include <ftl/threads.hpp>
#include <ftl/timer.hpp>
#include <fstream>
#include <string>
......@@ -515,6 +516,13 @@ Configurable *ftl::config::configure(int argc, char **argv, const std::string &r
}
});
// Some global settings
ftl::timer::setInterval(rootcfg->value("fps",20));
int pool_size = rootcfg->value("thread_pool_factor", 2.0f)*std::thread::hardware_concurrency();
if (pool_size != ftl::pool.size()) ftl::pool.resize(pool_size);
//LOG(INFO) << "CONFIG: " << config["vision_default"];
//CHECK_EQ( &config, config_index["ftl://utu.fi"] );
......
......@@ -77,6 +77,10 @@ void ftl::timer::setInterval(int ms) {
mspf = ms;
}
int ftl::timer::getInterval() {
return mspf;
}
void ftl::timer::setClockAdjustment(int64_t ms) {
clock_adjust = ms;
}
......
......@@ -6,6 +6,12 @@
using ftl::Configurable;
using std::string;
namespace ftl {
namespace timer {
void setInterval(int i) {}
}
}
SCENARIO( "Configurable::get()" ) {
GIVEN( "a non-existent property" ) {
// cppcheck-suppress constStatement
......
......@@ -189,6 +189,9 @@ class Universe : public ftl::Configurable {
void removeCallback(ftl::net::callback_t cbid);
size_t getSendBufferSize() const { return send_size_; }
size_t getRecvBufferSize() const { return recv_size_; }
private:
void _run();
int _setDescriptors();
......@@ -220,6 +223,13 @@ class Universe : public ftl::Configurable {
std::list<ReconnectInfo> reconnects_;
size_t phase_;
std::list<ftl::net::Peer*> garbage_;
size_t send_size_;
size_t recv_size_;
double periodic_time_;
int reconnect_attempts_;
// NOTE: Must always be last member
std::thread thread_;
struct ConnHandler {
......
......@@ -48,9 +48,6 @@ using ftl::net::Universe;
using ftl::net::callback_t;
using std::vector;
#define TCP_SEND_BUFFER_SIZE (1024*1024*1)
#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1)
/*static std::string hexStr(const std::string &s)
{
const char *data = s.data();
......@@ -70,7 +67,7 @@ ftl::UUID ftl::net::this_peer;
//static ctpl::thread_pool pool(5);
// TODO:(nick) Move to tcp_internal.cpp
static SOCKET tcpConnect(URI &uri) {
static SOCKET tcpConnect(URI &uri, int ssize, int rsize) {
int rc;
//sockaddr_in destAddr;
......@@ -93,11 +90,11 @@ static SOCKET tcpConnect(URI &uri) {
int flags =1;
if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int a = TCP_RECEIVE_BUFFER_SIZE;
int a = rsize;
if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
a = TCP_SEND_BUFFER_SIZE;
a = ssize;
if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
......@@ -185,11 +182,11 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals
int flags =1;
if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int a = TCP_RECEIVE_BUFFER_SIZE;
int a = u->getRecvBufferSize();
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
a = TCP_SEND_BUFFER_SIZE;
a = u->getSendBufferSize();
if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
......@@ -242,12 +239,12 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true),
scheme_ = uri.getProtocol();
if (uri.getProtocol() == URI::SCHEME_TCP) {
sock_ = tcpConnect(uri);
sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize());
if (sock_ != INVALID_SOCKET) status_ = kConnecting;
else status_ = kReconnecting;
} else if (uri.getProtocol() == URI::SCHEME_WS) {
LOG(INFO) << "Websocket connect " << uri.getPath();
sock_ = tcpConnect(uri);
sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize());
if (sock_ != INVALID_SOCKET) {
if (!ws_connect(sock_, uri)) {
LOG(ERROR) << "Websocket connection failed";
......@@ -310,7 +307,7 @@ bool Peer::reconnect() {
LOG(INFO) << "Reconnecting to " << uri_ << " ...";
if (scheme_ == URI::SCHEME_TCP) {
sock_ = tcpConnect(uri);
sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize());
if (sock_ != INVALID_SOCKET) {
status_ = kConnecting;
is_waiting_ = true;
......@@ -319,7 +316,7 @@ bool Peer::reconnect() {
return false;
}
} else if (scheme_ == URI::SCHEME_WS) {
sock_ = tcpConnect(uri);
sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize());
if (sock_ != INVALID_SOCKET) {
if (!ws_connect(sock_, uri)) {
return false;
......@@ -441,7 +438,7 @@ void Peer::data() {
auto buf = recv_buf_.buffer();
lk.unlock();
#ifndef WIN32
/*#ifndef WIN32
int n;
unsigned int m = sizeof(n);
getsockopt(sock_,SOL_SOCKET,SO_RCVBUF,(void *)&n, &m);
......@@ -449,7 +446,7 @@ void Peer::data() {
int pending;
ioctl(sock_, SIOCINQ, &pending);
if (pending > 100000) LOG(INFO) << "Buffer usage: " << float(pending) / float(n);
#endif
#endif*/
rc = ftl::net::internal::recv(sock_, buf, cap, 0);
if (rc >= cap-1) {
......
......@@ -22,16 +22,38 @@ using std::optional;
using ftl::config::json_t;
using ftl::net::callback_t;
#define TCP_SEND_BUFFER_SIZE (512*1024)
#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1)
callback_t ftl::net::Universe::cbid__ = 0;
Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) {
Universe::Universe() :
Configurable(),
active_(true),
this_peer(ftl::net::this_peer),
phase_(0),
send_size_(TCP_SEND_BUFFER_SIZE),
recv_size_(TCP_RECEIVE_BUFFER_SIZE),
periodic_time_(1.0),
reconnect_attempts_(50),
thread_(Universe::__start, this) {
_installBindings();
}
Universe::Universe(nlohmann::json &config) :
Configurable(config), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) {
Configurable(config),
active_(true),
this_peer(ftl::net::this_peer),
phase_(0),
send_size_(value("tcp_send_buffer",TCP_SEND_BUFFER_SIZE)),
recv_size_(value("tcp_recv_buffer",TCP_RECEIVE_BUFFER_SIZE)),
periodic_time_(value("periodics", 1.0)),
reconnect_attempts_(value("reconnect_attempts",50)),
thread_(Universe::__start, this) {
_installBindings();
LOG(INFO) << "SEND BUFFER SIZE = " << send_size_;
}
Universe::~Universe() {
......@@ -186,7 +208,7 @@ void Universe::_cleanupPeers() {
i = peers_.erase(i);
if (p->status() == ftl::net::Peer::kReconnecting) {
reconnects_.push_back({50, 1.0f, p});
reconnects_.push_back({reconnect_attempts_, 1.0f, p});
} else {
//delete p;
garbage_.push_back(p);
......@@ -250,14 +272,13 @@ void Universe::_run() {
// Do periodics
auto now = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = now - start;
if (elapsed.count() >= 1.0) {
if (elapsed.count() >= periodic_time_) {
start = now;
_periodic();
}
// It is an error to use "select" with no sockets ... so just sleep
if (n == 0) {
LOG(ERROR) << "NO SOCKETS";
std::this_thread::sleep_for(std::chrono::milliseconds(300));
continue;
}
......
......@@ -50,6 +50,9 @@ class Universe {
callback_t onConnect(const std::function<void(Peer*)> &f) { return 0; }
callback_t onDisconnect(const std::function<void(Peer*)> &f) { return 0; }
size_t getSendBufferSize() const { return 10*1024; }
size_t getRecvBufferSize() const { return 10*1024; }
};
}
}
......
......@@ -16,8 +16,8 @@
namespace ftl {
namespace rgbd {
static const int kChunkDim = 4;
static constexpr int kChunkCount = kChunkDim * kChunkDim;
//static const int kChunkDim = 4;
//static constexpr int kChunkCount = kChunkDim * kChunkDim;
namespace detail {
......@@ -122,6 +122,8 @@ class Streamer : public ftl::Configurable {
ftl::UUID time_peer_;
int64_t last_frame_;
int64_t frame_no_;
size_t chunk_count_;
size_t chunk_dim_;
int64_t mspf_;
float actual_fps_;
......
......@@ -15,7 +15,10 @@ Group::Group() : framesets_(kFrameBufferSize), head_(0) {
framesets_[0].timestamp = -1;
jobs_ = 0;
skip_ = false;
setFPS(20);
//setFPS(20);
mspf_ = ftl::timer::getInterval();
setLatency(5);
}
......@@ -188,7 +191,7 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) {
// The buffers are invalid after callback so mark stale
fs->stale = true;
} else {
DLOG(INFO) << "NO FRAME FOUND: " << last_ts_ - latency_*mspf_;
//LOG(INFO) << "NO FRAME FOUND: " << last_ts_ - latency_*mspf_;
}
}
......@@ -229,11 +232,12 @@ ftl::rgbd::FrameSet *Group::_getFrameset(int f) {
}
void Group::_addFrameset(int64_t timestamp) {
int count = (framesets_[head_].timestamp == -1) ? 1 : (timestamp - framesets_[head_].timestamp) / mspf_;
int count = (framesets_[head_].timestamp == -1) ? 200 : (timestamp - framesets_[head_].timestamp) / mspf_;
//LOG(INFO) << "Massive timestamp difference: " << count;
// Allow for massive timestamp changes (Windows clock adjust)
// Only add a single frameset for large changes
if (count < -kFrameBufferSize || count >= kFrameBufferSize-1) {
if (count < -int(kFrameBufferSize) || count >= kFrameBufferSize-1) {
head_ = (head_+1) % kFrameBufferSize;
if (!framesets_[head_].mtx.try_lock()) {
......
......@@ -147,6 +147,9 @@ NetSource::NetSource(ftl::rgbd::Source *host)
default_quality_ = host->value("quality", 0);
});
chunks_dim_ = host->value("chunking",4);
chunk_count_ = chunks_dim_*chunks_dim_;
_updateURI();
h_ = host_->getNet()->onConnect([this](ftl::net::Peer *p) {
......@@ -182,7 +185,7 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig
auto start = std::chrono::high_resolution_clock::now();
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count();
//if (now-ts < 10) LOG(INFO) << ts << " - Chunk Latency = " << (now - ts) << " - " << ftl::pool.q_size();
//LOG(INFO) << ts << " - Chunk Latency (" << chunk_count_ << ") = " << (now - ts) << " - " << ftl::pool.q_size();
//if (now - ts > 160) {
// LOG(INFO) << "OLD PACKET: " << host_->getURI() << " (" << chunk << ") - " << ts << " (" << (now - ts) << ")";
//}
......@@ -228,14 +231,14 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig
++frame.chunk_count;
if (frame.chunk_count > kChunkCount) LOG(FATAL) << "TOO MANY CHUNKS";
if (frame.chunk_count > chunk_count_) LOG(FATAL) << "TOO MANY CHUNKS";
if (frame.chunk_count == kChunkCount) {
if (frame.chunk_count == chunk_count_) {
UNIQUE_LOCK(frame.mtx, flk);
timestamp_ = frame.timestamp;
if (frame.timestamp >= 0 && frame.chunk_count == kChunkCount) {
//LOG(INFO) << "Frame finished";
if (frame.timestamp >= 0 && frame.chunk_count == chunk_count_) {
//LOG(INFO) << "Frame finished: " << frame.timestamp;
auto cb = host_->callback();
if (cb) {
cb(frame.timestamp, frame.channel1, frame.channel2);
......@@ -308,7 +311,7 @@ void NetSource::_updateURI() {
N_ = 0;
// Update chunk details
chunks_dim_ = ftl::rgbd::kChunkDim;
//chunks_dim_ = ftl::rgbd::kChunkDim;
chunk_width_ = params_.width / chunks_dim_;
chunk_height_ = params_.height / chunks_dim_;
//chunk_count_ = 0;
......
......@@ -79,6 +79,7 @@ class NetSource : public detail::Source {
int minB_;
int maxN_;
int default_quality_;
int chunk_count_;
ftl::rgbd::channel_t prev_chan_;
//volatile int64_t current_frame_;
//std::atomic<int> chunk_count_;
......
......@@ -206,8 +206,6 @@ bool StereoVideoSource::compute(int n, int b) {
stream_.waitForCompletion(); // TODO:(Nick) Move to getFrames
}
LOG(INFO) << "STEREO VIDEO COMPUTE: " << timestamp_;
auto cb = host_->callback();
if (cb) cb(timestamp_, rgb_, depth_);
return true;
......
......@@ -33,11 +33,16 @@ Streamer::Streamer(nlohmann::json &config, Universe *net)
net_ = net;
time_peer_ = ftl::UUID(0);
clock_adjust_ = 0;
mspf_ = 1000 / value("fps", 20);
mspf_ = ftl::timer::getInterval(); //1000 / value("fps", 20);
//last_dropped_ = 0;
//drop_count_ = 0;
group_.setFPS(value("fps", 20));
chunk_dim_ = value("chunking",4);
chunk_count_ = chunk_dim_*chunk_dim_;
LOG(INFO) << "CHUNK COUNT = " << chunk_count_;
//group_.setFPS(value("fps", 20));
group_.setLatency(10);
compress_level_ = value("compression", 1);
......@@ -105,8 +110,6 @@ Streamer::Streamer(nlohmann::json &config, Universe *net)
net->bind("set_channel", [this](const string &uri, unsigned int chan) {
SHARED_LOCK(mutex_,slk);
LOG(INFO) << "SET CHANNEL " << chan;
if (sources_.find(uri) != sources_.end()) {
sources_[uri]->src->setChannel((ftl::rgbd::channel_t)chan);
}
......@@ -189,7 +192,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
for (auto &client : s->clients[rate]) {
// If already listening, just update chunk counters
if (client.peerid == peer) {
client.txmax = N * kChunkCount;
client.txmax = N * chunk_count_;
client.txcount = 0;
return;
}
......@@ -200,7 +203,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
c.peerid = peer;
c.uri = dest;
c.txcount = 0;
c.txmax = N * kChunkCount;
c.txmax = N * chunk_count_;
++s->clientCount;
}
......@@ -279,7 +282,7 @@ void Streamer::_transmit(ftl::rgbd::FrameSet &fs) {
totalclients += src->clientCount;
// Create jobs for each chunk
for (int i=0; i<kChunkCount; ++i) {
for (int i=0; i<chunk_count_; ++i) {
// Add chunk job to thread pool
ftl::pool.push([this,&fs,j,i,src](int id) {
int chunk = i;
......@@ -296,7 +299,7 @@ void Streamer::_transmit(ftl::rgbd::FrameSet &fs) {
});
}
jobs_ += kChunkCount;
jobs_ += chunk_count_;
}
std::unique_lock<std::mutex> lk(job_mtx_);
......@@ -314,12 +317,12 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c
bool hasChan2 = (!depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not
int chunk_width = rgb.cols / kChunkDim;
int chunk_height = rgb.rows / kChunkDim;
int chunk_width = rgb.cols / chunk_dim_;
int chunk_height = rgb.rows / chunk_dim_;
// Build chunk heads
int cx = (chunk % kChunkDim) * chunk_width;
int cy = (chunk / kChunkDim) * chunk_height;
int cx = (chunk % chunk_dim_) * chunk_width;
int cy = (chunk / chunk_dim_) * chunk_height;
cv::Rect roi(cx,cy,chunk_width,chunk_height);
vector<unsigned char> rgb_buf;
cv::Mat chunkRGB = rgb(roi);
......@@ -354,8 +357,8 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c
// TODO:(Nick) could reuse downscales
} else {
cv::Mat downrgb, downdepth;
cv::resize(chunkRGB, downrgb, cv::Size(bitrate_settings[b].width / kChunkDim, bitrate_settings[b].height / kChunkDim));
if (hasChan2) cv::resize(d2, downdepth, cv::Size(bitrate_settings[b].width / kChunkDim, bitrate_settings[b].height / kChunkDim));
cv::resize(chunkRGB, downrgb, cv::Size(bitrate_settings[b].width / chunk_dim_, bitrate_settings[b].height / chunk_dim_));
if (hasChan2) cv::resize(d2, downdepth, cv::Size(bitrate_settings[b].width / chunk_dim_, bitrate_settings[b].height / chunk_dim_));
_encodeChannel1(downrgb, rgb_buf, b);
if (hasChan2) _encodeChannel2(downdepth, d_buf, src->src->getChannel(), b);
......@@ -374,7 +377,7 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c
(*c).txcount = (*c).txmax;
} else {
++(*c).txcount;
//LOG(INFO) << "SENT CHUNK : " << frame_no_*mspf_ << "-" << chunk;
//LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk;
}
} catch(...) {
(*c).txcount = (*c).txmax;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment