/**
 * @file peer.cpp
 * @copyright Copyright (c) 2020 University of Turku, MIT License
 * @author Nicolas Pope
 */

#include <ftl/lib/loguru.hpp>
#include <ftl/lib/ctpl_stl.hpp>

#include "common.hpp"

#include <ftl/uri.hpp>
#include <ftl/time.hpp>
#include "peer.hpp"
//#include <ftl/config.h>

#include "protocol/connection.hpp"

using ftl::net::internal::SocketConnection;

#include "universe.hpp"

#include <iostream>
#include <memory>
#include <algorithm>
#include <tuple>
#include <chrono>
#include <vector>

using std::tuple;
using std::get;
using ftl::net::Peer;
using ftl::net::PeerPtr;
using ftl::URI;
using ftl::net::Dispatcher;
using std::chrono::seconds;
using ftl::net::Universe;
using ftl::net::Callback;
using std::vector;
using ftl::protocol::NodeStatus;
using ftl::protocol::NodeType;

std::atomic_int Peer::rpcid__ = 0;

int Peer::_socket() const {
	if (sock_->is_valid()) {
		return sock_->fd();
	} else {
		return INVALID_SOCKET;
	}
}

bool Peer::isConnected() const {
	return sock_->is_valid() && (status_ == NodeStatus::kConnected);
}

bool Peer::isValid() const {
	return sock_ && sock_->is_valid() && ((status_ == NodeStatus::kConnected) || (status_ == NodeStatus::kConnecting));
}

void Peer::_set_socket_options() {
	CHECK(net_);
	CHECK(sock_);
	
	// error printed by set methods (return value ignored)
	sock_->set_send_buffer_size(net_->getSendBufferSize(sock_->scheme()));
	sock_->set_recv_buffer_size(net_->getRecvBufferSize(sock_->scheme()));

	DLOG(1)	<< "send buffer size: " << (sock_->get_send_buffer_size() >> 10) << "KiB, "
			<< "recv buffer size: " << (sock_->get_recv_buffer_size() >> 10) << "KiB";
}

void Peer::_send_handshake() {
	DLOG(INFO)	<< "(" << (outgoing_ ? "connecting" : "listening")
			<< " peer) handshake sent, status: "
			<< (isConnected() ? "connected" : "connecting");
	
	send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, net_->id());
}

void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) {
	/** Handshake protocol:
	 * 	(1). Listening side accepts connection and sends handshake.
	 * 	(2). Connecting side acknowledges by replying with own handshake and
	 * 		 sets status to kConnected.
	 * 	(3). Listening side receives handshake and sets status to kConnected.
	 */
	if (magic != ftl::net::kMagic) {
		net_->_notifyError(this, ftl::protocol::Error::kBadHandshake, "invalid magic during handshake");
		_close(reconnect_on_protocol_error_);
	} else {
		if (version != ftl::net::kVersion) LOG(WARNING) << "net protocol using different versions!";

		DLOG(INFO) << "(" << (outgoing_ ? "connecting" : "listening")
				  << " peer) handshake received from remote for " << pid.to_string();

		status_ = NodeStatus::kConnected;
		version_ = version;
		peerid_ = pid;

		if (outgoing_) {
			// only outgoing connection replies with handshake, listening socket
			// sends initial handshake on connect
			_send_handshake();
		}

		++connection_count_;
		net_->_notifyConnect(this);
	}
}

void Peer::_bind_rpc() {
	// Install return handshake handler.
	bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) {
		_process_handshake(magic, version, pid);
	});

	bind("__disconnect__", [this]() {
		close(reconnect_on_remote_disconnect_);
		DLOG(1) << "peer elected to disconnect: " << id().to_string();
	});

	bind("__ping__", [this]() {
		return ftl::time::get_time();
	});

}

Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatcher* d) :
		outgoing_(false),
		local_id_(0),
		uri_("0"),
		status_(NodeStatus::kConnecting),
		can_reconnect_(false),
		net_(u),
		sock_(std::move(s)),
		disp_(std::make_unique<Dispatcher>(d)) {
	
	/* Incoming connection constructor */

	CHECK(sock_) << "incoming SocketConnection pointer null";
	_set_socket_options();
	_updateURI();
	_bind_rpc();
	++net_->peer_instances_;
}

Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) : 
		outgoing_(true),
		local_id_(0),
		uri_(uri),
		status_(NodeStatus::kInvalid),
		can_reconnect_(true),
		net_(u),
		disp_(std::make_unique<Dispatcher>(d)) {
	
	/* Outgoing connection constructor */

	_bind_rpc();
	_connect();
	++net_->peer_instances_;
}

