Skip to content
Snippets Groups Projects
universe.cpp 6.6 KiB
Newer Older
#include <ftl/net/universe.hpp>
#include <chrono>
#ifdef WIN32
#include <Ws2tcpip.h>
#pragma comment(lib, "Rpcrt4.lib")
#endif

Nicolas Pope's avatar
Nicolas Pope committed
#ifndef WIN32
#include <signal.h>
#endif

using std::string;
using std::vector;
using std::thread;
using ftl::net::Peer;
using ftl::net::Listener;
using ftl::net::Universe;
Nicolas Pope's avatar
Nicolas Pope committed
using nlohmann::json;
using ftl::UUID;
using std::optional;
using std::unique_lock;
using std::mutex;
Nicolas Pope's avatar
Nicolas Pope committed
Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this) {
	_installBindings();
}
Nicolas Pope's avatar
Nicolas Pope committed

Universe::Universe(nlohmann::json &config) :
Nicolas Pope's avatar
Nicolas Pope committed
		Configurable(config), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this) {
Nicolas Pope's avatar
Nicolas Pope committed
	if (config["listen"].is_array()) {
		for (auto &l : config["listen"]) {
			listen(l);
		}
	} else if (config["listen"].is_string()) {
		listen(config["listen"]);
	}
	
	if (config["peers"].is_array()) {
		for (auto &p : config["peers"]) {
			connect(p);
		}
	}
	
	_installBindings();
}

Universe::~Universe() {
	active_ = false;
	thread_.join();
	
	for (auto s : peers_) {
		s->close();
	}
	
	peers_.clear();
	
	for (auto l : listeners_) {
		l->close();
	}
	
	listeners_.clear();
}

bool Universe::listen(const string &addr) {
	auto l = new Listener(addr.c_str());
	if (!l) return false;
	unique_lock<mutex> lk(net_mutex_);
	listeners_.push_back(l);
	return l->isListening();
}

