/**
 * @file universe.hpp
 * @copyright Copyright (c) 2020 University of Turku, MIT License
 * @author Nicolas Pope
 */

#pragma once

#include <vector>
#include <list>
#include <string>
#include <thread>
#include <map>
#include <unordered_map>
#include <memory>

#include <msgpack.hpp>

#include <ftl/protocol.hpp>
#include <ftl/protocol/error.hpp>
#include "peer.hpp"
#include "dispatcher.hpp"
#include <ftl/uuid.hpp>
#include <ftl/uri.hpp>
#include <ftl/threads.hpp>
#include <ftl/handle.hpp>

#include <ftl/lib/nlohmann/json_fwd.hpp>

namespace ftl {
namespace net {

struct ReconnectInfo {
    int tries;
    float delay;
    PeerPtr peer;
};

struct NetImplDetail;

using Callback = unsigned int;

/**
 * Represents a group of network peers and their resources, managing the
 * searching of and sharing of resources across peers. Each universe can
 * listen on multiple ports/interfaces for connecting peers, and can connect
 * to any number of peers. The creation of a Universe object also creates a
 * new thread to manage the networking, therefore it is threadsafe but
 * callbacks will execute in a different thread so must also be threadsafe in
 * their actions.
 */
class Universe {
 public:
    friend class Peer;

    Universe();

    /**
     * The destructor will terminate the network thread before completing.
     */
    ~Universe();

    void start();

    /**
     * Open a new listening port on a given interfaces.
     *   eg. "tcp://localhost:9000"
     * @param addr URI giving protocol, interface and port
     */
    bool listen(const ftl::URI &addr);

    std::vector<ftl::URI> getListeningURIs();

    /**
     * Essential to call this before destroying anything that registered
     * callbacks or binds for RPC. It will terminate all connections and
     * stop any network activity but without deleting the net object.
     */
    void shutdown();

    /**
     * Create a new peer connection.
     *   eg. "tcp://10.0.0.2:9000"
     * Supported protocols include tcp and ws.
     *
     * @param addr URI giving protocol, interface and port
     */
    PeerPtr connect(const std::string &addr);
    PeerPtr connect(const ftl::URI &addr);

    bool isConnected(const ftl::URI &uri);
    bool isConnected(const std::string &s);

    size_t numberOfPeers() const { return connection_count_; }

    /**
     * Will block until all currently registered connnections have completed.
     * You should not use this, but rather use onConnect.
     */
    int waitConnections(int seconds = 1);

    /** get peer pointer by peer UUID, returns nullptr if not found */
    PeerPtr getPeer(const ftl::UUID &pid) const;
    /** get webservice peer pointer, returns nullptr if not connected to webservice */
    PeerPtr getWebService() const;

    std::list<PeerPtr> getPeers() const;

    /**
     * Bind a function to an RPC or service call name. This will implicitely
     * be called by any peer making the request.
     */
    template <typename F>
    void bind(const std::string &name, F func);

    void unbind(const std::string &name);

    /**
     * Check if an RPC name is already bound.
     */
    inline bool isBound(const std::string &name) const { return disp_.isBound(name); }

    /**
     * Send a non-blocking RPC call with no return value to all connected
     * peers.
     */
    template <typename... ARGS>
    void broadcast(const std::string &name, ARGS... args);

    template <typename R, typename... ARGS>
    R call(const UUID &pid, const std::string &name, ARGS... args);

    /**
     * Non-blocking Remote Procedure Call using a callback function.
     * 
     * @param pid Peer GUID
     * @param name RPC Function name.
     * @param cb Completion callback.
     * @param args A variable number of arguments for RPC function.
     * 
     * @return A call id for use with cancelCall() if needed.
     */
    template <typename R, typename... ARGS>
    int asyncCall(const UUID &pid, const std::string &name,
            std::function<void(const R&)> cb,
            ARGS... args);

    template <typename... ARGS>
    bool send(const UUID &pid, const std::string &name, ARGS... args);