void Peer::start() {
	if (outgoing_) {
		// Connect needs to be in constructor
	} else {
		_send_handshake();
	}
}

void Peer::_connect() {
	sock_ = ftl::net::internal::createConnection(uri_); // throws on bad uri
	_set_socket_options();
	sock_->connect(uri_); // throws on error
	status_ = NodeStatus::kConnecting;
}

/** Called from ftl::Universe::_periodic() */
bool Peer::reconnect() {

	if (status_ != NodeStatus::kConnecting || !can_reconnect_) return false;

	URI uri(uri_);

	DLOG(INFO) << "Reconnecting to " << uri_.to_string() << " ...";

	// First, ensure all stale jobs and buffer data are removed.
	while (job_count_ > 0 && ftl::pool.size() > 0) {
		DLOG(1) << "Waiting on peer jobs before reconnect " << job_count_;
		std::this_thread::sleep_for(std::chrono::milliseconds(2));
	}
	recv_buf_.remove_nonparsed_buffer();
	recv_buf_.reset();

	try {
		_connect();
		return true;
		
	} catch(const std::exception& ex) {
		net_->_notifyError(this, ftl::protocol::Error::kReconnectionFailed, ex.what());
	}

	close(true); 
	return false;
}

void Peer::_updateURI() {
	// should be same as provided uri for connecting sockets, for connections
	// created by listening socket should generate some meaningful value
	uri_ = sock_->uri();
}

void Peer::rawClose() {	
	UNIQUE_LOCK(send_mtx_, lk_send);
	//UNIQUE_LOCK(recv_mtx_, lk_recv);
	sock_->close();
	status_ = NodeStatus::kDisconnected;
}

void Peer::close(bool retry) {
	// Attempt to inform about disconnect
	if (sock_->is_valid() && status_ == NodeStatus::kConnected) {
		send("__disconnect__");
	}

	UNIQUE_LOCK(send_mtx_, lk_send);
	//UNIQUE_LOCK(recv_mtx_, lk_recv);

	_close(retry);
}

void Peer::_close(bool retry) {
	if (status_ != NodeStatus::kConnected && status_ != NodeStatus::kConnecting) return;
	
	// Attempt auto reconnect?
	if (retry && can_reconnect_) {
		status_ = NodeStatus::kReconnecting;
	} else {
		status_ = NodeStatus::kDisconnected;
	}

	if (sock_->is_valid()) {
		net_->_notifyDisconnect(this);
		sock_->close();
	}
}

bool Peer::socketError() {
	int errcode = sock_->getSocketError();

	if (!sock_->is_fatal(errcode)) return false;

	if (errcode == ECONNRESET) {
		_close(reconnect_on_socket_error_);
		return true;
	}

	net_->_notifyError(this, ftl::protocol::Error::kSocketError, std::string("Socket error: ") + std::to_string(errcode));
	_close(reconnect_on_socket_error_); 
	return true;
}

void Peer::error(int e) {
	
}

NodeType Peer::getType() const {
	if ((uri_.getScheme() == URI::SCHEME_WS)
		|| (uri_.getScheme() == URI::SCHEME_WSS)) {
		
		return NodeType::kWebService;
	}
	return NodeType::kNode;
}

void Peer::_createJob() {
	++job_count_;

	ftl::pool.push([this](int id) {
		try {
			_data();
		} catch (const std::exception &e) {
			net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what());	
		}
		--job_count_;
	});
}

void Peer::data() {
	if (!sock_->is_valid()) { return; }

	int rc = 0;

	// Only need to lock and reserve buffer if there isn't enough
	if (recv_buf_.buffer_capacity() < kMaxMessage) {
		UNIQUE_LOCK(recv_mtx_,lk);
		recv_buf_.reserve_buffer(kMaxMessage);
	}

	int cap = static_cast<int>(recv_buf_.buffer_capacity());

	try {
		rc = sock_->recv(recv_buf_.buffer(), recv_buf_.buffer_capacity());
		
		if (rc >= cap - 1) {
			net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Too much data received"); 
			// TODO: Increase the buffer size next time
		}
		if (cap < (kMaxMessage / 10)) {
			net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Buffer is at capacity"); 
		}

	} catch (std::exception& ex) {
		net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what());	
		close(reconnect_on_socket_error_);
		return;

	}

	if (rc == 0) { // retry later
		CHECK(sock_->is_valid() == false);
		//close(reconnect_on_socket_error_);
		return;
	}
	if (rc < 0) { // error so close peer
		sock_->close();
		close(reconnect_on_socket_error_);
		return;
	}

	// May possibly need locking
	recv_buf_.buffer_consumed(rc);

	recv_checked_.clear();
	if (!already_processing_.test_and_set()) {
		//lk.unlock();
		_createJob();
	}
}

bool Peer::_has_next() {

	if (!sock_->is_valid()) { return false; }

	bool has_next = true;
	// buffer might contain non-msgpack data (headers etc). check with
	// prepare_next() and skip if necessary
	size_t skip;
	auto buffer = recv_buf_.nonparsed_buffer();
	auto buffer_len = recv_buf_.nonparsed_size();
	has_next = sock_->prepare_next(buffer, buffer_len, skip);

	if (has_next) { recv_buf_.skip_nonparsed_buffer(skip); }

	return has_next;
}

bool Peer::_data() {
	// lock before trying to acquire handle to buffer
	//UNIQUE_LOCK(recv_mtx_, lk);

	// msgpack::object is valid as long as handle is
	msgpack::object_handle msg_handle;

	try {
		recv_checked_.test_and_set();

		UNIQUE_LOCK(recv_mtx_,lk);
		bool has_next = _has_next() && recv_buf_.next(msg_handle);
		lk.unlock();

		if (!has_next) {
			already_processing_.clear();
			if (!recv_checked_.test_and_set() && !already_processing_.test_and_set()) {
				return _data();
			}
			return false;
		}
	} catch (const std::exception& ex) {
		net_->_notifyError(this, ftl::protocol::Error::kPacketFailure, ex.what());
		_close(reconnect_on_protocol_error_);
		return false;
	}

	//lk.unlock();

	msgpack::object obj = msg_handle.get();

	if (status_ == NodeStatus::kConnecting) {
		// If not connected, must lock to make sure no other thread performs this step
		//lk.lock();

		// Verify still not connected after lock
		//if (status_ == NodeStatus::kConnecting) {
			// First message must be a handshake
			try {
				tuple<uint32_t, std::string, msgpack::object> hs;
				obj.convert(hs);
				
				if (get<1>(hs) != "__handshake__") {
					DLOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'";

					// Allow a small delay in case another thread is doing the handshake
					//lk.unlock();
					std::this_thread::sleep_for(std::chrono::milliseconds(10));
					if (status_ == NodeStatus::kConnecting) {
						net_->_notifyError(this, ftl::protocol::Error::kMissingHandshake, "failed to get handshake");
						close(reconnect_on_protocol_error_);
						//lk.lock();
						return false;
					}
				} else {
					// Must handle immediately with no other thread able
					// to read next message before completion.
					// The handshake handler must not block.

					try {
						disp_->dispatch(*this, obj);
					} catch (const std::exception &e) {
						net_->_notifyError(this, ftl::protocol::Error::kDispatchFailed, e.what());
					}

					_createJob();
					return true;
				}
			} catch(...) {
				DLOG(WARNING) << "Bad first message format... waiting";
				// Allow a small delay in case another thread is doing the handshake

				//lk.unlock();
				std::this_thread::sleep_for(std::chrono::milliseconds(10));
				if (status_ == NodeStatus::kConnecting) {
					net_->_notifyError(this, ftl::protocol::Error::kMissingHandshake, "failed to get handshake");
					close(reconnect_on_protocol_error_);
					return false;
				}
			}
		//} else {
			//lk.unlock();
		//}
	}
	
	// Process more data...
	_createJob();
	
	try {
		disp_->dispatch(*this, obj);
	} catch (const std::exception &e) {
		net_->_notifyError(this, ftl::protocol::Error::kDispatchFailed, e.what());
	}

	// Lock again before freeing msg_handle (destruction order).
	// msgpack::object_handle destructor modifies recv_buffer_
	//lk.lock();
	return true;
}

