Skip to content
Snippets Groups Projects
Commit fbd12f4b authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Code tidy, remove old todos

parent 186fdcc9
No related branches found
No related tags found
No related merge requests found
Pipeline #11229 passed
#include <ftl/net.hpp>
#include <ftl/net/listener.hpp>
#include <ftl/net/peer.hpp>
#ifdef WIN32
#include <Ws2tcpip.h>
#endif
#include <vector>
#include <iostream>
#include <chrono>
using namespace std;
using namespace std::chrono;
using ftl::net::Listener;
using ftl::net::Peer;
std::vector<shared_ptr<ftl::net::Peer>> peers;
static std::vector<shared_ptr<ftl::net::Listener>> listeners;
static fd_set sfdread;
static fd_set sfderror;
/*static int freeSocket() {
int freeclient = -1;
//Find a free client slot and allocated it
for (unsigned int i=0; i<sockets.size(); i++) {
if (sockets[i] == nullptr) { // CHECK, was 0 which seems wrong
freeclient = i;
break;
}
}
//Max clients reached, so send error
if (freeclient == -1) {
if (sockets.size() < ftl::net::MAX_CONNECTIONS) {
sockets.push_back(shared_ptr<Socket>(nullptr));
freeclient = sockets.size()-1;
} else {
// exceeded max connections
return -1;
}
}
return freeclient;
}*/
int setDescriptors() {
//Reset all file descriptors
FD_ZERO(&sfdread);
FD_ZERO(&sfderror);
int n = 0;
//Set file descriptor for the listening sockets.
for (auto l : listeners) {
if (l != nullptr && l->isListening()) {
FD_SET(l->_socket(), &sfdread);
FD_SET(l->_socket(), &sfderror);
if (l->_socket() > n) n = l->_socket();
}
}
//Set the file descriptors for each client
for (auto s : peers) {
if (s != nullptr && s->isValid()) {
if (s->_socket() > n) {
n = s->_socket();
}
FD_SET(s->_socket(), &sfdread);
FD_SET(s->_socket(), &sfderror);
}
}
return n;
}
shared_ptr<Listener> ftl::net::listen(const char *uri) {
shared_ptr<Listener> l(new Listener(uri));
listeners.push_back(l);
return l;
}
shared_ptr<Peer> ftl::net::connect(const char *uri) {
shared_ptr<Peer> s(new Peer((uri == NULL) ? "" : uri));
peers.push_back(s);
return s;
}
void ftl::net::stop() {
for (auto s : peers) {
s->close();
}
peers.clear();
for (auto l : listeners) {
l->close();
}
listeners.clear();
}
bool _run(bool blocking, bool nodelay) {
timeval block;
//if (ssock == INVALID_SOCKET) return 1;
bool active = true;
bool repeat = nodelay;
while (active || repeat) {
int n = setDescriptors();
int selres = 1;
//Wait for a network event or timeout in 3 seconds
block.tv_sec = (repeat) ? 0 : 3;
block.tv_usec = 0;
selres = select(n+1, &sfdread, 0, &sfderror, &block);
repeat = false;
active = blocking;
//Some kind of error occured, it is usually possible to recover from this.
if (selres < 0) {
std::cout << "SELECT ERROR " << selres << std::endl;
//return false;
continue;
} else if (selres == 0) {
// Timeout, nothing to do...
continue;
}
//If connection request is waiting
for (auto l : listeners) {
if (l && l->isListening()) {
if (FD_ISSET(l->_socket(), &sfdread)) {
int rsize = sizeof(sockaddr_storage);
sockaddr_storage addr;
//int freeclient = freeSocket();
//if (freeclient >= 0) {
// TODO Limit connection rate or allow a pause in accepting
// TODO Send auto reject message under heavy load
//Finally accept this client connection.
int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) {
auto sock = make_shared<Peer>(csock);
peers.push_back(sock);
// Call connection handlers
l->connection(sock);
}
//}
}
}
}
//Also check each clients socket to see if any messages or errors are waiting
for (auto s : peers) {
if (s != NULL && s->isValid()) {
//If message received from this client then deal with it
if (FD_ISSET(s->_socket(), &sfdread)) {
repeat |= s->data();
}
if (FD_ISSET(s->_socket(), &sfderror)) {
s->socketError();
}
} else if (s != NULL) {
// Erase it
for (auto i=peers.begin(); i!=peers.end(); i++) {
if ((*i) == s) {
std::cout << "REMOVING SOCKET" << std::endl;
peers.erase(i); break;
}
}
}
}
}
return true;
}
bool ftl::net::check() {
return _run(false,true);
}
bool ftl::net::wait() {
return _run(false,false);
}
void ftl::net::wait(std::function<bool(void)> f, float to) {
auto start = steady_clock::now();
while (!f() && duration<float>(steady_clock::now() - start).count() < to)
_run(false,false);
}
bool ftl::net::run(bool async) {
if (async) {
// TODO Start thread
} else {
return _run(true,false);
}
return false;
}
#include <ftl/net/p2p.hpp>
using ftl::net::P2P;
using std::optional;
using std::tuple;
using ftl::UUID;
using std::get;
using namespace std::chrono;
using std::vector;
using std::string;
P2P::P2P(const char *uri) : Protocol(uri) {
_registerRPC();
}
P2P::P2P(const string &uri) : Protocol(uri) {
_registerRPC();
}
void P2P::_registerRPC() {
bind_find_one("ping", &P2P::_ping);
}
vector<string> P2P::getAddresses(const UUID &peer) {
vector<string> results;
return results;
}
optional<long int> P2P::ping(const UUID &peer) {
long int time = duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
auto p = find_one<long int>("ping", peer, time);
if (!p) return {};
return *p - time;
}
optional<long int> P2P::_ping(const UUID &peer, long int time) {
if (id() == peer) {
return time;
} else {
return {};
}
}
......@@ -77,12 +77,6 @@ static SOCKET tcpConnect(URI &uri) {
return INVALID_SOCKET;
}
/*#ifdef WIN32
HOSTENT *host = gethostbyname(uri.getHost().c_str());
#else
hostent *host = gethostbyname(uri.getHost().c_str());
#endif*/
addrinfo hints = {}, *addrs;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
......@@ -100,10 +94,6 @@ static SOCKET tcpConnect(URI &uri) {
return INVALID_SOCKET;
}
//destAddr.sin_family = AF_INET;
//destAddr.sin_addr.s_addr = ((in_addr *)(host->h_addr))->s_addr;
//destAddr.sin_port = htons(uri.getPort());
// Make nonblocking
/*long arg = fcntl(csocket, F_GETFL, NULL));
arg |= O_NONBLOCK;
......
......@@ -13,19 +13,6 @@ target_link_libraries(peer_unit
Threads::Threads
${UUID_LIBRARIES})
### P2P Base Unit ##############################################################
# TODO(nick) Actually make this a unit test
#add_executable(p2p_base_unit
# ./tests.cpp
# ./p2p_base_unit.cpp)
#add_dependencies(p2p_base_unit ftlnet)
#target_link_libraries(p2p_base_unit
# ftlnet
# ${URIPARSER_LIBRARIES}
# glog::glog
# ${UUID_LIBRARIES})
### Net Integration ############################################################
add_executable(net_integration
./tests.cpp
......
......@@ -352,79 +352,3 @@ TEST_CASE("Universe::publish()", "") {
}
}
/*TEST_CASE("net::listen()", "[net]") {
SECTION("tcp any interface") {
REQUIRE( ftl::net::listen("tcp://localhost:9001")->isListening() );
SECTION("can connect to listening socket") {
auto sock = ftl::net::connect("tcp://127.0.0.1:9001");
REQUIRE(sock->isValid());
ftl::net::wait([&sock]() { return sock->isConnected(); });
REQUIRE(sock->isConnected());
// TODO Need way of knowing about connection
}
ftl::net::stop();
}
SECTION("on connection event") {
auto l = ftl::net::listen("tcp://localhost:9002");
REQUIRE( l->isListening() );
bool connected = false;
l->onConnection([&](shared_ptr<Socket> s) {
ftl::net::wait([&s]() { return s->isConnected(); });
REQUIRE( s->isConnected() );
connected = true;
});
auto sock = ftl::net::connect("tcp://127.0.0.1:9002");
ftl::net::wait();
REQUIRE( connected );
ftl::net::stop();
}
}
TEST_CASE("Net Integration", "[integrate]") {
std::string data;
Protocol p("ftl://utu.fi");
p.bind("add", [](int a, int b) {
return a + b;
});
p.bind(100, [&data](uint32_t m, Socket &s) {
s.read(data);
});
auto l = ftl::net::listen("tcp://localhost:9000");
REQUIRE( l->isListening() );
l->setProtocol(&p);
shared_ptr<Socket> s1;
l->onConnection([&s1](auto &s) { s1 = s; });
shared_ptr<Socket> s2 = ftl::net::connect("tcp://localhost:9000");
s2->setProtocol(&p);
REQUIRE( s2 != nullptr );
ftl::net::wait([&s2]() { return s2->isConnected(); });
REQUIRE( s1 != nullptr );
REQUIRE( s1->isConnected() );
REQUIRE( s2->isConnected() );
REQUIRE( s1->call<int>("add", 5, 6) == 11 );
REQUIRE( s2->call<int>("add", 10, 5) == 15);
s1->send(100, "hello world");
ftl::net::wait();
// TODO s2->wait(100);
REQUIRE( data == "hello world" );
}*/
......@@ -74,7 +74,6 @@ void Display::init() {
onKey([this](int key) {
//LOG(INFO) << "Key = " << key;
if (key == 81 || key == 83) {
// TODO Should rotate around lookAt object, but requires correct depth
Eigen::Quaternion<float> q; q = Eigen::AngleAxis<float>((key == 81) ? 0.01f : -0.01f, up_);
eye_ = (q * (eye_ - centre_)) + centre_;
} else if (key == 84 || key == 82) {
......
......@@ -260,7 +260,6 @@ void Streamer::_schedule() {
});
}
// TODO Wait until all jobs completed...
unique_lock<mutex> lk(job_mtx);
job_cv.wait(lk, [&jobs]{ return jobs == 0; });
}
......
......@@ -56,12 +56,11 @@ StereoVideoSource::StereoVideoSource(nlohmann::json &config, const string &file)
// Generate camera parameters from Q matrix
cv::Mat q = calib_->getCameraMatrix();
params_ = {
// TODO(Nick) Add fx and fy
q.at<double>(0,0), // Fx
q.at<double>(1,1), // Fy
-q.at<double>(0,2), // Cx
-q.at<double>(1,2), // Cy
(unsigned int)lsrc_->width(), // TODO (Nick)
(unsigned int)lsrc_->width(),
(unsigned int)lsrc_->height(),
0.0f, // 0m min
15.0f // 15m max
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment