//#define GLOG_NO_ABBREVIATED_SEVERITIES
#include <loguru.hpp>
#include <ctpl_stl.h>

#ifndef NOMINMAX
#define NOMINMAX
#endif

#include <ftl/net/common.hpp>

#include <fcntl.h>
#ifdef WIN32
#include <Ws2tcpip.h>
#endif

#ifdef WIN32
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Rpcrt4.lib")
#endif

#include <ftl/uri.hpp>
#include <ftl/net/peer.hpp>
#include <ftl/net/ws_internal.hpp>
#include <ftl/config.h>
#include "net_internal.hpp"
#include <ftl/net/universe.hpp>

#include <iostream>
#include <memory>
#include <algorithm>
#include <tuple>
#include <chrono>
#include <vector>

using std::tuple;
using std::get;
using ftl::net::Peer;
using ftl::URI;
using ftl::net::ws_connect;
using ftl::net::Dispatcher;
using std::chrono::seconds;
using ftl::net::Universe;
using ftl::net::callback_t;
using std::vector;

#define TCP_BUFFER_SIZE	(1024*1024*10)

/*static std::string hexStr(const std::string &s)
{
	const char *data = s.data();
	int len = s.size();
    std::stringstream ss;
    ss << std::hex;
    for(int i=0;i<len;++i)
        ss << std::setw(2) << std::setfill('0') << (int)data[i];
    return ss.str();
}*/

int Peer::rpcid__ = 0;

// Global peer UUID
ftl::UUID ftl::net::this_peer;

//static ctpl::thread_pool pool(5);

// TODO:(nick) Move to tcp_internal.cpp
static SOCKET tcpConnect(URI &uri) {
	int rc;
	//sockaddr_in destAddr;

	#ifdef WIN32
	WSAData wsaData;
	if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) {
		LOG(ERROR) << "Could not initiate sockets";
		return INVALID_SOCKET;
	}
	#endif
	
	//We want a TCP socket
	SOCKET csocket = socket(AF_INET, SOCK_STREAM, 0);

	if (csocket == INVALID_SOCKET) {
		LOG(ERROR) << "Unable to create TCP socket";
		return INVALID_SOCKET;
	}

	int flags =1; 
    if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };

	int a = TCP_BUFFER_SIZE;
	if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
		fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
	}
	if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
		fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
	}

	addrinfo hints = {}, *addrs;
	hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
	rc = getaddrinfo(uri.getHost().c_str(), std::to_string(uri.getPort()).c_str(), &hints, &addrs);

	if (rc != 0 || addrs == nullptr) {
		#ifndef WIN32
		close(csocket);
		#else
		closesocket(csocket);
		#endif

		LOG(ERROR) << "Address not found : " << uri.getHost() << std::endl;
		return INVALID_SOCKET;
	}

	// Make nonblocking
#ifndef WIN32
	long arg = fcntl(csocket, F_GETFL, NULL);
	arg |= O_NONBLOCK;
	fcntl(csocket, F_SETFL, arg);
#endif

	// TODO:(Nick) - Check all returned addresses.
	auto addr = addrs;
	rc = ::connect(csocket, addr->ai_addr, (socklen_t)addr->ai_addrlen);

	if (rc < 0) {
		if (errno == EINPROGRESS) {
			// FIXME:(Nick) Move to main select thread to prevent blocking
			fd_set myset; 
			struct timeval tv;
			tv.tv_sec = 1; 
			tv.tv_usec = 0; 
			FD_ZERO(&myset); 
			FD_SET(csocket, &myset); 
			rc = select(csocket+1, NULL, &myset, NULL, &tv); 
			if (rc <= 0) { //} && errno != EINTR) { 
				#ifndef WIN32
				close(csocket);
				#else
				closesocket(csocket);
				#endif

				LOG(ERROR) << "Could not connect to " << uri.getBaseURI();

				return INVALID_SOCKET;
			}
		} else {
			#ifndef WIN32
			close(csocket);
			#else
			closesocket(csocket);
			#endif

			LOG(ERROR) << "Could not connect to " << uri.getBaseURI();

			return INVALID_SOCKET;
		}
	}

	// Make blocking again
#ifndef WIN32
	arg = fcntl(csocket, F_GETFL, NULL);
	arg &= (~O_NONBLOCK);
	fcntl(csocket, F_SETFL, arg);