void Peer::_dispatchResponse(uint32_t id, const std::string &name, msgpack::object &res) {
	UNIQUE_LOCK(cb_mtx_,lk);
	if (callbacks_.count(id) > 0) {
		
		// Allow for unlock before callback
		auto cb = std::move(callbacks_[id]);
		callbacks_.erase(id);
		lk.unlock();

		// Call the callback with unpacked return value
		try {
			(*cb)(res);
		} catch(std::exception &e) {
			net_->_notifyError(this, ftl::protocol::Error::kRPCResponse, e.what());	
		}
	} else {
		net_->_notifyError(this, ftl::protocol::Error::kRPCResponse, "Missing RPC callback for result - discarding: " + name);	
	}
}

void Peer::cancelCall(int id) {
	UNIQUE_LOCK(cb_mtx_,lk);
	if (callbacks_.count(id) > 0) {
		callbacks_.erase(id);
	}
}

void Peer::_sendResponse(uint32_t id, const std::string &name, const msgpack::object &res) {
	Dispatcher::response_t res_obj = std::make_tuple(1,id,name,res);
	UNIQUE_LOCK(send_mtx_,lk);
	msgpack::pack(send_buf_, res_obj);
	_send();
}

void Peer::_waitCall(int id, std::condition_variable &cv, bool &hasreturned, const std::string &name) {
	std::mutex m;

	int64_t beginat = ftl::time::get_time();
	std::function<void(int)> j;
	while (!hasreturned) {
		// Attempt to do a thread pool job if available
		if ((bool)(j=ftl::pool.pop())) {
			j(-1);
		} else {
			// Block for a little otherwise
			std::unique_lock<std::mutex> lk(m);
			cv.wait_for(lk, std::chrono::milliseconds(2), [&hasreturned]{return hasreturned;});
		}

		if (ftl::time::get_time() - beginat > 1000) break;
	}
	
	if (!hasreturned) {
		cancelCall(id);
		throw FTL_Error("RPC failed with timeout: " << name);
	}
}

bool Peer::waitConnection(int s) {
	if (status_ == NodeStatus::kConnected) return true;
	else if (status_ == NodeStatus::kDisconnected) return false;
	
	std::mutex m;
	m.lock();
	std::condition_variable_any cv;

	auto h = net_->onConnect([this, &cv](const PeerPtr &p) {
		if (p.get() == this) {
			cv.notify_one();
		}
		return true;
	});

	cv.wait_for(m, seconds(s), [this]() { return status_ == NodeStatus::kConnected;});
	m.unlock();
	return status_ == NodeStatus::kConnected;
}

int Peer::_send() {
	if (!sock_->is_valid()) return -1;

	ssize_t c = 0;
	
	try {
		c = sock_->writev(send_buf_.vector(), send_buf_.vector_size());
		if (c <= 0) {
			// writev() should probably throw exception which is reported here
			// at the moment, error message is (should be) printed by writev()
			net_->_notifyError(this, ftl::protocol::Error::kSocketError, "writev() failed");
			return c;
		}
	
		ssize_t sz = 0; for (size_t i = 0; i < send_buf_.vector_size(); i++) {
			sz += send_buf_.vector()[i].iov_len;
		} 
		if (c != sz) {
			net_->_notifyError(this, ftl::protocol::Error::kSocketError, "writev(): incomplete send");
			_close(reconnect_on_socket_error_);
		}

		send_buf_.clear();

	} catch (std::exception& ex) {
		net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what());
		_close(reconnect_on_socket_error_);
	}
	
	return c;
}

Peer::~Peer() {
	--net_->peer_instances_;
	{
		UNIQUE_LOCK(send_mtx_,lk1);
		//UNIQUE_LOCK(recv_mtx_,lk2);
		_close(false);
	}

	// Prevent deletion if there are any jobs remaining
	if (job_count_ > 0 && ftl::pool.size() > 0) {
		DLOG(1) << "Waiting on peer jobs... " << job_count_;
		std::this_thread::sleep_for(std::chrono::milliseconds(2));
		if (job_count_ > 0) LOG(FATAL) << "Peer jobs not terminated";
	}
}