    template <typename... ARGS>
    int try_send(const UUID &pid, const std::string &name, ARGS... args);

    template <typename R, typename... ARGS>
    std::optional<R> findOne(const std::string &name, ARGS... args);

    template <typename R, typename... ARGS>
    std::vector<R> findAll(const std::string &name, ARGS... args);

    void setLocalID(const ftl::UUID &u) { this_peer = u; }
    const ftl::UUID &id() const { return this_peer; }

    // --- Event Handlers -----------------------------------------------------

    ftl::Handle onConnect(const std::function<bool(const ftl::net::PeerPtr&)>&);
    ftl::Handle onDisconnect(const std::function<bool(const ftl::net::PeerPtr&)>&);
    ftl::Handle onError(
        const std::function<bool(const ftl::net::PeerPtr&, ftl::protocol::Error, const std::string &)>&);

    size_t getSendBufferSize(ftl::URI::scheme_t s);
    size_t getRecvBufferSize(ftl::URI::scheme_t s);
    void setSendBufferSize(ftl::URI::scheme_t s, size_t size);
    void setRecvBufferSize(ftl::URI::scheme_t s, size_t size);

    static inline std::shared_ptr<Universe> getInstance() { return instance_; }

    void setMaxConnections(size_t m);
    size_t getMaxConnections() const { return peers_.size(); }

    // --- Test support -------------------------------------------------------

    PeerPtr injectFakePeer(std::unique_ptr<ftl::net::internal::SocketConnection> s);

 private:
    void _run();
    void _setDescriptors();
    void _installBindings();
    void _installBindings(const ftl::net::PeerPtr&);
    void _cleanupPeers();
    void _notifyConnect(ftl::net::Peer *);
    void _notifyDisconnect(ftl::net::Peer *);
    void _notifyError(ftl::net::Peer *, ftl::protocol::Error, const std::string &);
    void _periodic();
    ftl::net::PeerPtr _findPeer(const ftl::net::Peer *p);
    void _removePeer(PeerPtr &p);
    void _insertPeer(const ftl::net::PeerPtr &ptr);

    static void __start(Universe *u);

    bool active_;
    ftl::UUID this_peer;
    mutable SHARED_MUTEX net_mutex_;
    std::condition_variable_any socket_cv_;

    std::unique_ptr<NetImplDetail> impl_;

    std::vector<std::unique_ptr<ftl::net::internal::SocketServer>> listeners_;
    std::vector<ftl::net::PeerPtr> peers_;
    std::unordered_map<std::string, size_t> peer_by_uri_;
    std::map<ftl::UUID, size_t> peer_ids_;

    ftl::net::Dispatcher disp_;
    std::list<ReconnectInfo> reconnects_;
    size_t phase_;
    std::list<ftl::net::PeerPtr> garbage_;
    ftl::Handle garbage_timer_;

    // size_t send_size_;
    // size_t recv_size_;
    double periodic_time_;
    int reconnect_attempts_;
    std::atomic_int connection_count_ = 0;  // Active connections
    std::atomic_int peer_instances_ = 0;    // Actual peers dependent on Universe

    ftl::Handler<const ftl::net::PeerPtr&> on_connect_;
    ftl::Handler<const ftl::net::PeerPtr&> on_disconnect_;
    ftl::Handler<const ftl::net::PeerPtr&, ftl::protocol::Error, const std::string &> on_error_;

    static std::shared_ptr<Universe> instance_;

    // Socket buffer sizes
    size_t tcp_send_buffer_;
    size_t tcp_recv_buffer_;
    size_t ws_send_buffer_;
    size_t ws_recv_buffer_;