#endif

	return csocket;
}

Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(false), universe_(u), ws_read_header_(false) {
	status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting;
	_updateURI();
	
	disp_ = new Dispatcher(d);
	
	is_waiting_ = true;
	scheme_ = ftl::URI::SCHEME_TCP;

	int flags =1; 
    if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
	int a = TCP_BUFFER_SIZE;
	if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
		fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
	}
	if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
		fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
	}
	
	// Send the initiating handshake if valid
	if (status_ == kConnecting) {
		// Install return handshake handler.
		bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) {
			LOG(INFO) << "Handshake 2 received";
			if (magic != ftl::net::kMagic) {
				_badClose(false);
				LOG(ERROR) << "Invalid magic during handshake";
			} else {
				status_ = kConnected;
				version_ = version;
				peerid_ = pid;
				if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!";
				
				// Ensure handlers called later or in new thread
				ftl::pool.push([this](int id) {
					universe_->_notifyConnect(this);
				});
			}
		});

		bind("__disconnect__", [this]() {
			_badClose(false);
			LOG(INFO) << "Peer elected to disconnect: " << id().to_string();
		});

		bind("__ping__", [this]() {
			auto now = std::chrono::high_resolution_clock::now();
			return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
		});

		send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); 
	}
}

Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), universe_(u), ws_read_header_(false), uri_(pUri) {	
	URI uri(pUri);
	
	status_ = kInvalid;
	sock_ = INVALID_SOCKET;
	
	disp_ = new Dispatcher(d);

	// Must do to prevent receiving message before handlers are installed
	UNIQUE_LOCK(recv_mtx_,lk);

	scheme_ = uri.getProtocol();
	if (uri.getProtocol() == URI::SCHEME_TCP) {
		sock_ = tcpConnect(uri);
		if (sock_ != INVALID_SOCKET) status_ = kConnecting;
		else status_ = kReconnecting;
	} else if (uri.getProtocol() == URI::SCHEME_WS) {
		LOG(INFO) << "Websocket connect " << uri.getPath();
		sock_ = tcpConnect(uri);
		if (sock_ != INVALID_SOCKET) {
			if (!ws_connect(sock_, uri)) {
				LOG(ERROR) << "Websocket connection failed";
				_badClose(false);
			} else {
				status_ = kConnecting;
				LOG(INFO) << "WEB SOCK CONNECTED";
			}
		} else {
			LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort();
			status_ = kReconnecting;
		}

		//status_ = kConnecting;
	} else {
		LOG(ERROR) << "Unrecognised connection protocol: " << pUri;
		return;
	}
	
	is_waiting_ = true;
	
	if (status_ == kConnecting || status_ == kReconnecting) {
		// Install return handshake handler.
		bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) {
			LOG(INFO) << "Handshake 1 received";
			if (magic != ftl::net::kMagic) {
				_badClose(false);
				LOG(ERROR) << "Invalid magic during handshake";
			} else {
				status_ = kConnected;
				version_ = version;
				peerid_ = pid;
				if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!";
				send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
				
				// Ensure handlers called later or in new thread
				ftl::pool.push([this](int id) {
					universe_->_notifyConnect(this);
				});
			}
		}); 

		bind("__disconnect__", [this]() {
			_badClose(false);
			LOG(INFO) << "Peer elected to disconnect: " << id().to_string();
		});

		bind("__ping__", [this]() {
			auto now = std::chrono::high_resolution_clock::now();
			return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
		});
	}
}

bool Peer::reconnect() {
	if (status_ != kReconnecting || !can_reconnect_) return false;

	URI uri(uri_);

	LOG(INFO) << "Reconnecting to " << uri_ << " ...";

	if (scheme_ == URI::SCHEME_TCP) {
		sock_ = tcpConnect(uri);
		if (sock_ != INVALID_SOCKET) {
			status_ = kConnecting;
			is_waiting_ = true;
			return true;
		} else {
			return false;
		}
	} else if (scheme_ == URI::SCHEME_WS) {
		sock_ = tcpConnect(uri);
		if (sock_ != INVALID_SOCKET) {
			if (!ws_connect(sock_, uri)) {
				return false;
			} else {
				status_ = kConnecting;
				LOG(INFO) << "WEB SOCK CONNECTED";
				return true;
			}
		} else {
			return false;
		}
	}

	// TODO:(Nick) allow for other protocols in reconnect
	return false;
}