Peer *Universe::connect(const string &addr) {
	auto p = new Peer(addr.c_str(), &disp_);
	if (!p) return nullptr;
	
	if (p->status() != Peer::kInvalid) {
		unique_lock<mutex> lk(net_mutex_);
		peers_.push_back(p);
	}
	
	_installBindings(p);
	
	p->onConnect([this](Peer &p) {
		peer_ids_[p.id()] = &p;
	});
	
	return p;
int Universe::waitConnections() {
	int count = 0;
	for (auto p : peers_) {
		if (p->waitConnection()) count++;
	}
	return count;
}

int Universe::_setDescriptors() {
	//Reset all file descriptors
	FD_ZERO(&sfdread_);
	FD_ZERO(&sfderror_);

	int n = 0;

	unique_lock<mutex> lk(net_mutex_);

	//Set file descriptor for the listening sockets.
	for (auto l : listeners_) {
		if (l != nullptr && l->isListening()) {
			FD_SET(l->_socket(), &sfdread_);
			FD_SET(l->_socket(), &sfderror_);
			if (l->_socket() > n) n = l->_socket();
		}
	}

	//Set the file descriptors for each client
	for (auto s : peers_) {
		if (s != nullptr && s->isValid()) {
			
			if (s->_socket() > n) {
				n = s->_socket();
			}

			if (s->isWaiting()) {
				FD_SET(s->_socket(), &sfdread_);
			}
			FD_SET(s->_socket(), &sfderror_);
Nicolas Pope's avatar
Nicolas Pope committed
		} else if (s) {
			_remove(s);
		}
	}

	return n;
}

void Universe::_installBindings(Peer *p) {

}

void Universe::_installBindings() {
	bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
		LOG(INFO) << "Subscription to " << uri << " by " << id.to_string();
Nicolas Pope's avatar
Nicolas Pope committed
		unique_lock<mutex> lk(net_mutex_);
		subscribers_[ftl::URI(uri).to_string()].push_back(id);
	bind("__owner__", [this](const std::string &res) -> optional<UUID> {
		if (owned_.count(res) > 0) return this_peer;
Nicolas Pope's avatar
Nicolas Pope committed
// Note: should be called inside a net lock
void Universe::_remove(Peer *p) {
	LOG(INFO) << "Removing disconnected peer: " << p->id().to_string();
	for (auto i=peers_.begin(); i!=peers_.end(); i++) {
		if ((*i) == p) {
			peers_.erase(i); break;
		}
	}

	auto ix = peer_ids_.find(p->id());
	if (ix != peer_ids_.end()) peer_ids_.erase(ix);
	delete p;
}

Peer *Universe::getPeer(const UUID &id) const {
	auto ix = peer_ids_.find(id);
	if (ix == peer_ids_.end()) return nullptr;
	else return ix->second;
}

optional<UUID> Universe::findOwner(const string &res) {
	// TODO(nick) cache this information
	return findOne<UUID>("__owner__", res);
}

bool Universe::createResource(const std::string &uri) {
	owned_.insert(uri);
// TODO (nick) Add URI version and correctly parse URI query parameters
int Universe::numberOfSubscribers(const std::string &res) const {
	auto s = subscribers_.find(res);
	if (s != subscribers_.end()) {
		return s->second.size();
	} else {
		return -1;
	}
}

bool Universe::hasSubscribers(const std::string &res) const {
	// FIXME (nick) Need to parse URI and correct query order
	return numberOfSubscribers(res) > 0;
}

bool Universe::hasSubscribers(const ftl::URI &res) const {
	return numberOfSubscribers(res.to_string()) > 0;
}

bool Universe::_subscribe(const std::string &res) {
	// Need to find who owns the resource
		return call<bool>(*pid, "__subscribe__", this_peer, res);
	} else {
		// No resource found
		LOG(WARNING) << "Subscribe to unknown resource: " << res;
		return false;
	}
}

void Universe::__start(Universe * u) {
Nicolas Pope's avatar
Nicolas Pope committed
#ifndef WIN32
	signal(SIGPIPE,SIG_IGN);
#endif  // WIN32
	u->_run();
}

void Universe::_run() {
	timeval block;

#ifdef WIN32
	WSAData wsaData;
	//If Win32 then load winsock
	if (WSAStartup(MAKEWORD(1, 1), &wsaData) != 0) {
		LOG(ERROR) << "Could not initiate sockets";
		return;
	}
#endif

	while (active_) {
		int n = _setDescriptors();
		int selres = 1;

		if (n == 0) {
			std::this_thread::sleep_for(std::chrono::milliseconds(300));
			continue;
		}

		//Wait for a network event or timeout in 3 seconds
		block.tv_sec = 0;
		block.tv_usec = 10000;
		selres = select(n+1, &sfdread_, 0, &sfderror_, &block);

		//Some kind of error occured, it is usually possible to recover from this.
		if (selres < 0) {
Nicolas Pope's avatar
Nicolas Pope committed
			switch (errno) {
			case 9	: continue;  // Bad file descriptor = socket closed
			default	: std::cout << "Unknown select error: " << strerror(errno) << std::endl;
			}
			continue;
		} else if (selres == 0) {
			// Timeout, nothing to do...
			continue;
		}

		unique_lock<mutex> lk(net_mutex_);

		//If connection request is waiting
		for (auto l : listeners_) {
			if (l && l->isListening()) {
				if (FD_ISSET(l->_socket(), &sfdread_)) {
					int rsize = sizeof(sockaddr_storage);
					sockaddr_storage addr;

Nicolas Pope's avatar
Nicolas Pope committed
					//Finally accept this client connection.
					int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
Nicolas Pope's avatar
Nicolas Pope committed
					if (csock != INVALID_SOCKET) {
						auto p = new Peer(csock, &disp_);
						peers_.push_back(p);
						_installBindings(p);
						p->onConnect([this](Peer &p) {
							peer_ids_[p.id()] = &p;
						});
					}
				}
			}
		}

		//Also check each clients socket to see if any messages or errors are waiting
		for (auto s : peers_) {
			if (s != NULL && s->isValid()) {
				//If message received from this client then deal with it
				if (FD_ISSET(s->_socket(), &sfdread_)) {
Nicolas Pope's avatar
Nicolas Pope committed
					s->data();
				}
				if (FD_ISSET(s->_socket(), &sfderror_)) {
					s->socketError();
Nicolas Pope's avatar
Nicolas Pope committed
					s->close();
				}
			} else if (s != NULL) {
				// Erase it
Nicolas Pope's avatar
Nicolas Pope committed
				_remove(s);