    // NOTE: Must always be last member
    std::thread thread_;
};

//------------------------------------------------------------------------------

template <typename F>
void Universe::bind(const std::string &name, F func) {
    UNIQUE_LOCK(net_mutex_, lk);
    disp_.bind(name, func,
        typename ftl::internal::func_kind_info<F>::result_kind(),
        typename ftl::internal::func_kind_info<F>::args_kind(),
        typename ftl::internal::func_kind_info<F>::has_peer());
}

template <typename... ARGS>
void Universe::broadcast(const std::string &name, ARGS... args) {
    SHARED_LOCK(net_mutex_, lk);
    for (const auto &p : peers_) {
        if (!p || !p->waitConnection()) continue;
        p->send(name, args...);
    }
}

template <typename R, typename... ARGS>
std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
    struct SharedData {
        std::atomic_bool hasreturned = false;
        std::mutex m;
        std::condition_variable cv;
        std::optional<R> result;
    };

    auto sdata = std::make_shared<SharedData>();

    auto handler = [sdata](const std::optional<R> &r) {
        std::unique_lock<std::mutex> lk(sdata->m);
        if (r && !sdata->hasreturned) {
            sdata->hasreturned = true;
            sdata->result = r;
        }
        lk.unlock();
        sdata->cv.notify_one();
    };

    {
        SHARED_LOCK(net_mutex_, lk);
        for (const auto &p : peers_) {
            if (!p || !p->waitConnection()) continue;
            p->asyncCall<std::optional<R>>(name, handler, args...);
        }
    }

    // Block thread until async callback notifies us
    std::unique_lock<std::mutex> llk(sdata->m);
    sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata] {
        return static_cast<bool>(sdata->hasreturned);
    });

    return sdata->result;
}

template <typename R, typename... ARGS>
std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
    struct SharedData {
        std::atomic_int returncount = 0;
        std::atomic_int sentcount = 0;
        std::mutex m;
        std::condition_variable cv;
        std::vector<R> results;
    };

    auto sdata = std::make_shared<SharedData>();

    auto handler = [sdata](const std::vector<R> &r) {
        std::unique_lock<std::mutex> lk(sdata->m);
        ++sdata->returncount;
        sdata->results.insert(sdata->results.end(), r.begin(), r.end());
        lk.unlock();
        sdata->cv.notify_one();
    };

    {
        SHARED_LOCK(net_mutex_, lk);
        for (const auto &p : peers_) {
            if (!p || !p->waitConnection()) continue;
            ++sdata->sentcount;
            p->asyncCall<std::vector<R>>(name, handler, args...);
        }
    }

    std::unique_lock<std::mutex> llk(sdata->m);
    sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata]{return sdata->returncount == sdata->sentcount; });
    return sdata->results;
}

template <typename R, typename... ARGS>
R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
    PeerPtr p = getPeer(pid);
    if (p == nullptr || !p->isConnected()) {
        if (p == nullptr) throw FTL_Error("Attempting to call an unknown peer : " << pid.to_string());
        else
            throw FTL_Error("Attempting to call an disconnected peer : " << pid.to_string());
    }
    return p->call<R>(name, args...);
}

template <typename R, typename... ARGS>
int Universe::asyncCall(const ftl::UUID &pid, const std::string &name, std::function<void(const R&)> cb, ARGS... args) {
    PeerPtr p = getPeer(pid);
    if (p == nullptr || !p->isConnected()) {
        if (p == nullptr) throw FTL_Error("Attempting to call an unknown peer : " << pid.to_string());
        else
            throw FTL_Error("Attempting to call an disconnected peer : " << pid.to_string());
    }
    return p->asyncCall(name, cb, args...);
}

template <typename... ARGS>
bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) {
    PeerPtr p = getPeer(pid);
    if (p == nullptr) {
        LOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string();
        return false;
    }

    return p->isConnected() && p->send(name, args...) > 0;
}

template <typename... ARGS>
int Universe::try_send(const ftl::UUID &pid, const std::string &name, ARGS... args) {
    PeerPtr p = getPeer(pid);
    if (p == nullptr) {
        return false;
    }

    return (p->isConnected()) ? p->try_send(name, args...) : -1;
}

};  // namespace net
};  // namespace ftl