void Peer::_updateURI() {
	sockaddr_storage addr;

	// FIXME:(Nick) Get actual protocol...
	scheme_ = ftl::URI::SCHEME_TCP;

	int rsize = sizeof(sockaddr_storage);
	if (getpeername(sock_, (sockaddr*)&addr, (socklen_t*)&rsize) == 0) {
		char addrbuf[INET6_ADDRSTRLEN];
		int port;
		
		if (addr.ss_family == AF_INET) {
			struct sockaddr_in *s = (struct sockaddr_in *)&addr;
			//port = ntohs(s->sin_port);
			inet_ntop(AF_INET, &s->sin_addr, addrbuf, INET6_ADDRSTRLEN);
			port = s->sin_port;
		} else { // AF_INET6
			struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
			//port = ntohs(s->sin6_port);
			inet_ntop(AF_INET6, &s->sin6_addr, addrbuf, INET6_ADDRSTRLEN);
			port = s->sin6_port;
		}
		
		uri_ = std::string("tcp://")+addrbuf;
		uri_ += ":";
		uri_ += std::to_string(port);
	}
}

void Peer::close(bool retry) {
	if (sock_ != INVALID_SOCKET) {
		// Attempt to inform about disconnect
		send("__disconnect__");

		_badClose(retry);
		LOG(INFO) << "Deliberate disconnect of peer.";
	}
}

void Peer::_badClose(bool retry) {
	if (sock_ != INVALID_SOCKET) {
		#ifndef WIN32
		::close(sock_);
		#else
		closesocket(sock_);
		#endif
		sock_ = INVALID_SOCKET;
		status_ = kDisconnected;
		
		//auto i = find(sockets.begin(),sockets.end(),this);
		//sockets.erase(i);

		universe_->_notifyDisconnect(this);

		// Attempt auto reconnect?
		if (retry && can_reconnect_) {
			status_ = kReconnecting;
		}
	}
}

void Peer::socketError() {
	int err;
#ifdef WIN32
	int optlen = sizeof(err);
#else
	uint32_t optlen = sizeof(err);
#endif
	getsockopt(sock_, SOL_SOCKET, SO_ERROR, (char*)&err, &optlen);

	// Must close before log since log may try to send over net causing
	// more socket errors...
	_badClose();

	LOG(ERROR) << "Socket: " << uri_ << " - error " << err;
}

void Peer::error(int e) {
	
}

void Peer::data() {
	{
		UNIQUE_LOCK(recv_mtx_,lk);

		int rc=0;
		int c=0;

		//do {
			recv_buf_.reserve_buffer(kMaxMessage);

			if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) {
				LOG(WARNING) << "Net buffer at capacity";
				return;
			}

			int cap = recv_buf_.buffer_capacity();
			rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), cap, 0);
			//if (c > 0 && rc > 0) LOG(INFO) << "RECV: " << rc;

			if (rc >= cap-1) {
				LOG(WARNING) << "More than buffers worth of data received"; 
			}
			if (cap < (kMaxMessage / 10)) LOG(WARNING) << "NO BUFFER";

			if (rc == 0) {
				close();
				return;
			} else if (rc < 0 && c == 0) {
				socketError();
				return;
			}

			//if (rc == -1) break;
			++c;
			
			recv_buf_.buffer_consumed(rc);
		//} while (rc > 0);
	}

	ftl::pool.push([this](int id) {
		_data();
	});
}

bool Peer::_data() {
	msgpack::object_handle msg;

	UNIQUE_LOCK(recv_mtx_,lk);

	if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) {
		wsheader_type ws;
		if (ws_parse(recv_buf_, ws) < 0) {
			return false;
		}
		ws_read_header_ = true;
	}

	if (!recv_buf_.next(msg)) return false;
	msgpack::object obj = msg.get();
	lk.unlock();

	ftl::pool.push([this](int id) {
		_data();
	});

	if (status_ != kConnected) {
		// If not connected, must lock to make sure no other thread performs this step
		UNIQUE_LOCK(recv_mtx_,lk);
		// Verify still not connected after lock
		if (status_ != kConnected) {
			// First message must be a handshake
			try {
				tuple<uint32_t, std::string, msgpack::object> hs;
				obj.convert(hs);
				
				if (get<1>(hs) != "__handshake__") {
					_badClose(false);
					LOG(ERROR) << "Missing handshake - got '" << get<1>(hs) << "'";
					return false;
				} else {
					// Must handle immediately with no other thread able
					// to read next message before completion.
					// The handshake handler must not block.
					disp_->dispatch(*this, obj);
					return true;
				}
			} catch(...) {
				_badClose(false);
				LOG(ERROR) << "Bad first message format";
				return false;
			}
		}
	}
	
	disp_->dispatch(*this, obj);

	// Lock again before freeing msg handle
	UNIQUE_LOCK(recv_mtx_,lk2);
	return true;
}

