Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • nicolaspope/ftl
1 result
Show changes
Commits on Source (11)
Showing
with 205 additions and 288 deletions
......@@ -121,7 +121,7 @@ void ControlWindow::_addNode() {
}
void ControlWindow::_updateDetails() {
node_details_ = ctrl_->getSlaves();
node_details_ = ctrl_->getControllers();
node_titles_.clear();
for (auto &d : node_details_) {
......
......@@ -23,32 +23,6 @@ int main(int argc, char **argv) {
}
});
std::map<ftl::UUID, std::vector<ftl::NetConfigurable*>> peerConfigurables;
// FIXME: Move this elsewhere, it is not just for GUI
net->onConnect([&controller, &peerConfigurables](ftl::net::Peer *p) {
ftl::UUID peer = p->id();
auto cs = controller->getConfigurables(peer);
for (auto c : cs) {
//LOG(INFO) << "NET CONFIG: " << c;
ftl::config::json_t *configuration = new ftl::config::json_t;
*configuration = controller->get(peer, c);
if (!configuration->empty()) {
ftl::NetConfigurable *nc = new ftl::NetConfigurable(peer, c, *controller, *configuration);
peerConfigurables[peer].push_back(nc);
}
}
});
net->onDisconnect([&peerConfigurables](ftl::net::Peer *p) {
ftl::UUID peer = p->id();
for (ftl::NetConfigurable *nc : peerConfigurables[peer]) {
ftl::config::json_t *configuration = &(nc->getConfig());
delete nc;
delete configuration;
}
});
net->start();
net->waitConnections();
......
......@@ -244,7 +244,7 @@ ftl::gui::Screen::Screen(ftl::Configurable *proot, ftl::net::Universe *pnet, ftl
//net_->onConnect([this,popup](ftl::net::Peer *p) {
{
LOG(INFO) << "NET CONNECT";
auto node_details = ctrl_->getSlaves();
auto node_details = ctrl_->getControllers();
for (auto &d : node_details) {
LOG(INFO) << "ADDING TITLE: " << d.dump();
......
......@@ -40,6 +40,21 @@ static void visualizeDepthMap( const cv::Mat &depth, cv::Mat &out,
//out.setTo(cv::Scalar(255, 255, 255), mask);
}
static std::string nameForCodec(ftl::codecs::codec_t c) {
switch(c) {
case codec_t::JPG : return "JPEG";
case codec_t::PNG : return "PNG";
case codec_t::H264 : return "H264";
case codec_t::HEVC : return "HEVC";
case codec_t::JSON : return "JSON";
case codec_t::POSE : return "POSE";
case codec_t::RAW : return "RAW";
case codec_t::CALIBRATION : return "CALIBRATION";
case codec_t::MSGPACK : return "MSGPACK";
default: return std::string("UNKNOWN (") + std::to_string((int)c) + std::string(")");
}
}
int main(int argc, char **argv) {
std::string filename(argv[1]);
LOG(INFO) << "Playing: " << filename;
......@@ -73,6 +88,11 @@ int main(int argc, char **argv) {
if (!(channel_mask[spkt.streamID][(int)spkt.channel])) {
channel_mask[spkt.streamID].set((int)spkt.channel);
LOG(INFO) << " - Channel " << (int)spkt.channel << " found (" << (int)spkt.streamID << ")";
LOG(INFO) << " - Codec = " << nameForCodec(pkt.codec);
LOG(INFO) << " - Width = " << ftl::codecs::getWidth(pkt.definition);
LOG(INFO) << " - Height = " << ftl::codecs::getHeight(pkt.definition);
LOG(INFO) << " - Start Time = " << float(spkt.timestamp - r.getStartTime()) / 1000.0f << "(s)";
LOG(INFO) << " - Blocks = " << (int)pkt.block_total;
}
if (spkt.streamID == current_stream) {
......
......@@ -12,7 +12,7 @@
#include <ftl/rgbd.hpp>
#include <ftl/rgbd/virtual.hpp>
#include <ftl/rgbd/streamer.hpp>
#include <ftl/slave.hpp>
#include <ftl/master.hpp>
#include <ftl/rgbd/group.hpp>
#include <ftl/threads.hpp>
#include <ftl/codecs/writer.hpp>
......@@ -82,58 +82,9 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) {
return rz * rx * ry;
}
// TODO: * Remove this class (requires more general solution). Also does
// not process disconnections/reconnections/types etc. correctly.
// * Update when new options become available.
class ConfigProxy {
private:
vector<ftl::UUID> peers_;
vector<std::string> uris_;
ftl::net::Universe *net_;
public:
ConfigProxy(ftl::net::Universe *net) {
net_ = net;
auto response = net_->findAll<std::string>("node_details");
for (auto &r : response) {
auto r_json = json_t::parse(r);
peers_.push_back(ftl::UUID(r_json["id"].get<std::string>()));
uris_.push_back(r_json["title"].get<std::string>());
}
}
void add(ftl::Configurable *root, const std::string &uri, const std::string &name) {
auto config = json_t::parse(net_->call<string>(peers_[0], "get_cfg", uris_[0] + "/" + uri));
auto *proxy = ftl::create<ftl::Configurable>(root, name);
try {
for (auto &itm : config.get<json::object_t>()) {
auto key = itm.first;
auto value = itm.second;
if (*key.begin() == '$') { continue; }
proxy->set(key, value);
proxy->on(key, [this, uri, key, value, proxy](const ftl::config::Event&) {
for (size_t i = 0; i < uris_.size(); i++) {
// TODO: check that config exists?
auto peer = peers_[i];
std::string name = uris_[i] + "/" + uri + "/" + key;
net_->send(peer, "update_cfg", name, proxy->getConfig()[key].dump());
}
});
}
}
catch (nlohmann::detail::type_error) {
LOG(ERROR) << "Failed to add config proxy for: " << uri << "/" << name;
}
}
};
static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net");
ftl::ctrl::Slave slave(net, root);
ftl::ctrl::Master ctrl(root, net);
// Controls
auto *controls = ftl::create<ftl::Configurable>(root, "controls");
......@@ -192,17 +143,6 @@ static void run(ftl::Configurable *root) {
return;
}
ConfigProxy *configproxy = nullptr;
if (net->numberOfPeers() > 0) {
configproxy = new ConfigProxy(net); // TODO delete
auto *disparity = ftl::create<ftl::Configurable>(root, "disparity");
configproxy->add(disparity, "source/disparity/algorithm", "algorithm");
configproxy->add(disparity, "source/disparity/bilateral_filter", "bilateral_filter");
configproxy->add(disparity, "source/disparity/optflow_filter", "optflow_filter");
configproxy->add(disparity, "source/disparity/mls", "mls");
configproxy->add(disparity, "source/disparity/cross", "cross");
}
// Must find pose for each source...
if (sources.size() > 1) {
std::map<std::string, Eigen::Matrix4d> transformations;
......@@ -383,7 +323,7 @@ static void run(ftl::Configurable *root) {
LOG(INFO) << "Shutting down...";
ftl::timer::stop();
slave.stop();
ctrl.stop();
net->shutdown();
ftl::pool.stop();
......
......@@ -11,7 +11,7 @@
#include <ftl/rgbd.hpp>
#include <ftl/rgbd/virtual.hpp>
#include <ftl/rgbd/streamer.hpp>
#include <ftl/slave.hpp>
#include <ftl/master.hpp>
#include <ftl/rgbd/group.hpp>
#include <ftl/threads.hpp>
#include <ftl/codecs/writer.hpp>
......@@ -64,7 +64,7 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) {
static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net");
ftl::ctrl::Slave slave(net, root);
ftl::ctrl::Master ctrl(root, net);
// Controls
auto *controls = ftl::create<ftl::Configurable>(root, "controls");
......@@ -202,7 +202,7 @@ static void run(ftl::Configurable *root) {
LOG(INFO) << "Shutting down...";
ftl::timer::stop();
slave.stop();
ctrl.stop();
net->shutdown();
ftl::pool.stop();
......
......@@ -22,7 +22,7 @@
//#include <ftl/display.hpp>
#include <ftl/rgbd/streamer.hpp>
#include <ftl/net/universe.hpp>
#include <ftl/slave.hpp>
#include <ftl/master.hpp>
#include <nlohmann/json.hpp>
#include "opencv2/imgproc.hpp"
......@@ -50,7 +50,7 @@ using json = nlohmann::json;
static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net");
ftl::ctrl::Slave slave(net, root);
ftl::ctrl::Master ctrl(root, net);
auto paths = root->get<vector<string>>("paths");
string file = "";
......@@ -84,7 +84,7 @@ static void run(ftl::Configurable *root) {
ftl::timer::start(true);
LOG(INFO) << "Stopping...";
slave.stop();
ctrl.stop();
stream->stop();
net->shutdown();
......
......@@ -97,6 +97,10 @@ inline NALType getNALType(const std::vector<uint8_t> &data) {
return static_cast<NALType>((data[4] >> 1) & 0x3F);
}
inline bool validNAL(const std::vector<uint8_t> &data) {
return data[0] == 0 && data[1] == 0 && data[2] == 0 && data[3] == 1;
}
/**
* Check the HEVC bitstream for an I-Frame. With NvPipe, all I-Frames start
* with a VPS NAL unit so just check for this.
......
......@@ -37,6 +37,7 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out
is_float_channel_ = is_float_frame;
last_definition_ = pkt.definition;
//LOG(INFO) << "DECODE OUT: " << out.rows << ", " << out.type();
//LOG(INFO) << "DECODE RESOLUTION: (" << (int)pkt.definition << ") " << ftl::codecs::getWidth(pkt.definition) << "x" << ftl::codecs::getHeight(pkt.definition);
// Build a decoder instance of the correct kind
......@@ -49,8 +50,6 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out
if (!nv_decoder_) {
//LOG(INFO) << "Bitrate=" << (int)bitrate << " width=" << ABRController::getColourWidth(bitrate);
LOG(FATAL) << "Could not create decoder: " << NvPipe_GetError(NULL);
} else {
DLOG(INFO) << "Decoder created";
}
seen_iframe_ = false;
......@@ -60,38 +59,46 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out
tmp_.create(cv::Size(ftl::codecs::getWidth(pkt.definition),ftl::codecs::getHeight(pkt.definition)), (is_float_frame) ? CV_16U : CV_8UC4);
// Check for an I-Frame
if (pkt.codec == ftl::codecs::codec_t::HEVC) {
if (ftl::codecs::hevc::isIFrame(pkt.data)) seen_iframe_ = true;
} else if (pkt.codec == ftl::codecs::codec_t::H264) {
if (ftl::codecs::h264::isIFrame(pkt.data)) seen_iframe_ = true;
if (!seen_iframe_) {
if (pkt.codec == ftl::codecs::codec_t::HEVC) {
if (ftl::codecs::hevc::isIFrame(pkt.data)) seen_iframe_ = true;
} else if (pkt.codec == ftl::codecs::codec_t::H264) {
if (ftl::codecs::h264::isIFrame(pkt.data)) seen_iframe_ = true;
}
}
// No I-Frame yet so don't attempt to decode P-Frames.
if (!seen_iframe_) return false;
// Final checks for validity
if (pkt.data.size() == 0 || tmp_.size() != out.size()) { // || !ftl::codecs::hevc::validNAL(pkt.data)) {
LOG(ERROR) << "Failed to decode packet";
return false;
}
int rc = NvPipe_Decode(nv_decoder_, pkt.data.data(), pkt.data.size(), tmp_.data, tmp_.cols, tmp_.rows, tmp_.step);
if (rc == 0) LOG(ERROR) << "NvPipe decode error: " << NvPipe_GetError(nv_decoder_);
if (is_float_frame) {
// Is the received frame the same size as requested output?
if (out.rows == ftl::codecs::getHeight(pkt.definition)) {
//if (out.rows == ftl::codecs::getHeight(pkt.definition)) {
tmp_.convertTo(out, CV_32FC1, 1.0f/1000.0f, stream_);
} else {
/*} else {
LOG(WARNING) << "Resizing decoded frame from " << tmp_.size() << " to " << out.size();
// FIXME: This won't work on GPU
tmp_.convertTo(tmp_, CV_32FC1, 1.0f/1000.0f, stream_);
cv::cuda::resize(tmp_, out, out.size(), 0, 0, cv::INTER_NEAREST, stream_);
}
}*/
} else {
// Is the received frame the same size as requested output?
if (out.rows == ftl::codecs::getHeight(pkt.definition)) {
//if (out.rows == ftl::codecs::getHeight(pkt.definition)) {
// Flag 0x1 means frame is in RGB so needs conversion to BGR
if (pkt.flags & 0x1) {
cv::cuda::cvtColor(tmp_, out, cv::COLOR_RGBA2BGR, 0, stream_);
} else {
cv::cuda::cvtColor(tmp_, out, cv::COLOR_BGRA2BGR, 0, stream_);
}
} else {
/*} else {
LOG(WARNING) << "Resizing decoded frame from " << tmp_.size() << " to " << out.size();
// FIXME: This won't work on GPU, plus it allocates extra memory...
// Flag 0x1 means frame is in RGB so needs conversion to BGR
......@@ -101,7 +108,7 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out
cv::cuda::cvtColor(tmp_, tmp_, cv::COLOR_BGRA2BGR, 0, stream_);
}
cv::cuda::resize(tmp_, out, out.size(), 0.0, 0.0, cv::INTER_LINEAR, stream_);
}
}*/
}
stream_.waitForCompletion();
......
......@@ -123,7 +123,7 @@ bool NvPipeEncoder::encode(const cv::cuda::GpuMat &in, definition_t odefinition,
pkt.data.resize(cs);
was_reset_ = false;
if (cs == 0) {
if (cs == 0 || cs >= ftl::codecs::kVideoBufferSize) {
LOG(ERROR) << "Could not encode video frame: " << NvPipe_GetError(nvenc_);
return false;
} else {
......
......@@ -81,7 +81,8 @@ TEST_CASE( "NvPipeDecoder::decode() - A colour test image" ) {
});
}
SECTION("Full HD in, 720 out, FHD encoding") {
// No longer supported
/*SECTION("Full HD in, 720 out, FHD encoding") {
in = cv::cuda::GpuMat(cv::Size(1920,1080), CV_8UC3, cv::Scalar(255,0,0));
out = cv::cuda::GpuMat(cv::Size(1280,720), CV_8UC3, cv::Scalar(0,0,0));
......@@ -90,9 +91,10 @@ TEST_CASE( "NvPipeDecoder::decode() - A colour test image" ) {
});
REQUIRE( (out.rows == 720) );
}
}*/
SECTION("HHD in, FHD out, FHD encoding") {
// No longer supported
/*SECTION("HHD in, FHD out, FHD encoding") {
in = cv::cuda::GpuMat(cv::Size(1280,720), CV_8UC3, cv::Scalar(255,0,0));
out = cv::cuda::GpuMat(cv::Size(1920,1080), CV_8UC3, cv::Scalar(0,0,0));
......@@ -101,9 +103,10 @@ TEST_CASE( "NvPipeDecoder::decode() - A colour test image" ) {
});
REQUIRE( (out.rows == 1080) );
}
}*/
SECTION("FHD in, HHD out, SD encoding") {
// No longer supported
/*SECTION("FHD in, HHD out, SD encoding") {
in = cv::cuda::GpuMat(cv::Size(1920,1080), CV_8UC3, cv::Scalar(255,0,0));
out = cv::cuda::GpuMat(cv::Size(1280,720), CV_8UC3, cv::Scalar(0,0,0));
......@@ -112,7 +115,7 @@ TEST_CASE( "NvPipeDecoder::decode() - A colour test image" ) {
});
REQUIRE( (out.rows == 720) );
}
}*/
REQUIRE( r );
REQUIRE( (cv::cuda::sum(out) != cv::Scalar(0,0,0)) );
......
add_library(ftlctrl
src/slave.cpp
src/master.cpp
)
......
......@@ -10,8 +10,15 @@
#include <Eigen/Eigen>
namespace ftl {
class NetConfigurable;
namespace ctrl {
struct SystemState {
bool paused;
};
struct LogEvent {
int verbosity;
std::string preamble;
......@@ -43,7 +50,7 @@ class Master {
std::vector<std::string> getConfigurables(const ftl::UUID &peer);
std::vector<ftl::config::json_t> getSlaves();
std::vector<ftl::config::json_t> getControllers();
std::vector<ftl::config::json_t> get(const std::string &uri);
......@@ -51,12 +58,26 @@ class Master {
ftl::config::json_t get(const ftl::UUID &peer, const std::string &uri);
ftl::config::json_t getConfigurable(const ftl::UUID &peer, const std::string &uri);
void watch(const std::string &uri, std::function<void()> f);
Eigen::Matrix4d getPose(const std::string &uri);
void setPose(const std::string &uri, const Eigen::Matrix4d &pose);
/**
* Clean up to remove log and status forwarding over the network.
*/
void stop();
/**
* Do not call! Automatically called from logging subsystem.
*/
void sendLog(const loguru::Message& message);
bool isPaused() const { return state_.paused; }
// Events
//void onError();
......@@ -72,6 +93,12 @@ class Master {
std::vector<std::function<void(const LogEvent&)>> log_handlers_;
ftl::Configurable *root_;
ftl::net::Universe *net_;
std::map<ftl::UUID, std::vector<ftl::NetConfigurable*>> peerConfigurables_;
std::vector<ftl::UUID> log_peers_;
RECURSIVE_MUTEX mutex_;
bool in_log_;
bool active_;
SystemState state_;
};
}
......
#ifndef _FTL_CTRL_SLAVE_HPP_
#define _FTL_CTRL_SLAVE_HPP_
#include <ftl/net/universe.hpp>
#include <ftl/configurable.hpp>
#include <loguru.hpp>
#include <ftl/threads.hpp>
namespace ftl {
namespace ctrl {
struct SystemState {
bool paused;
};
/**
* Allows a node to be remote controlled and observed over the network. All
* such nodes should create a single instance of this class, but must call
* "stop()" before terminating the network.
*/
class Slave {
public:
Slave(ftl::net::Universe *, ftl::Configurable *);
~Slave();
/**
* Clean up to remove log and status forwarding over the network.
*/
void stop();
/**
* Do not call! Automatically called from logging subsystem.
*/
void sendLog(const loguru::Message& message);
bool isPaused() const { return state_.paused; }
private:
std::vector<ftl::UUID> log_peers_;
ftl::net::Universe *net_;
RECURSIVE_MUTEX mutex_;
bool in_log_;
bool active_;
SystemState state_;
};
}
}
#endif // _FTL_CTRL_SLAVE_HPP_
#include <ftl/master.hpp>
#include <ftl/net_configurable.hpp>
using ftl::ctrl::Master;
using ftl::net::Universe;
......@@ -11,6 +12,51 @@ using ftl::ctrl::LogEvent;
Master::Master(Configurable *root, Universe *net)
: root_(root), net_(net) {
// Init system state
state_.paused = false;
net->bind("restart", []() {
LOG(WARNING) << "Remote restart...";
//exit(1);
ftl::exit_code = 1;
ftl::running = false;
});
net->bind("shutdown", []() {
LOG(WARNING) << "Remote shutdown...";
//exit(0);
ftl::running = false;
});
net->bind("pause", [this]() {
state_.paused = !state_.paused;
});
net->bind("update_cfg", [](const std::string &uri, const std::string &value) {
ftl::config::update(uri, nlohmann::json::parse(value));
});
net->bind("get_cfg", [](const std::string &uri) -> std::string {
return ftl::config::resolve(uri, false).dump();
});
net->bind("get_configurable", [](const std::string &uri) -> std::string {
return ftl::config::find(uri)->getConfig().dump();
});
net->bind("list_configurables", []() {
return ftl::config::list();
});
net->bind("log_subscribe", [this](const ftl::UUID &peer) {
UNIQUE_LOCK(mutex_, lk);
log_peers_.push_back(peer);
});
net->bind("connect", [this](const std::string &url) {
net_->connect(url);
});
net->bind("log", [this](int v, const std::string &pre, const std::string &msg) {
for (auto f : log_handlers_) {
f({v,pre,msg});
......@@ -20,16 +66,35 @@ Master::Master(Configurable *root, Universe *net)
net->bind("node_details", [net,root]() -> std::vector<std::string> {
ftl::config::json_t json {
{"id", net->id().to_string()},
{"title", root->value("title", *root->get<string>("$id"))},
{"kind", "master"}
{"title", root->value("title", *root->get<string>("$id"))}
};
return {json.dump()};
});
//net->broadcast("log_subscribe", net->id());
net->onConnect([this](ftl::net::Peer*) {
net->onConnect([this](ftl::net::Peer *p) {
//net_->broadcast("log_subscribe", net_->id());
ftl::UUID peer = p->id();
auto cs = getConfigurables(peer);
for (auto c : cs) {
//LOG(INFO) << "NET CONFIG: " << c;
ftl::config::json_t *configuration = new ftl::config::json_t;
*configuration = getConfigurable(peer, c);
if (!configuration->empty()) {
ftl::NetConfigurable *nc = new ftl::NetConfigurable(peer, c, *this, *configuration);
peerConfigurables_[peer].push_back(nc);
}
}
});
net->onDisconnect([this](ftl::net::Peer *p) {
ftl::UUID peer = p->id();
for (ftl::NetConfigurable *nc : peerConfigurables_[peer]) {
ftl::config::json_t *configuration = &(nc->getConfig());
delete nc;
delete configuration;
}
});
}
......@@ -70,7 +135,7 @@ void Master::set(const ftl::UUID &peer, const string &uri, const json_t &value)
net_->send(peer, "update_cfg", uri, value.dump());
}
vector<json_t> Master::getSlaves() {
vector<json_t> Master::getControllers() {
auto response = net_->findAll<string>("node_details");
vector<json_t> result;
for (auto &r : response) {
......@@ -105,6 +170,10 @@ json_t Master::get(const ftl::UUID &peer, const string &uri) {
return json_t::parse(net_->call<string>(peer, "get_cfg", uri));
}
json_t Master::getConfigurable(const ftl::UUID &peer, const string &uri) {
return json_t::parse(net_->call<string>(peer, "get_configurable", uri));
}
void Master::watch(const string &uri, function<void()> f) {
}
......@@ -133,4 +202,34 @@ void Master::setPose(const std::string &uri, const Eigen::Matrix4d &pose) {
//void onError();
void Master::onLog(function<void(const LogEvent &)> h) {
log_handlers_.push_back(h);
}
void Master::stop() {
if (!active_) return;
active_ = false;
loguru::remove_all_callbacks();
net_->unbind("restart");
net_->unbind("shutdown");
net_->unbind("update_cfg");
net_->unbind("get_cfg");
net_->unbind("slave_details"); // TODO: Remove
net_->unbind("log_subscribe");
}
void Master::sendLog(const loguru::Message& message) {
UNIQUE_LOCK(mutex_, lk);
if (in_log_) return;
in_log_ = true;
for (auto &p : log_peers_) {
auto peer = net_->getPeer(p);
if (!peer || !peer->isConnected()) continue;
std::cout << "sending log to master..." << std::endl;
if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) {
// TODO(Nick) Remove peer from loggers list...
}
}
in_log_ = false;
}
\ No newline at end of file
#include <ftl/slave.hpp>
#include <ftl/threads.hpp>
using ftl::Configurable;
using ftl::net::Universe;
using ftl::ctrl::Slave;
using std::string;
// static void netLog(void* user_data, const loguru::Message& message) {
// Slave *slave = static_cast<Slave*>(user_data);
// slave->sendLog(message);
// }
Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false), active_(true) {
// Init system state
state_.paused = false;
net->bind("restart", []() {
LOG(WARNING) << "Remote restart...";
//exit(1);
ftl::exit_code = 1;
ftl::running = false;
});
net->bind("shutdown", []() {
LOG(WARNING) << "Remote shutdown...";
//exit(0);
ftl::running = false;
});
net->bind("pause", [this]() {
state_.paused = !state_.paused;
});
net->bind("update_cfg", [](const std::string &uri, const std::string &value) {
ftl::config::update(uri, nlohmann::json::parse(value));
});
net->bind("get_cfg", [](const std::string &uri) -> std::string {
return ftl::config::resolve(uri, false).dump();
});
net->bind("list_configurables", []() {
return ftl::config::list();
});
net->bind("node_details", [net,root]() -> std::vector<std::string> {
ftl::config::json_t json {
{"id", net->id().to_string()},
{"title", root->value("title", *root->get<string>("$id"))},
{"kind", "slave"}
};
return {json.dump()};
});
net->bind("log_subscribe", [this](const ftl::UUID &peer) {
UNIQUE_LOCK(mutex_, lk);
log_peers_.push_back(peer);
});
net->bind("connect", [this](const std::string &url) {
net_->connect(url);
});
//net->onConnect([this](ftl::net::Peer *peer) {
// net_->broadcast("new_peer", peer->id());
//});
//loguru::add_callback("net_log", netLog, this, loguru::Verbosity_INFO);
}
Slave::~Slave() {
stop();
}
void Slave::stop() {
if (!active_) return;
active_ = false;
loguru::remove_all_callbacks();
net_->unbind("restart");
net_->unbind("shutdown");
net_->unbind("update_cfg");
net_->unbind("get_cfg");
net_->unbind("slave_details");
net_->unbind("log_subscribe");
}
void Slave::sendLog(const loguru::Message& message) {
UNIQUE_LOCK(mutex_, lk);
if (in_log_) return;
in_log_ = true;
for (auto &p : log_peers_) {
auto peer = net_->getPeer(p);
if (!peer || !peer->isConnected()) continue;
std::cout << "sending log to master..." << std::endl;
if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) {
// TODO(Nick) Remove peer from loggers list...
}
}
in_log_ = false;
}
......@@ -12,5 +12,5 @@ void ftl::NetConfigurable::inject(const std::string &name, nlohmann::json &value
}
void ftl::NetConfigurable::refresh() {
(*config_) = ctrl.get(peer, suri);
(*config_) = ctrl.getConfigurable(peer, suri);
}
#include "catch.hpp"
#include <ftl/net_configurable.hpp>
#include <ftl/slave.hpp>
#include <ftl/master.hpp>
using ftl::NetConfigurable;
......@@ -14,20 +14,20 @@ SCENARIO( "NetConfigurable::set()" ) {
net->start();
ftl::ctrl::Master *controller = new ftl::ctrl::Master(root, net);
// Set up a slave, then call getSlaves() to get the UUID string
// Set up a slave, then call getControllers() to get the UUID string
nlohmann::json jsonSlave = {{"$id", "slave"}, {"test", {{"peers", {"tcp://localhost:7077"}}}}};
ftl::Configurable *rootSlave;
rootSlave = new ftl::Configurable(jsonSlave);
ftl::net::Universe *netSlave = ftl::config::create<ftl::net::Universe>(rootSlave, std::string("test"));
ftl::ctrl::Slave slave(netSlave, rootSlave);
ftl::ctrl::Master ctrl(rootSlave, netSlave);
netSlave->start();
netSlave->waitConnections();
net->waitConnections();
auto slaves = controller->getSlaves();
REQUIRE( slaves.size() == 1 );
auto controllers = controller->getControllers();
REQUIRE( controllers.size() == 1 );
ftl::UUID peer = ftl::UUID(slaves[0]["id"].get<std::string>());
ftl::UUID peer = ftl::UUID(controllers[0]["id"].get<std::string>());
const std::string suri = "slave_test";
nlohmann::json jsonTest = {{"$id", "slave_test"}, {"test", {{"peers", {"tcp://localhost:7077"}}}}};
NetConfigurable nc(peer, suri, *controller, jsonTest);
......
......@@ -223,7 +223,7 @@ ftl::cuda::TextureObject<T> &Frame::createTexture(ftl::codecs::Channel c, const
//LOG(INFO) << "Creating texture object";
m.tex = ftl::cuda::TextureObject<T>(m.gpu, interpolated);
} else if (m.tex.cvType() != ftl::traits::OpenCVType<T>::value || m.tex.width() != m.gpu.cols || m.tex.height() != m.gpu.rows) {
LOG(INFO) << "Recreating texture object for '" << ftl::codecs::name(c) << "'";
//LOG(INFO) << "Recreating texture object for '" << ftl::codecs::name(c) << "'";
m.tex.free();
m.tex = ftl::cuda::TextureObject<T>(m.gpu, interpolated);
}
......@@ -256,7 +256,7 @@ ftl::cuda::TextureObject<T> &Frame::createTexture(ftl::codecs::Channel c, bool i
//LOG(INFO) << "Creating texture object";
m.tex = ftl::cuda::TextureObject<T>(m.gpu, interpolated);
} else if (m.tex.cvType() != ftl::traits::OpenCVType<T>::value || m.tex.width() != m.gpu.cols || m.tex.height() != m.gpu.rows || m.tex.devicePtr() != m.gpu.data) {
LOG(INFO) << "Recreating texture object for '" << ftl::codecs::name(c) << "'.";
//LOG(INFO) << "Recreating texture object for '" << ftl::codecs::name(c) << "'.";
m.tex.free();
m.tex = ftl::cuda::TextureObject<T>(m.gpu, interpolated);
}
......
......@@ -41,7 +41,7 @@ bitrate_t ABRController::selectBitrate(const NetFrame &frame) {
float actual_mbps = (float(frame.tx_size) * 8.0f * (1000.0f / float(frame.tx_latency))) / 1048576.0f;
float min_mbps = (float(frame.tx_size) * 8.0f * (1000.0f / float(ftl::timer::getInterval()))) / 1048576.0f;
//LOG(INFO) << "Bitrate = " << actual_mbps << "Mbps, min required = " << min_mbps << "Mbps";
//if (actual_mbps < min_mbps) LOG(WARNING) << "Bitrate = " << actual_mbps << "Mbps, min required = " << min_mbps << "Mbps";
float ratio = actual_mbps / min_mbps;
//LOG(INFO) << "Rate Ratio = " << frame.tx_latency;
......