void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {	
	// TODO: Handle error reporting...
	UNIQUE_LOCK(cb_mtx_,lk);
	if (callbacks_.count(id) > 0) {
		DLOG(1) << "Received return RPC value";
		
		// Allow for unlock before callback
		auto cb = std::move(callbacks_[id]);
		callbacks_.erase(id);
		lk.unlock();

		// Call the callback with unpacked return value
		(*cb)(res);
	} else {
		LOG(WARNING) << "Missing RPC callback for result - discarding";
	}
}

void Peer::cancelCall(int id) {
	UNIQUE_LOCK(cb_mtx_,lk);
	if (callbacks_.count(id) > 0) {
		callbacks_.erase(id);
	}
}

void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
	Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res);
	UNIQUE_LOCK(send_mtx_,lk);
	if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
	msgpack::pack(send_buf_, res_obj);
	_send();
}

bool Peer::waitConnection() {
	if (status_ == kConnected) return true;
	
	std::mutex m;
	//UNIQUE_LOCK(m,lk);
	std::unique_lock<std::mutex> lk(m);
	std::condition_variable cv;

	callback_t h = universe_->onConnect([this,&cv](Peer *p) {
		if (p == this) {
			cv.notify_one();
		}
	});

	cv.wait_for(lk, seconds(5));
	universe_->removeCallback(h);
	return status_ == kConnected;
}

/*void Peer::onConnect(const std::function<void(Peer&)> &f) {
	if (status_ == kConnected) {
		f(*this);
	} else {
		open_handlers_.push_back(f);
	}
}*/

void Peer::_connected() {
	status_ = kConnected;

}

int Peer::_send() {
	if (sock_ == INVALID_SOCKET) return -1;

	// Are we using a websocket?
	if (scheme_ == ftl::URI::SCHEME_WS) {
		// Create a websocket header as well.
		size_t len = 0;
		const iovec *sendvec = send_buf_.vector();
		size_t size = send_buf_.vector_size();
		char buf[20];
		
		// Calculate total size of message
		for (size_t i=1; i < size; i++) {
			len += sendvec[i].iov_len;
		}

		if (sendvec[0].iov_len != 0) {
			LOG(FATAL) << "CORRUPTION in websocket header buffer";
		}

		//LOG(INFO) << "SEND SIZE = " << len;
		
		// Pack correct websocket header into buffer
		int rc = ws_prepare(wsheader_type::BINARY_FRAME, false, len, buf, 20);
		if (rc == -1) return -1;
		
		// Patch the first io vector to be ws header
		const_cast<iovec*>(&sendvec[0])->iov_base = buf;
		const_cast<iovec*>(&sendvec[0])->iov_len = rc;
	}
	
#ifdef WIN32
	auto send_vec = send_buf_.vector();
	auto send_size = send_buf_.vector_size();
	vector<WSABUF> wsabuf(send_size);

	for (int i = 0; i < send_size; i++) {
		wsabuf[i].len = (ULONG)send_vec[i].iov_len;
		wsabuf[i].buf = (char*)send_vec[i].iov_base;
		//c += ftl::net::internal::send(sock_, (char*)send_vec[i].iov_base, (int)send_vec[i].iov_len, 0);
	}

	DWORD bytessent;
	int c = WSASend(sock_, wsabuf.data(), send_size, (LPDWORD)&bytessent, 0, NULL, NULL);
#else
	int c = ftl::net::internal::writev(sock_, send_buf_.vector(), (int)send_buf_.vector_size());
#endif

	send_buf_.clear();
	
	// We are blocking, so -1 should mean actual error
	if (c == -1) {
		socketError();
		//_badClose();
	}
	
	return c;
}

Peer::~Peer() {
	UNIQUE_LOCK(send_mtx_,lk1);
	UNIQUE_LOCK(recv_mtx_,lk2);
	_badClose(false);
	LOG(INFO) << "Deleting peer object";

	delete disp_;
}