diff --git a/CMakeLists.txt b/CMakeLists.txt index 14523cb787ce39db39e7a49141126a192fe2a068..886295cf67665f6d06be520923d8a68ca01381e7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -155,32 +155,41 @@ include(ftl_CPack) include_directories("include/ftl/lib") -add_library(beyond-protocol STATIC +add_library(beyond-common OBJECT src/ctpl_stl.cpp - src/dispatcher.cpp - src/exception.cpp - src/loguru.cpp - src/peer.cpp - src/universe.cpp - src/uri.cpp + src/dispatcher.cpp + src/exception.cpp + src/loguru.cpp + src/uri.cpp src/config.cpp src/time.cpp - src/node.cpp - src/self.cpp + src/base64.cpp + src/channelSet.cpp +) + +target_include_directories(beyond-common PUBLIC + $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> + $<INSTALL_INTERFACE:include>) + +add_library(beyond-protocol STATIC + $<TARGET_OBJECTS:beyond-common> + src/peer.cpp + src/universe.cpp src/socket/socket.cpp src/protocol/connection.cpp src/protocol/factory.cpp src/protocol/tcp.cpp src/protocol/tls.cpp src/protocol/websocket.cpp - src/base64.cpp - src/protocol.cpp src/streams/streams.cpp - src/channelSet.cpp src/streams/muxer.cpp src/streams/broadcaster.cpp src/streams/netstream.cpp src/streams/filestream.cpp + src/node.cpp + src/self.cpp + src/protocol.cpp + src/rpc.cpp ) target_include_directories(beyond-protocol PUBLIC diff --git a/include/ftl/protocol/broadcaster.hpp b/include/ftl/protocol/broadcaster.hpp index 7afd3f4a30879367e2307accf514000e26f8ab79..4e404579b4b116dafb04350c2c435c86e8ededbc 100644 --- a/include/ftl/protocol/broadcaster.hpp +++ b/include/ftl/protocol/broadcaster.hpp @@ -32,9 +32,9 @@ class Broadcast : public Stream { std::list<std::shared_ptr<Stream>> streams() const; - void setProperty(ftl::protocol::StreamProperty opt, int value) override; + void setProperty(ftl::protocol::StreamProperty opt, std::any value) override; - int getProperty(ftl::protocol::StreamProperty opt) override; + std::any getProperty(ftl::protocol::StreamProperty opt) override; bool supportsProperty(ftl::protocol::StreamProperty opt) override; diff --git a/include/ftl/protocol/error.hpp b/include/ftl/protocol/error.hpp index 97f5f2bb23f566c70417b90879db94758e4fb421..708ed388ea73330d85b1c9fae3b5dc372ce05b34 100644 --- a/include/ftl/protocol/error.hpp +++ b/include/ftl/protocol/error.hpp @@ -18,7 +18,8 @@ enum struct Error { kSelfConnect, kListen, kURIAlreadyExists, - kURIDoesNotExist + kURIDoesNotExist, + kBadURI }; } diff --git a/include/ftl/protocol/muxer.hpp b/include/ftl/protocol/muxer.hpp index f3d85a0e449c2194307e1305691d9545c3569ad4..777c276e61e1c9c59af6e0a7f66f1a4e17d092e7 100644 --- a/include/ftl/protocol/muxer.hpp +++ b/include/ftl/protocol/muxer.hpp @@ -40,9 +40,9 @@ class Muxer : public Stream { bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override; - void setProperty(ftl::protocol::StreamProperty opt, int value) override; + void setProperty(ftl::protocol::StreamProperty opt, std::any value) override; - int getProperty(ftl::protocol::StreamProperty opt) override; + std::any getProperty(ftl::protocol::StreamProperty opt) override; bool supportsProperty(ftl::protocol::StreamProperty opt) override; diff --git a/include/ftl/protocol/self.hpp b/include/ftl/protocol/self.hpp index c8918cdeef5b3db31b306fa8a14c5669fe70c733..e21b5645b98a8e1711b8c1f5cc70783d8337cabd 100644 --- a/include/ftl/protocol/self.hpp +++ b/include/ftl/protocol/self.hpp @@ -75,6 +75,9 @@ class Self { ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); ftl::Handle onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, ftl::protocol::Error, const std::string & )>&); + // Used for testing + ftl::net::Universe *getUniverse() const { return universe_.get(); } + protected: std::shared_ptr<ftl::net::Universe> universe_; }; diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp index 2de2eec79cd1d98904e6f479590b886a5043506d..1ca85e88da5d11fb1682813b83474823e602de9e 100644 --- a/include/ftl/protocol/streams.hpp +++ b/include/ftl/protocol/streams.hpp @@ -16,6 +16,7 @@ #include <string> #include <vector> #include <unordered_set> +#include <any> namespace ftl { namespace protocol { @@ -40,7 +41,16 @@ enum struct StreamProperty { kMaxBitrate, kAdaptiveBitrate, kObservers, - kURI + kURI, + kPaused, + kBytesSent, + kBytesReceived, + kLatency, + kFrameRate, + kName, + kDescription, + kTags, + kUser }; enum struct StreamType { @@ -173,9 +183,9 @@ class Stream { // TODO: Disable - virtual void setProperty(ftl::protocol::StreamProperty opt, int value)=0; + virtual void setProperty(ftl::protocol::StreamProperty opt, std::any value)=0; - virtual int getProperty(ftl::protocol::StreamProperty opt)=0; + virtual std::any getProperty(ftl::protocol::StreamProperty opt)=0; virtual bool supportsProperty(ftl::protocol::StreamProperty opt)=0; diff --git a/src/common_fwd.hpp b/src/common_fwd.hpp index 55f544222edc071388c89d981e261e5c73ecab0a..dbecf2f22f78d26fe7b7355d069c6ccf9c5fdb91 100644 --- a/src/common_fwd.hpp +++ b/src/common_fwd.hpp @@ -6,6 +6,8 @@ #pragma once +struct pollfd; + #ifndef WIN32 #define INVALID_SOCKET -1 #define SOCKET int diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 038417d1f499006d2dabdc39f9d694a79ed35dd3..bd4f721d8ff13f6d630bb5b030ce191083abe29c 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -83,6 +83,7 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { s._sendResponse(id, name, result->get()); } catch (const std::exception &e) { throw FTL_Error("Exception when attempting to call RPC " << name << " (" << e.what() << ")"); + // FIXME: Send the error in the response. } } else { throw FTL_Error("No binding found for " << name); diff --git a/src/peer.cpp b/src/peer.cpp index 95f5b787980ebf9c86418c2e723b07035d386ba2..efe0af6af772c4b04deed22ec3ca7aaf635bb25c 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -55,7 +55,7 @@ bool Peer::isConnected() const { } bool Peer::isValid() const { - return sock_ && sock_->fd() != INVALID_SOCKET && ((status_ == NodeStatus::kConnected) || (status_ == NodeStatus::kConnecting)); + return sock_ && sock_->is_valid() && ((status_ == NodeStatus::kConnected) || (status_ == NodeStatus::kConnecting)); } void Peer::_set_socket_options() { diff --git a/src/peer.hpp b/src/peer.hpp index 2791559b9ee1639d9281c97f9d34d9b52525e93f..df7bae20238d3a96a5ba77feb5e32a3dc4d030bf 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -195,15 +195,21 @@ class Peer { inline unsigned int localID() const { return local_id_; } int connectionCount() const { return connection_count_; } + + /** + * @brief Call recv to get data. Internal use, it is blocking so should only + * be done if data is available. + * + */ + void data(); public: static const int kMaxMessage = 2*1024*1024; // 10Mb currently -protected: - void data(); // Process one message from socket +private: // Functions bool socketError(); // Process one error from socket void error(int e); - + // check if buffer has enough decoded data from lower layer and advance // buffer if necessary (skip headers etc). bool _has_next(); @@ -225,9 +231,6 @@ protected: * Universe (universe.cpp) */ int _socket() const; - - -private: // Functions void _send_handshake(); void _process_handshake(uint64_t magic, uint32_t version, UUID pid); diff --git a/src/protocol.cpp b/src/protocol.cpp index c0092db6e095883ad7a185f37948e79cca6f3c74..2a45913f94a6d88c3bf272628ad204c1002d80b0 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -1,6 +1,7 @@ #include <ftl/protocol.hpp> #include <ftl/protocol/self.hpp> #include "universe.hpp" +#include "rpc.hpp" static std::shared_ptr<ftl::net::Universe> universe; @@ -14,7 +15,10 @@ void ftl::protocol::reset() { ftl::UUID ftl::protocol::id; std::shared_ptr<ftl::protocol::Self> ftl::getSelf() { - if (!universe) universe = std::make_shared<ftl::net::Universe>(); + if (!universe) { + universe = std::make_shared<ftl::net::Universe>(); + ftl::rpc::install(universe.get()); + } return std::make_shared<ftl::protocol::Self>(universe); } @@ -22,6 +26,7 @@ std::shared_ptr<ftl::protocol::Self> ftl::createDummySelf() { ftl::UUID uuid; auto u = std::make_shared<ftl::net::Universe>(); u->setLocalID(uuid); + ftl::rpc::install(u.get()); return std::make_shared<ftl::protocol::Self>(u); } diff --git a/src/protocol/connection.hpp b/src/protocol/connection.hpp index a058f435c8d0164eecc6cfbbeba010e1aba73db7..5b320a63df7f21d68e8e915d9051ecc54bab4318 100644 --- a/src/protocol/connection.hpp +++ b/src/protocol/connection.hpp @@ -42,7 +42,7 @@ public: virtual bool is_valid(); // OS socket file descriptor - socket_t fd(); + virtual socket_t fd(); virtual ftl::URI uri(); virtual ftl::URI::scheme_t scheme() const; @@ -66,10 +66,10 @@ public: // scatter write, return number of bytes sent. always sends all data in iov. virtual ssize_t writev(const struct iovec *iov, int iovcnt); - bool set_recv_buffer_size(size_t sz); - bool set_send_buffer_size(size_t sz); - size_t get_recv_buffer_size(); - size_t get_send_buffer_size(); + virtual bool set_recv_buffer_size(size_t sz); + virtual bool set_send_buffer_size(size_t sz); + virtual size_t get_recv_buffer_size(); + virtual size_t get_send_buffer_size(); int getSocketError(); diff --git a/src/rpc.cpp b/src/rpc.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ef4a6d183a684bad91bdbc2387b1534b9956ba3d --- /dev/null +++ b/src/rpc.cpp @@ -0,0 +1,7 @@ +#include "rpc.hpp" + +#include "streams/netstream.hpp" + +void ftl::rpc::install(ftl::net::Universe *net) { + ftl::protocol::Net::installRPC(net); +} diff --git a/src/rpc.hpp b/src/rpc.hpp new file mode 100644 index 0000000000000000000000000000000000000000..03519d067c8faf69b8641b27888cf939ae2214fc --- /dev/null +++ b/src/rpc.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include <memory> + +namespace ftl { +namespace net { +class Universe; +} + +namespace rpc { + +void install(ftl::net::Universe *net); + +} +} diff --git a/src/streams/broadcaster.cpp b/src/streams/broadcaster.cpp index bc33e1c8f295212f73c3dc3c7ceffc6338ce61ec..96a017406a9ea199dd2fcbac6300c02d973a7fda 100644 --- a/src/streams/broadcaster.cpp +++ b/src/streams/broadcaster.cpp @@ -136,11 +136,11 @@ bool Broadcast::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { return r; } -void Broadcast::setProperty(ftl::protocol::StreamProperty opt, int value) { +void Broadcast::setProperty(ftl::protocol::StreamProperty opt, std::any value) { } -int Broadcast::getProperty(ftl::protocol::StreamProperty opt) { +std::any Broadcast::getProperty(ftl::protocol::StreamProperty opt) { return 0; } diff --git a/src/streams/muxer.cpp b/src/streams/muxer.cpp index 3e7f592a9b231ae8ed69b1de180da0bbb87182f0..f790b12ce33564822428a365ca0c5abc9505edea 100644 --- a/src/streams/muxer.cpp +++ b/src/streams/muxer.cpp @@ -198,13 +198,13 @@ bool Muxer::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { return r; } -void Muxer::setProperty(ftl::protocol::StreamProperty opt, int value) { +void Muxer::setProperty(ftl::protocol::StreamProperty opt, std::any value) { for (auto &s : streams_) { s.stream->setProperty(opt, value); } } -int Muxer::getProperty(ftl::protocol::StreamProperty opt) { +std::any Muxer::getProperty(ftl::protocol::StreamProperty opt) { for (auto &s : streams_) { if (s.stream->supportsProperty(opt)) return s.stream->getProperty(opt); } diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index 7292328bc7855b1506646b978ee76ba0c1f797c8..128eebcb2ac1c1fcf45a579e98996275b212697e 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -23,11 +23,10 @@ using ftl::protocol::FrameID; using ftl::protocol::Error; using ftl::protocol::kAllFrames; using ftl::protocol::kAllFramesets; +using ftl::protocol::StreamProperty; using std::string; using std::optional; -static constexpr int kFramesToRequest = 30; - std::atomic_size_t Net::req_bitrate__ = 0; std::atomic_size_t Net::tx_bitrate__ = 0; std::atomic_size_t Net::rx_sample_count__ = 0; @@ -36,66 +35,66 @@ int64_t Net::last_msg__ = 0; MUTEX Net::msg_mtx__; static std::list<std::string> net_streams; -static std::atomic_flag has_bindings = ATOMIC_FLAG_INIT; static SHARED_MUTEX stream_mutex; -Net::Net(const std::string &uri, ftl::net::Universe *net, bool host) : - active_(false), net_(net), clock_adjust_(0), last_ping_(0), uri_(uri), host_(host) { - - // First net stream needs to register these RPC handlers - //if (!has_bindings.test_and_set()) { - if (net_->isBound("find_stream")) net_->unbind("find_stream"); - net_->bind("find_stream", [net = net_](const std::string &uri) -> optional<ftl::UUID> { - DLOG(INFO) << "Request for stream: " << uri; - - ftl::URI u1(uri); - std::string base = u1.getBaseURI(); - - SHARED_LOCK(stream_mutex, lk); - for (const auto &s : net_streams) { - ftl::URI u2(s); - // Don't compare query string components. - if (base == u2.getBaseURI()) { - return net->id(); - } +void Net::installRPC(ftl::net::Universe *net) { + net->bind("find_stream", [net](const std::string &uri) -> optional<ftl::UUID> { + DLOG(INFO) << "Request for stream: " << uri; + + ftl::URI u1(uri); + std::string base = u1.getBaseURI(); + + SHARED_LOCK(stream_mutex, lk); + for (const auto &s : net_streams) { + ftl::URI u2(s); + // Don't compare query string components. + if (base == u2.getBaseURI()) { + return net->id(); } - return {}; - }); - - if (net_->isBound("list_streams")) net_->unbind("list_streams"); - net_->bind("list_streams", [this]() { - SHARED_LOCK(stream_mutex, lk); - return net_streams; - }); - //} - - last_frame_ = 0; - time_peer_ = ftl::UUID(0); - - //abr_ = new ftl::stream::AdaptiveBitrate(std::max(0, std::min(255, value("bitrate", 64)))); - - bitrate_ = 200; //abr_->current(); - //abr_->setMaxRate(static_cast<uint8_t>(std::max(0, std::min(255, value("max_bitrate", 200))))); - //on("bitrate", [this]() { - // abr_->setMaxRate(static_cast<uint8_t>(std::max(0, std::min(255, value("max_bitrate", 200))))); - //}); - - /*abr_enabled_ = value("abr_enabled", false); - on("abr_enabled", [this]() { - abr_enabled_ = value("abr_enabled", false); - bitrate_ = (abr_enabled_) ? - abr_->current() : - static_cast<uint8_t>(std::max(0, std::min(255, value("bitrate", 64)))); - tally_ = 0; - });*/ - - /*value("paused", false); - on("paused", [this]() { - paused_ = value("paused", false); - if (!paused_) { - reset(); } - });*/ + return {}; + }); + + net->bind("list_streams", []() { + SHARED_LOCK(stream_mutex, lk); + return net_streams; + }); + + net->bind("enable_stream", [](const std::string &uri, unsigned int fsid, unsigned int fid) { + // Nothing to do here, used by web service + }); + + net->bind("add_stream", [](const std::string &uri) { + // TODO: Trigger some callback + }); + + // TODO: Call "list_streams" to get all available locally + // This call should be done on any Peer connection + // and perhaps periodically +} + +Net::Net(const std::string &uri, ftl::net::Universe *net, bool host) : + net_(net), time_peer_(ftl::UUID(0)), uri_(uri), host_(host) { + + ftl::URI u(uri_); + if (!u.isValid() || !(u.getScheme() == ftl::URI::SCHEME_FTL)) { + error(Error::kBadURI, uri_); + throw FTL_Error("Bad stream URI"); + } + base_uri_ = u.getBaseURI(); + + if (host_) { + // Automatically set name + name_.resize(1024); + #ifdef WIN32 + DWORD size = name_.capacity(); + GetComputerName(name_.data(), &size); + #else + gethostname(name_.data(), name_.capacity()); + #endif + } else { + name_ = "No name"; + } } Net::~Net() { @@ -104,106 +103,151 @@ Net::~Net() { // FIXME: Wait to ensure no net callbacks are active. // Do something better than this std::this_thread::sleep_for(std::chrono::milliseconds(10)); - - //delete abr_; } bool Net::post(const StreamPacket &spkt, const Packet &pkt) { if (!active_) return false; + if (paused_) return true; bool hasStale = false; - // Lock to prevent clients being added / removed - { + // Cast to include msgpack methods + auto spkt_net = reinterpret_cast<const StreamPacketMSGPACK&>(spkt); + + // Version of packet without data but with msgpack methods + PacketMSGPACK pkt_strip; + pkt_strip.codec = pkt.codec; + pkt_strip.bitrate = pkt.bitrate; + pkt_strip.frame_count = pkt.frame_count; + pkt_strip.flags = pkt.flags; + + if (host_) { SHARED_LOCK(mutex_,lk); - //available(spkt.frameSetID()) += spkt.channel; - - // Map the frameset ID from a local one to a remote one - StreamPacketMSGPACK spkt_net = *((StreamPacketMSGPACK*)&spkt); - spkt_net.streamID = _localToRemoteFS(spkt.streamID); - - PacketMSGPACK pkt_strip; - pkt_strip.codec = pkt.codec; - pkt_strip.bitrate = pkt.bitrate; - pkt_strip.frame_count = pkt.frame_count; - pkt_strip.flags = pkt.flags; - - if (host_) { - auto c = clients_.begin(); - while (c != clients_.end()) { - auto &client = *c; - - // Strip packet data if channel is not wanted by client - const bool strip = int(spkt.channel) < 32 && pkt.data.size() > 0 && ((1 << int(spkt.channel)) & client.channels) == 0; - - try { - short pre_transmit_latency = short(ftl::time::get_time() - spkt.localTimestamp); - - if (!net_->send(client.peerid, - base_uri_, - pre_transmit_latency, // Time since timestamp for tx - spkt_net, - (strip) ? pkt_strip : *((PacketMSGPACK*)&pkt))) { - - // Send failed so mark as client stream completed - client.txcount = client.txmax; - } else { - if (!strip && pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp); - - // Count frame as completed only if last block and channel is 0 - // FIXME: This is unreliable, colour might not exist etc. - if (spkt_net.streamID == 0 && spkt.frame_number == 0 && spkt.channel == Channel::kColour) ++client.txcount; - } - } catch(...) { - client.txcount = client.txmax; - } + for (auto &client : clients_) { + // Strip packet data if channel is not wanted by client + const bool strip = int(spkt.channel) < 32 && pkt.data.size() > 0 && ((1 << int(spkt.channel)) & client.channels) == 0; - if (client.txcount >= client.txmax) { - hasStale = true; - } - ++c; - } - } else { try { short pre_transmit_latency = short(ftl::time::get_time() - spkt.localTimestamp); - if (!net_->send(*peer_, + + // TODO: msgpack only once and broadcast. + // TODO: send in parallel and then wait on all futures? + // Or send non-blocking and wait + if (!net_->send(client.peerid, base_uri_, pre_transmit_latency, // Time since timestamp for tx spkt_net, - *((PacketMSGPACK*)&pkt))) { + (strip) ? pkt_strip : reinterpret_cast<const PacketMSGPACK&>(pkt))) { + // Send failed so mark as client stream completed + client.txcount = client.txmax; + } else { + if (!strip && pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp); + + // Count frame as completed only if last block and channel is 0 + // FIXME: This is unreliable, colour might not exist etc. + if (spkt_net.streamID == 0 && spkt.frame_number == 0 && spkt.channel == Channel::kColour) ++client.txcount; } - if (pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp); } catch(...) { - // TODO: Some disconnect error - return false; + client.txcount = client.txmax; + } + + if (client.txcount >= client.txmax) { + hasStale = true; + } + } + } else { + try { + short pre_transmit_latency = short(ftl::time::get_time() - spkt.localTimestamp); + + if (!net_->send(*peer_, + base_uri_, + pre_transmit_latency, // Time since timestamp for tx + spkt_net, + reinterpret_cast<const PacketMSGPACK&>(pkt))) { + } + if (pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp); + } catch(...) { + // TODO: Some disconnect error + return false; } } if (hasStale) _cleanUp(); + hasPosted(spkt, pkt); + return true; } -uint32_t Net::_localToRemoteFS(uint32_t fsid) { - if (fsid == 255) return 255; - local_fsid_ = fsid; - return 0; +void Net::_processPacket(ftl::net::Peer *p, short ttimeoff, const StreamPacket &spkt_raw, const Packet &pkt) { + int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count(); + + if (!active_) return; + + StreamPacket spkt = spkt_raw; + spkt.localTimestamp = now - int64_t(ttimeoff); + spkt.hint_capability = 0; + spkt.hint_source_total = 0; + spkt.version = 4; + if (p) spkt.hint_peerid = p->localID(); + + FrameID localFrame(spkt.streamID, spkt.frame_number); + + seen(localFrame, spkt.channel); + + if (paused_) return; + + // Manage recuring requests + if (!host_ && last_frame_ != spkt.timestamp) { + UNIQUE_LOCK(mutex_, lk); + if (last_frame_ != spkt.timestamp) { + //int tc = now - last_completion_; // Milliseconds since last frame completed + frame_time_ = spkt.timestamp - last_frame_; // Milliseconds per frame + last_completion_ = now; + bytes_received_ = 0; + last_frame_ = spkt.timestamp; + + lk.unlock(); + + // Are we close to reaching the end of our frames request? + if (tally_ <= 5) { + // Yes, so send new requests + // FIXME: Do this for all frames, or use tally be frame + //for (size_t i = 0; i < size(); ++i) { + const auto &sel = enabledChannels(localFrame); + + for (auto c : sel) { + _sendRequest(c, localFrame.frameset(), kAllFrames, frames_to_request_, 255); + } + //} + tally_ = frames_to_request_; + } else { + --tally_; + } + } + } + + bytes_received_ += pkt.data.size(); + //time_at_last_ = now; + + // If hosting and no data then it is a request for data + // Note: a non host can receive empty data, meaning data is available + // but that you did not request it + if (host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest)) { + _processRequest(p, spkt, pkt); + } + + trigger(spkt, pkt); + if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp); } -uint32_t Net::_remoteToLocalFS(uint32_t fsid) { - return local_fsid_; //(fsid == 255) ? 255 : local_fsid_; +void Net::inject(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { + _processPacket(nullptr, 0, spkt, pkt); } bool Net::begin() { if (active_) return true; - //if (!get<string>("uri")) return false; - - //uri_ = *get<string>("uri"); - - ftl::URI u(uri_); - if (!u.isValid() || !(u.getScheme() == ftl::URI::SCHEME_FTL)) return false; - base_uri_ = u.getBaseURI(); if (net_->isBound(base_uri_)) { error(Error::kURIAlreadyExists, std::string("Stream already exists: ") + uri_); @@ -211,129 +255,27 @@ bool Net::begin() { return false; } + // FIXME: Potential race between above check and new binding + // Add the RPC handler for the URI net_->bind(base_uri_, [this](ftl::net::Peer &p, short ttimeoff, const StreamPacketMSGPACK &spkt_raw, const PacketMSGPACK &pkt) { - int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count(); - - if (!active_) return; - if (paused_) return; - - StreamPacket spkt = spkt_raw; - spkt.localTimestamp = now - int64_t(ttimeoff); - spkt.hint_capability = 0; - spkt.hint_source_total = 0; - spkt.version = 4; - spkt.hint_peerid = p.localID(); - // Map remote frameset ID to a local one. - spkt.streamID = _remoteToLocalFS(spkt.streamID); - - FrameID localFrame(spkt.streamID, spkt.frame_number); - - seen(localFrame, spkt.channel); - - // Manage recuring requests - if (!host_ && last_frame_ != spkt.timestamp) { - UNIQUE_LOCK(mutex_, lk); - if (last_frame_ != spkt.timestamp) { - int tf = spkt.timestamp - last_frame_; // Milliseconds per frame - int tc = now - last_completion_; // Milliseconds since last frame completed - last_completion_ = now; - bytes_received_ = 0; - last_frame_ = spkt.timestamp; - - lk.unlock(); - - // Apply adaptive bitrate adjustment if needed - /*if (abr_enabled_) { - int new_bitrate = abr_->adjustment(tf, tc, pkt.bitrate); - if (new_bitrate != bitrate_) { - bitrate_ = new_bitrate; - tally_ = 0; // Force request send - } - }*/ - - /*if (size() > spkt.frameSetID()) { - auto sel = selected(spkt.frameSetID()); - - // A change in channel selections, so send those requests now - if (sel != last_selected_) { - auto changed = sel - last_selected_; - last_selected_ = sel; - - for (auto c : changed) { - _sendRequest(c, spkt.frameSetID(), kAllFrames, kFramesToRequest, 255); - } - } - }*/ - - // Are we close to reaching the end of our frames request? - if (tally_ <= 5) { - // Yes, so send new requests - //for (size_t i = 0; i < size(); ++i) { - const auto &sel = enabledChannels(localFrame); - - for (auto c : sel) { - _sendRequest(c, localFrame.frameset(), kAllFrames, kFramesToRequest, 255); - } - //} - tally_ = kFramesToRequest; - } else { - --tally_; - } - } - } - - bytes_received_ += pkt.data.size(); - //time_at_last_ = now; - - // If hosting and no data then it is a request for data - // Note: a non host can receive empty data, meaning data is available - // but that you did not request it - if (host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest)) { - _processRequest(p, spkt, pkt); - } else { - // FIXME: Allow availability to change... - //available(spkt.frameSetID()) += spkt.channel; - } - - trigger(spkt, pkt); - if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp); + _processPacket(&p, ttimeoff, spkt_raw, pkt); }); if (host_) { DLOG(INFO) << "Hosting stream: " << uri_; - // Alias the URI to the configurable if not already - // Allows the URI to be used to get config data. - /*if (ftl::config::find(uri_) == nullptr) { - ftl::config::alias(uri_, this); - }*/ - { // Add to list of available streams UNIQUE_LOCK(stream_mutex, lk); net_streams.push_back(uri_); } - // Automatically set name if missing - //if (!get<std::string>("name")) { - char hostname[1024] = {0}; - #ifdef WIN32 - DWORD size = 1024; - GetComputerName(hostname, &size); - #else - gethostname(hostname, 1024); - #endif - - //set("name", std::string(hostname)); - //} - net_->broadcast("add_stream", uri_); active_ = true; } else { - - tally_ = kFramesToRequest; + tally_ = frames_to_request_; active_ = true; } @@ -349,10 +291,10 @@ void Net::refresh() { auto sel = enabledChannels(i); for (auto c : sel) { - _sendRequest(c, i.frameset(), kAllFrames, kFramesToRequest, 255, true); + _sendRequest(c, i.frameset(), kAllFrames, frames_to_request_, 255, true); } } - tally_ = kFramesToRequest; + tally_ = frames_to_request_; } void Net::reset() { @@ -439,6 +381,7 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun }; net_->send(*peer_, base_uri_, (short)0, spkt, pkt); + hasPosted(spkt, pkt); return true; } @@ -460,15 +403,15 @@ void Net::_cleanUp() { * batches (max 255 unique frames by timestamp). Requests are in the form * of packets that match the request except the data component is empty. */ -bool Net::_processRequest(ftl::net::Peer &p, StreamPacket &spkt, const Packet &pkt) { +bool Net::_processRequest(ftl::net::Peer *p, StreamPacket &spkt, const Packet &pkt) { bool found = false; DLOG(INFO) << "processing request: " << int(spkt.streamID) << ", " << int(spkt.channel); - { + if (p) { SHARED_LOCK(mutex_,lk); // Does the client already exist for (auto &c : clients_) { - if (c.peerid == p.id()) { + if (c.peerid == p->id()) { // Yes, so reset internal request counters c.txcount = 0; c.txmax = static_cast<int>(pkt.frame_count); @@ -480,12 +423,12 @@ bool Net::_processRequest(ftl::net::Peer &p, StreamPacket &spkt, const Packet &p } // No existing client, so add a new one. - if (!found) { + if (p && !found) { { UNIQUE_LOCK(mutex_,lk); auto &client = clients_.emplace_back(); - client.peerid = p.id(); + client.peerid = p->id(); client.quality = 255; // TODO: Use quality given in packet client.txcount = 0; client.txmax = static_cast<int>(pkt.frame_count); @@ -495,7 +438,7 @@ bool Net::_processRequest(ftl::net::Peer &p, StreamPacket &spkt, const Packet &p spkt.hint_capability |= ftl::protocol::kStreamCap_NewConnection; try { - connect_cb_.trigger(&p); + connect_cb_.trigger(p); } catch (const ftl::exception &e) { LOG(ERROR) << "Exception in stream connect callback: " << e.what(); } @@ -553,16 +496,52 @@ bool Net::active() { return active_; } -void Net::setProperty(ftl::protocol::StreamProperty opt, int value) { - +void Net::setProperty(ftl::protocol::StreamProperty opt, std::any value) { + switch (opt) { + case StreamProperty::kBitrate : + case StreamProperty::kMaxBitrate : bitrate_ = std::any_cast<int>(value); break; + case StreamProperty::kPaused : paused_ = std::any_cast<bool>(value); break; + case StreamProperty::kName : name_ = std::any_cast<std::string>(value); break; + case StreamProperty::kObservers : + case StreamProperty::kBytesSent : + case StreamProperty::kBytesReceived : + case StreamProperty::kLatency : + case StreamProperty::kFrameRate : + case StreamProperty::kURI : throw FTL_Error("Readonly property"); + default : throw FTL_Error("Unsupported property"); + } } -int Net::getProperty(ftl::protocol::StreamProperty opt) { - return 0; +std::any Net::getProperty(ftl::protocol::StreamProperty opt) { + switch (opt) { + case StreamProperty::kBitrate : + case StreamProperty::kMaxBitrate : return bitrate_; + case StreamProperty::kObservers : return clients_.size(); + case StreamProperty::kURI : return base_uri_; + case StreamProperty::kPaused : return paused_; + case StreamProperty::kBytesSent : return 0; + case StreamProperty::kBytesReceived : return int64_t(bytes_received_); + case StreamProperty::kFrameRate : return (frame_time_ > 0) ? 1000 / frame_time_ : 0; + case StreamProperty::kLatency : return 0; + case StreamProperty::kName : return name_; + default : throw FTL_Error("Unsupported property"); + } } bool Net::supportsProperty(ftl::protocol::StreamProperty opt) { - return false; + switch (opt) { + case StreamProperty::kBitrate : + case StreamProperty::kMaxBitrate : + case StreamProperty::kObservers : + case StreamProperty::kPaused : + case StreamProperty::kBytesSent : + case StreamProperty::kBytesReceived : + case StreamProperty::kLatency : + case StreamProperty::kFrameRate : + case StreamProperty::kName : + case StreamProperty::kURI : return true; + default : return false; + } } ftl::protocol::StreamType Net::type() const { diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index 13430ff028832174943085b7ac67c39e0a275b39..d311f9d332962dc47d3c9fa3b7e050709fb6398f 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -10,8 +10,6 @@ namespace ftl { namespace protocol { -class AdaptiveBitrate; - namespace detail { struct StreamClient { ftl::UUID peerid; @@ -40,7 +38,7 @@ struct NetStats { class Net : public Stream { public: Net(const std::string &uri, ftl::net::Universe *net, bool host=false); - ~Net(); + virtual ~Net(); bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override; @@ -55,8 +53,8 @@ class Net : public Stream { void reset() override; void refresh() override; - void setProperty(ftl::protocol::StreamProperty opt, int value) override; - int getProperty(ftl::protocol::StreamProperty opt) override; + void setProperty(ftl::protocol::StreamProperty opt, std::any value) override; + std::any getProperty(ftl::protocol::StreamProperty opt) override; bool supportsProperty(ftl::protocol::StreamProperty opt) override; StreamType type() const override; @@ -74,31 +72,39 @@ class Net : public Stream { */ static NetStats getStatistics(); + static void installRPC(ftl::net::Universe *net); + + static constexpr int kFramesToRequest = 30; + + // Unit test support + virtual void hasPosted(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) {} + void inject(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &); + private: SHARED_MUTEX mutex_; - bool active_; + bool active_ = false; ftl::net::Universe *net_; - int64_t clock_adjust_; + int64_t clock_adjust_ = 0; ftl::UUID time_peer_; std::optional<ftl::UUID> peer_; - int64_t last_frame_; - int64_t last_ping_; + int64_t last_frame_ = 0; + int64_t last_ping_ = 0; + int64_t frame_time_ = 0; std::string uri_; std::string base_uri_; const bool host_; - int tally_; + int tally_ = 0; std::array<std::atomic<int>,32> reqtally_ = {0}; ftl::protocol::ChannelSet last_selected_; - uint8_t bitrate_=255; + uint8_t bitrate_ = 200; std::atomic_int64_t bytes_received_ = 0; int64_t last_completion_ = 0; int64_t time_at_last_ = 0; - float required_bps_; - float actual_bps_; - bool abr_enabled_; + float required_bps_ = 0.0f; + float actual_bps_ = 0.0f; bool paused_ = false; - - AdaptiveBitrate *abr_; + int frames_to_request_ = kFramesToRequest; + std::string name_; ftl::Handler<ftl::net::Peer*> connect_cb_; @@ -114,13 +120,12 @@ private: std::list<detail::StreamClient> clients_; bool _enable(FrameID id); - bool _processRequest(ftl::net::Peer &p, ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); + bool _processRequest(ftl::net::Peer *p, ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); void _checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts); void _checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts); bool _sendRequest(ftl::protocol::Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate, bool doreset=false); void _cleanUp(); - uint32_t _localToRemoteFS(uint32_t fsid); - uint32_t _remoteToLocalFS(uint32_t fsid); + void _processPacket(ftl::net::Peer *p, short ttimeoff, const StreamPacket &spkt_raw, const Packet &pkt); }; } diff --git a/src/streams/packetMsgpack.hpp b/src/streams/packetMsgpack.hpp index 673ed99eb2133e4f86dfe78aa31df1280b6e11a6..79f577cb8c56cc5c588d1393a054f27348331a2c 100644 --- a/src/streams/packetMsgpack.hpp +++ b/src/streams/packetMsgpack.hpp @@ -17,5 +17,8 @@ struct PacketMSGPACK : ftl::protocol::Packet { MSGPACK_DEFINE(codec, reserved, frame_count, bitrate, flags, data); }; +static_assert(sizeof(StreamPacketMSGPACK) == sizeof(StreamPacket)); +static_assert(sizeof(PacketMSGPACK) == sizeof(Packet)); + } } diff --git a/src/universe.cpp b/src/universe.cpp index 5b5ad3185e08ce5a89a33aa4f7107ba1b54aebbd..ba6fc3b856e24d86fa893c2101ddf1fcbf31c1d5 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -600,6 +600,13 @@ ftl::Handle Universe::onError(const std::function<bool(const PeerPtr&, ftl::prot return on_error_.on(cb); } +PeerPtr Universe::injectFakePeer(std::unique_ptr<ftl::net::internal::SocketConnection> s) { + auto p = std::make_shared<Peer>(std::move(s), this, &disp_); + _insertPeer(p); + _installBindings(p); + return p; +} + PeerPtr Universe::_findPeer(const Peer *p) { SHARED_LOCK(net_mutex_,lk); for (const auto &pp : peers_) { diff --git a/src/universe.hpp b/src/universe.hpp index 180eabf044b2d9eb614da8e142e2afd4f97495df..439c8d724a33336e0638015a10a9ae20fba2d9e2 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -162,7 +162,7 @@ public: void setLocalID(const ftl::UUID &u) { this_peer = u; }; const ftl::UUID &id() const { return this_peer; } - // --- Event Handlers ------------------------------------------------------ + // --- Event Handlers ----------------------------------------------------- ftl::Handle onConnect(const std::function<bool(const ftl::net::PeerPtr&)>&); ftl::Handle onDisconnect(const std::function<bool(const ftl::net::PeerPtr&)>&); @@ -172,6 +172,10 @@ public: size_t getRecvBufferSize(ftl::URI::scheme_t s); static inline std::shared_ptr<Universe> getInstance() { return instance_; } + + // --- Test support ------------------------------------------------------- + + PeerPtr injectFakePeer(std::unique_ptr<ftl::net::internal::SocketConnection> s); private: void _run(); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c7dc751655199aa9eff06a392292cf2aa44d300d..824c7ef88ae6ca1f66ac2ab8c8538d05a3a14ece 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -83,6 +83,18 @@ target_link_libraries(stream_integration add_test(StreamIntegrationTest stream_integration) +### Net Stream Unit ############################################################ +add_executable(netstream_unit + $<TARGET_OBJECTS:CatchTestFTL> + ./netstream_unit.cpp + ./mocks/connection.cpp +) +target_include_directories(netstream_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(netstream_unit + beyond-protocol GnuTLS::GnuTLS Threads::Threads ${URIPARSER_LIBRARIES} ${UUID_LIBRARIES} ${OS_LIBS}) + +add_test(NetStreamTest netstream_unit) + ### Webservice E2E ############################################################# add_executable(webservice_e2e $<TARGET_OBJECTS:CatchTestFTL> @@ -92,4 +104,18 @@ target_include_directories(webservice_e2e PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/.. target_link_libraries(webservice_e2e beyond-protocol GnuTLS::GnuTLS Threads::Threads ${URIPARSER_LIBRARIES} ${UUID_LIBRARIES} ${OS_LIBS}) -add_test(WebserviceE2ETest webservice_e2e) \ No newline at end of file +add_test(WebserviceE2ETest webservice_e2e) + +### Peer Unit ################################################################## +add_executable(peer_unit + $<TARGET_OBJECTS:CatchTest> + #./socket_mock.cpp + ./peer_unit.cpp + ./mocks/connection.cpp +) +target_compile_definitions(peer_unit PUBLIC MOCK_UNIVERSE) +target_include_directories(peer_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include" "${CMAKE_CURRENT_SOURCE_DIR}/../src") +target_link_libraries(peer_unit + beyond-protocol GnuTLS::GnuTLS Threads::Threads ${URIPARSER_LIBRARIES} ${UUID_LIBRARIES} ${OS_LIBS}) + +add_test(PeerUnitTest peer_unit) \ No newline at end of file diff --git a/test/broadcast_unit.cpp b/test/broadcast_unit.cpp index b35ac6da8ad88ba20ba6b1e7e49c227447e086a6..77a587e1f1df1e1067791c6e06ebb5128d425828 100644 --- a/test/broadcast_unit.cpp +++ b/test/broadcast_unit.cpp @@ -14,10 +14,10 @@ using ftl::protocol::Channel; using ftl::protocol::ChannelSet; using ftl::protocol::FrameID; -class TestStream : public ftl::protocol::Stream { +class BTestStream : public ftl::protocol::Stream { public: - TestStream() {}; - ~TestStream() {}; + BTestStream() {}; + ~BTestStream() {}; bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel); @@ -29,9 +29,9 @@ class TestStream : public ftl::protocol::Stream { bool end() override { return true; } bool active() override { return true; } - void setProperty(ftl::protocol::StreamProperty opt, int value) override {} + void setProperty(ftl::protocol::StreamProperty opt, std::any value) override {} - int getProperty(ftl::protocol::StreamProperty opt) override { return 0; } + std::any getProperty(ftl::protocol::StreamProperty opt) override { return 0; } bool supportsProperty(ftl::protocol::StreamProperty opt) override { return true; } @@ -45,9 +45,9 @@ TEST_CASE("ftl::stream::Broadcast()::write", "[stream]") { REQUIRE(mux); SECTION("write with two streams") { - std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + std::shared_ptr<Stream> s1 = std::make_shared<BTestStream>(); REQUIRE(s1); - std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + std::shared_ptr<Stream> s2 = std::make_shared<BTestStream>(); REQUIRE(s2); mux->add(s1); @@ -76,9 +76,9 @@ TEST_CASE("Broadcast enable", "[stream]") { std::unique_ptr<Broadcast> mux = std::make_unique<Broadcast>(); REQUIRE(mux); - std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + std::shared_ptr<BTestStream> s1 = std::make_shared<BTestStream>(); REQUIRE(s1); - std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + std::shared_ptr<BTestStream> s2 = std::make_shared<BTestStream>(); REQUIRE(s2); mux->add(s1); diff --git a/test/mocks/connection.cpp b/test/mocks/connection.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c47bc7612edb1d77cfdf95188981d6df27dc174e --- /dev/null +++ b/test/mocks/connection.cpp @@ -0,0 +1,111 @@ +#include "connection.hpp" + +#include "../../src/socket.hpp" +#include "../../src/universe.hpp" +#include "../../src/protocol/connection.hpp" +#include <ftl/protocol/self.hpp> +#include <chrono> + +using ftl::net::internal::Socket; +using ftl::net::internal::SocketConnection; +using std::this_thread::sleep_for; +using std::chrono::milliseconds; + +// Mock connection, reads/writes from fakedata +// TODO: use separate in/out data +std::map<int, std::string> fakedata; + +class Connection_Mock : public SocketConnection { +public: + const int id_; + bool valid_ = true; + explicit Connection_Mock(int id) : SocketConnection(), id_(id) { + + } + + void connect(const ftl::URI&, int) override {} + + bool is_valid() override { return valid_; } + + bool close() override { valid_ = false; return true; } + + SOCKET fd() override { return -1; } + + ssize_t send(const char* buffer, size_t len) override { + fakedata[id_] += std::string(buffer, len); + return len; + } + + ssize_t recv(char *buffer, size_t len) override { + if (fakedata.count(id_) == 0) { + // this is an error in test + std::cout << "unrecognised socket, test error (FIXME)" << std::endl; + return 0; + } + + size_t l = fakedata[id_].size(); + CHECK(l <= len); // FIXME: buffer overflow + std::memcpy(buffer, fakedata[id_].c_str(), l); + + fakedata.erase(id_); + + return l; + } + + ssize_t writev(const struct iovec *iov, int iovcnt) override { + size_t sent = 0; + std::stringstream ss; + for (int i = 0; i < iovcnt; i++) { + ss << std::string((char*)(iov[i].iov_base), size_t(iov[i].iov_len)); + sent += iov[i].iov_len; + } + fakedata[id_] += ss.str(); + return sent; + } + + bool set_recv_buffer_size(size_t sz) override { return true; } + bool set_send_buffer_size(size_t sz) override { return true; } + size_t get_recv_buffer_size() override { return 1024; } + size_t get_send_buffer_size() override { return 1024; } +}; + +ftl::net::PeerPtr createMockPeer(int c) { + ftl::net::Universe *u = ftl::getSelf()->getUniverse(); + std::unique_ptr<ftl::net::internal::SocketConnection> conn = std::make_unique<Connection_Mock>(c); + return u->injectFakePeer(std::move(conn)); +} + +void send_handshake(ftl::net::Peer &p) { + ftl::UUID id; + p.send("__handshake__", ftl::net::kMagic, ((8 << 16) + (5 << 8) + 2), id); +} + +void provideResponses(const ftl::net::PeerPtr &p, int c, const std::vector<std::tuple<bool,std::string,msgpack::object>> &responses) { + for (const auto &response : responses) { + auto [notif,expname,resdata] = response; + while (fakedata[c].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); + std::string name; + int id; + if (notif) { + auto [t,n,v] = readNotifFull<msgpack::object>(c); + name = n; + id = -1; + } else { + auto [t,i,n,v] = readRPCFull<msgpack::object>(c); + name = n; + id = i; + } + + if (name != expname) return; + if (!notif) { + auto res_obj = std::make_tuple(1,id,name, resdata); + std::stringstream buf; + msgpack::pack(buf, res_obj); + fakedata[c] = buf.str(); + p->data(); + sleep_for(milliseconds(50)); + } else { + fakedata[c] = ""; + } + } +} diff --git a/test/mocks/connection.hpp b/test/mocks/connection.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5700c8eccb10af0f2a1a8891e729ec9c50208fa1 --- /dev/null +++ b/test/mocks/connection.hpp @@ -0,0 +1,67 @@ +#pragma once + +#include <map> +#include <string> +#include "../../src/peer.hpp" + +ftl::net::PeerPtr createMockPeer(int c); + +extern std::map<int, std::string> fakedata; + +void send_handshake(ftl::net::Peer &p); + +template <typename ARG> +msgpack::object packResponse(msgpack::zone &z, const ARG &arg) { + return msgpack::object(arg, z); +} + +void provideResponses(const ftl::net::PeerPtr &p, int c, const std::vector<std::tuple<bool,std::string,msgpack::object>> &responses); + +template <typename T> +void writeNotification(int c, const std::string &name, const T &value) { + auto res_obj = std::make_tuple(2,name,value); + std::stringstream buf; + msgpack::pack(buf, res_obj); + fakedata[c] = buf.str(); +} + +template <typename T> +std::tuple<std::string, T> readResponse(int s) { + msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); + std::tuple<uint8_t, std::string, T> req; + msg.get().convert(req); + return std::make_tuple(std::get<1>(req), std::get<2>(req)); +} + +template <typename T> +std::tuple<uint32_t, T> readRPC(int s) { + msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); + std::tuple<uint8_t, uint32_t, std::string, T> req; + msg.get().convert(req); + return std::make_tuple(std::get<1>(req), std::get<3>(req)); +} + +template <typename T> +std::tuple<uint8_t, uint32_t, std::string, T> readRPCFull(int s) { + msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); + std::tuple<uint8_t, uint32_t, std::string, T> req; + msg.get().convert(req); + return req; +} + +template <typename T> +std::tuple<uint8_t, std::string, T> readNotifFull(int s) { + msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); + std::tuple<uint8_t, std::string, T> req; + msg.get().convert(req); + return req; +} + +template <typename T> +T readRPCReturn(int s) { + msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); + std::tuple<uint8_t, uint32_t, std::string, T> req; + msg.get().convert(req); + return std::get<3>(req); +} + diff --git a/test/muxer_unit.cpp b/test/muxer_unit.cpp index bac6374cff620be068102a28671ad8995d5ff483..4e099fda5f687c1fcb09b1009db36c3de5df8e0c 100644 --- a/test/muxer_unit.cpp +++ b/test/muxer_unit.cpp @@ -29,9 +29,9 @@ class TestStream : public ftl::protocol::Stream { bool end() override { return true; } bool active() override { return true; } - void setProperty(ftl::protocol::StreamProperty opt, int value) override {} + void setProperty(ftl::protocol::StreamProperty opt, std::any value) override {} - int getProperty(ftl::protocol::StreamProperty opt) override { return 0; } + std::any getProperty(ftl::protocol::StreamProperty opt) override { return 0; } bool supportsProperty(ftl::protocol::StreamProperty opt) override { return true; } diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5cd782c2a4d0d3d5246a87c939cc7fe80904de00 --- /dev/null +++ b/test/netstream_unit.cpp @@ -0,0 +1,185 @@ +#include "catch.hpp" +#include <ftl/protocol.hpp> +#include <ftl/protocol/self.hpp> +#include <ftl/protocol/streams.hpp> +#include <ftl/uri.hpp> +#include <ftl/exception.hpp> +#include <ftl/protocol/node.hpp> +#include <thread> +#include <chrono> + +#include "../src/streams/netstream.hpp" +#include "../src/streams/packetMsgpack.hpp" +#include "mocks/connection.hpp" + +using ftl::protocol::FrameID; +using ftl::protocol::StreamProperty; +using ftl::protocol::StreamPacket; +using ftl::protocol::Packet; +using ftl::protocol::Channel; +using std::this_thread::sleep_for; +using std::chrono::milliseconds; + +// --- Mock -------------------------------------------------------------------- + +class MockNetStream : public ftl::protocol::Net { + public: + MockNetStream(const std::string &uri, ftl::net::Universe *net, bool host=false): Net(uri, net, host) {}; + + void hasPosted(const StreamPacket &spkt, const Packet &pkt) override { + lastSpkt = spkt; + } + + void forceSeen(FrameID id, Channel channel) { + seen(id, channel); + } + + StreamPacket lastSpkt; +}; + +// --- Tests ------------------------------------------------------------------- + +TEST_CASE("Net stream options") { + SECTION("can get correct URI") { + auto s1 = ftl::createStream("ftl://mystream?opt=none"); + REQUIRE( s1 ); + REQUIRE( s1->begin() ); + + REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kURI)) == "ftl://mystream" ); + } + + SECTION("can get a name") { + auto s1 = ftl::createStream("ftl://mystream?opt=none"); + REQUIRE( s1 ); + REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kName)).size() > 0 ); + } + + SECTION("can pause the stream") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); + REQUIRE( s1->begin() ); + + StreamPacket spkt; + spkt.timestamp = 100; + spkt.streamID = 1; + spkt.frame_number = 2; + spkt.channel = Channel::kColour; + + Packet pkt; + pkt.frame_count = 1; + + s1->lastSpkt.timestamp = 0; + REQUIRE( s1->post(spkt, pkt) ); + REQUIRE( s1->lastSpkt.timestamp == 100 ); + + s1->setProperty(StreamProperty::kPaused, true); + + spkt.timestamp = 200; + REQUIRE( s1->post(spkt, pkt) ); + REQUIRE( s1->lastSpkt.timestamp == 100 ); + REQUIRE( std::any_cast<bool>(s1->getProperty(StreamProperty::kPaused)) ); + } +} + +TEST_CASE("Net stream sending requests") { + auto p = createMockPeer(0); + fakedata[0] = ""; + send_handshake(*p.get()); + p->data(); + sleep_for(milliseconds(50)); + + SECTION("cannot enable if not seen") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); + REQUIRE( s1->begin() ); + REQUIRE( !s1->enable(FrameID(1, 1), Channel::kDepth)); + } + + SECTION("sends request on enable") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); + + // Thread to provide response to otherwise blocking call + std::thread thr([&p]() { + auto z = std::make_unique<msgpack::zone>(); + provideResponses(p, 0, { + {false, "find_stream", packResponse(*z, p->id())}, + {true, "enable_stream", {}}, + }); + }); + + REQUIRE( s1->begin() ); + + s1->forceSeen(FrameID(1, 1), Channel::kDepth); + s1->lastSpkt.channel = Channel::kNone; + REQUIRE( s1->enable(FrameID(1, 1), Channel::kDepth)); + + thr.join(); + + REQUIRE( s1->lastSpkt.streamID == 1 ); + REQUIRE( int(s1->lastSpkt.frame_number) == 255 ); // TODO: update when this is fixed + REQUIRE( s1->lastSpkt.channel == Channel::kDepth ); + REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 ); + } + + SECTION("responds to requests") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); + + REQUIRE( s1->begin() ); + + bool seenReq = false; + + auto h = s1->onRequest([&seenReq](const ftl::protocol::Request &req) { + seenReq = true; + return true; + }); + + ftl::protocol::StreamPacketMSGPACK spkt; + ftl::protocol::PacketMSGPACK pkt; + spkt.streamID = 1; + spkt.frame_number = 1; + spkt.channel = Channel::kColour; + spkt.flags = ftl::protocol::kFlagRequest; + writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); + p->data(); + + sleep_for(milliseconds(50)); + REQUIRE( seenReq ); + } + + p.reset(); + ftl::protocol::reset(); +} + +TEST_CASE("Net stream can see received data") { + auto p = createMockPeer(0); + fakedata[0] = ""; + send_handshake(*p.get()); + p->data(); + sleep_for(milliseconds(50)); + + SECTION("available if packet is seen") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); + + REQUIRE( s1->begin() ); + + bool seenReq = false; + + auto h = s1->onAvailable([&seenReq](FrameID id, Channel channel) { + seenReq = true; + return true; + }); + + ftl::protocol::StreamPacketMSGPACK spkt; + ftl::protocol::PacketMSGPACK pkt; + spkt.streamID = 1; + spkt.frame_number = 1; + spkt.channel = Channel::kColour; + writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); + p->data(); + + sleep_for(milliseconds(50)); + REQUIRE( seenReq ); + REQUIRE( s1->available(FrameID(1, 1), Channel::kColour) ); + } + + p.reset(); + ftl::protocol::reset(); +} diff --git a/test/peer_unit.cpp b/test/peer_unit.cpp index e0407614779ba501601284b470c08d9d48f764c9..219f47c7ccfa0173e2ae28ceabe716f9dd4a5b5c 100644 --- a/test/peer_unit.cpp +++ b/test/peer_unit.cpp @@ -12,15 +12,14 @@ #include <functional> #include <sstream> -#include <ftl/net/common.hpp> -#include <ftl/net/peer.hpp> -#include <ftl/net/protocol.hpp> -#include <ftl/config.h> +#include <peer.hpp> +#include <protocol.hpp> +#include <ftl/protocol.hpp> +#include <ftl/protocol/error.hpp> +#include <ftl/protocol/config.h> +#include <ftl/handle.hpp> -#include "../src/socket.hpp" -#include "../src/protocol/connection.hpp" - -#define _FTL_NET_UNIVERSE_HPP_ +#include "mocks/connection.hpp" using std::tuple; using std::get; @@ -28,155 +27,18 @@ using std::vector; using ftl::net::Peer; using std::this_thread::sleep_for; using std::chrono::milliseconds; - -// --- Mock -------------------------------------------------------------------- - -namespace ftl { -namespace net { - -typedef unsigned int callback_t; - -class Universe { - public: - Universe() {}; - - ftl::UUID id() { return ftl::UUID(); } - - void _notifyConnect(Peer*) {} - void _notifyDisconnect(Peer*) {} - void removeCallback(callback_t id) {} - - callback_t onConnect(const std::function<void(Peer*)> &f) { return 0; } - callback_t onDisconnect(const std::function<void(Peer*)> &f) { return 0; } - - size_t getSendBufferSize(ftl::URI::scheme_t s) const { return 10*1024; } - size_t getRecvBufferSize(ftl::URI::scheme_t s) const { return 10*1024; } -}; -} -} - -using ftl::net::internal::Socket; -using ftl::net::internal::SocketConnection; - -// Mock connection, reads/writes from fakedata -// TODO: use separate in/out data -static std::map<int, std::string> fakedata; -//static std::mutex fakedata_mtx; - -class Connection_Mock : public SocketConnection { -public: - const int id_; - Connection_Mock(int id) : SocketConnection(), id_(id) { - - } - - void connect(const ftl::URI&, int) override {} - - ssize_t send(const char* buffer, size_t len) override { - fakedata[id_] += std::string(buffer, len); - return len; - } - - ssize_t recv(char *buffer, size_t len) override { - if (fakedata.count(id_) == 0) { - // this is an error in test - std::cout << "unrecognised socket, test error (FIXME)" << std::endl; - return 0; - } - - size_t l = fakedata[id_].size(); - CHECK(l <= len); // FIXME: buffer overflow - std::memcpy(buffer, fakedata[id_].c_str(), l); - - fakedata.erase(id_); - - return l; - } - - ssize_t writev(const struct iovec *iov, int iovcnt) override { - size_t sent = 0; - std::stringstream ss; - for (int i = 0; i < iovcnt; i++) { - ss << std::string((char*)(iov[i].iov_base), size_t(iov[i].iov_len)); - sent += iov[i].iov_len; - } - fakedata[id_] += ss.str(); - return sent; - } -}; +using ftl::protocol::NodeStatus; static std::atomic<int> ctr_ = 0; -std::unique_ptr<SocketConnection> ftl::net::internal::createConnection(const ftl::URI &uri) { - return std::make_unique<Connection_Mock>(ctr_++); -} - -bool ftl::net::internal::Socket::is_fatal() { - return false; -} - -class MockPeer : public Peer { -private: - MockPeer(int) : - Peer(ftl::net::internal::createConnection(ftl::URI("")), new ftl::net::Universe()), - idx(ctr_ - 1) {} - - MockPeer(std::string uri) : - Peer(ftl::URI(""), new ftl::net::Universe()), idx(ctr_ - 1) {} - -public: - int idx; - - void mock_data() { data(); } - - static MockPeer create_connecting_peer() { - return MockPeer(""); - }; - - static MockPeer create_listening_peer() { - return MockPeer(0); - }; -}; - -// --- Support ----------------------------------------------------------------- - -void send_handshake(Peer &p) { - ftl::UUID id; - p.send("__handshake__", ftl::net::kMagic, ((8 << 16) + (5 << 8) + 2), id); -} - -template <typename T> -tuple<std::string, T> readResponse(int s) { - msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); - tuple<uint8_t, std::string, T> req; - msg.get().convert(req); - return std::make_tuple(get<1>(req), get<2>(req)); -} - -template <typename T> -tuple<uint32_t, T> readRPC(int s) { - msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); - tuple<uint8_t, uint32_t, std::string, T> req; - msg.get().convert(req); - return std::make_tuple(get<1>(req), get<3>(req)); -} - -template <typename T> -T readRPCReturn(int s) { - msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); - tuple<uint8_t, uint32_t, std::string, T> req; - msg.get().convert(req); - return get<3>(req); -} - -// --- Files to test ----------------------------------------------------------- - -#include "../src/peer.cpp" // --- Tests ------------------------------------------------------------------- TEST_CASE("Peer(int)", "[]") { SECTION("initiates a valid handshake") { - MockPeer s = MockPeer::create_listening_peer(); + auto s = createMockPeer(ctr_++); + s->start(); + + LOG(INFO) << "STARTED"; auto [name, hs] = readResponse<ftl::net::Handshake>(0); @@ -191,24 +53,28 @@ TEST_CASE("Peer(int)", "[]") { // 3) Sends peer UUID - REQUIRE( s.status() == Peer::kConnecting ); + REQUIRE( s->status() == NodeStatus::kConnecting ); } SECTION("completes on full handshake") { - MockPeer s_l = MockPeer::create_listening_peer(); - MockPeer s_c = MockPeer::create_connecting_peer(); + int lidx = ctr_++; + int cidx = ctr_++; + auto s_l = createMockPeer(lidx); + auto s_c = createMockPeer(cidx); + + s_l->start(); // get sent message by s_l and place it in s_c's buffer - fakedata[s_c.idx] = fakedata[s_l.idx]; - s_l.mock_data(); // listenin peer: process + fakedata[cidx] = fakedata[lidx]; + s_l->data(); // listenin peer: process // vice versa, listening peer gets reply and processes it - fakedata[s_l.idx] = fakedata[s_c.idx]; - s_c.mock_data(); // connecting peer: process + fakedata[lidx] = fakedata[cidx]; + s_c->data(); // connecting peer: process sleep_for(milliseconds(50)); // both peers should be connected now - REQUIRE( s_c.status() == Peer::kConnected ); - REQUIRE( s_l.status() == Peer::kConnected ); + REQUIRE( s_c->status() == NodeStatus::kConnected ); + REQUIRE( s_l->status() == NodeStatus::kConnected ); } SECTION("has correct version on full handshake") { @@ -227,56 +93,59 @@ TEST_CASE("Peer(int)", "[]") { // Send handshake response //REQUIRE( s.id() == ); } + + ftl::protocol::reset(); } TEST_CASE("Peer::call()", "[rpc]") { - MockPeer s = MockPeer::create_connecting_peer(); - send_handshake(s); - s.mock_data(); + int c = ctr_++; + auto s = createMockPeer(c); + send_handshake(*s.get()); + s->data(); sleep_for(milliseconds(50)); SECTION("one argument call") { - REQUIRE( s.isConnected() ); + REQUIRE( s->isConnected() ); - fakedata[s.idx] = ""; + fakedata[c] = ""; // Thread to provide response to otherwise blocking call - std::thread thr([&s]() { - while (fakedata[s.idx].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); + std::thread thr([&s, c]() { + while (fakedata[c].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); - auto [id,value] = readRPC<tuple<int>>(s.idx); + auto [id,value] = readRPC<tuple<int>>(c); auto res_obj = std::make_tuple(1,id,"__return__",get<0>(value)+22); std::stringstream buf; msgpack::pack(buf, res_obj); - fakedata[s.idx] = buf.str(); - s.mock_data(); + fakedata[c] = buf.str(); + s->data(); sleep_for(milliseconds(50)); }); - int res = s.call<int>("test1", 44); + int res = s->call<int>("test1", 44); thr.join(); REQUIRE( (res == 66) ); } SECTION("no argument call") { - REQUIRE( s.isConnected() ); + REQUIRE( s->isConnected() ); - fakedata[s.idx] = ""; + fakedata[c] = ""; // Thread to provide response to otherwise blocking call - std::thread thr([&s]() { - while (fakedata[s.idx].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); + std::thread thr([&s, c]() { + while (fakedata[c].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); - auto res = readRPC<tuple<>>(s.idx); + auto res = readRPC<tuple<>>(c); auto res_obj = std::make_tuple(1,std::get<0>(res),"__return__",77); std::stringstream buf; msgpack::pack(buf, res_obj); - fakedata[s.idx] = buf.str(); - s.mock_data(); + fakedata[c] = buf.str(); + s->data(); sleep_for(milliseconds(50)); }); - int res = s.call<int>("test1"); + int res = s->call<int>("test1"); thr.join(); @@ -284,49 +153,53 @@ TEST_CASE("Peer::call()", "[rpc]") { } SECTION("vector return from call") { - REQUIRE( s.isConnected() ); + REQUIRE( s->isConnected() ); - fakedata[s.idx] = ""; + fakedata[c] = ""; // Thread to provide response to otherwise blocking call - std::thread thr([&s]() { - while (fakedata[s.idx].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); + std::thread thr([&s, c]() { + while (fakedata[c].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); - auto res = readRPC<tuple<>>(s.idx); + auto res = readRPC<tuple<>>(c); vector<int> data = {44,55,66}; auto res_obj = std::make_tuple(1,std::get<0>(res),"__return__",data); std::stringstream buf; msgpack::pack(buf, res_obj); - fakedata[s.idx] = buf.str(); - s.mock_data(); + fakedata[c] = buf.str(); + s->data(); sleep_for(milliseconds(50)); }); - vector<int> res = s.call<vector<int>>("test1"); + vector<int> res = s->call<vector<int>>("test1"); thr.join(); REQUIRE( (res[0] == 44) ); REQUIRE( (res[2] == 66) ); } + + s.reset(); + ftl::protocol::reset(); } TEST_CASE("Peer::bind()", "[rpc]") { - MockPeer s = MockPeer::create_listening_peer(); - send_handshake(s); - s.mock_data(); + int c = ctr_++; + auto s = createMockPeer(c); + send_handshake(*s.get()); + s->data(); sleep_for(milliseconds(50)); SECTION("no argument call") { bool done = false; - s.bind("hello", [&]() { + s->bind("hello", [&]() { done = true; }); - s.send("hello"); - s.mock_data(); // Force it to read the fake send... + s->send("hello"); + s->data(); // Force it to read the fake send... sleep_for(milliseconds(50)); REQUIRE( done ); @@ -335,12 +208,12 @@ TEST_CASE("Peer::bind()", "[rpc]") { SECTION("one argument call") { int done = 0; - s.bind("hello", [&](int a) { + s->bind("hello", [&](int a) { done = a; }); - s.send("hello", 55); - s.mock_data(); // Force it to read the fake send... + s->send("hello", 55); + s->data(); // Force it to read the fake send... sleep_for(milliseconds(50)); REQUIRE( (done == 55) ); @@ -349,12 +222,12 @@ TEST_CASE("Peer::bind()", "[rpc]") { SECTION("two argument call") { std::string done; - s.bind("hello", [&](int a, std::string b) { + s->bind("hello", [&](int a, const std::string &b) { done = b; }); - s.send("hello", 55, "world"); - s.mock_data(); // Force it to read the fake send... + s->send("hello", 55, "world"); + s->data(); // Force it to read the fake send... sleep_for(milliseconds(50)); REQUIRE( (done == "world") ); @@ -363,49 +236,53 @@ TEST_CASE("Peer::bind()", "[rpc]") { SECTION("int return value") { int done = 0; - s.bind("hello", [&](int a) -> int { + s->bind("hello", [&](int a) -> int { done = a; return a; }); - s.asyncCall<int>("hello", [](int a){}, 55); - s.mock_data(); // Force it to read the fake send... + s->asyncCall<int>("hello", [](int a){}, 55); + s->data(); // Force it to read the fake send... sleep_for(milliseconds(50)); REQUIRE( (done == 55) ); - REQUIRE( (readRPCReturn<int>(s.idx) == 55) ); + REQUIRE( (readRPCReturn<int>(c) == 55) ); } SECTION("vector return value") { int done = 0; - s.bind("hello", [&](int a) -> vector<int> { + s->bind("hello", [&](int a) -> vector<int> { done = a; vector<int> b = {a,45}; return b; }); - s.asyncCall<int>("hello", [](int a){}, 55); - s.mock_data(); // Force it to read the fake send... + s->asyncCall<int>("hello", [](int a){}, 55); + s->data(); // Force it to read the fake send... sleep_for(milliseconds(50)); REQUIRE( (done == 55) ); - auto res = readRPCReturn<vector<int>>(s.idx); + auto res = readRPCReturn<vector<int>>(c); REQUIRE( (res[1] == 45) ); } + + s.reset(); + ftl::protocol::reset(); } TEST_CASE("Socket::send()", "[io]") { - MockPeer s = MockPeer::create_connecting_peer(); + int c = ctr_++; + auto s = createMockPeer(c); sleep_for(milliseconds(50)); SECTION("send an int") { int i = 607; - s.send("dummy",i); + s->send("dummy",i); - auto [name, value] = readResponse<tuple<int>>(s.idx); + auto [name, value] = readResponse<tuple<int>>(c); REQUIRE( (name == "dummy") ); REQUIRE( (get<0>(value) == 607) ); @@ -413,18 +290,18 @@ TEST_CASE("Socket::send()", "[io]") { SECTION("send a string") { std::string str("hello world"); - s.send("dummy",str); + s->send("dummy",str); - auto [name, value] = readResponse<tuple<std::string>>(s.idx); + auto [name, value] = readResponse<tuple<std::string>>(c); REQUIRE( (name == "dummy") ); REQUIRE( (get<0>(value) == "hello world") ); } SECTION("send const char* string") { - s.send("dummy","hello world"); + s->send("dummy","hello world"); - auto [name, value] = readResponse<tuple<std::string>>(s.idx); + auto [name, value] = readResponse<tuple<std::string>>(c); REQUIRE( (name == "dummy") ); REQUIRE( (get<0>(value) == "hello world") ); @@ -432,9 +309,9 @@ TEST_CASE("Socket::send()", "[io]") { SECTION("send a tuple") { auto tup = std::make_tuple(55,66,true,6.7); - s.send("dummy",tup); + s->send("dummy",tup); - auto [name, value] = readResponse<tuple<decltype(tup)>>(s.idx); + auto [name, value] = readResponse<tuple<decltype(tup)>>(c); REQUIRE( (name == "dummy") ); REQUIRE( (get<1>(get<0>(value)) == 66) ); @@ -443,13 +320,16 @@ TEST_CASE("Socket::send()", "[io]") { SECTION("send multiple strings") { std::string str("hello "); std::string str2("world"); - s.send("dummy2",str,str2); + s->send("dummy2",str,str2); - auto [name, value] = readResponse<tuple<std::string,std::string>>(s.idx); + auto [name, value] = readResponse<tuple<std::string,std::string>>(c); REQUIRE( (name == "dummy2") ); REQUIRE( (get<0>(value) == "hello ") ); REQUIRE( (get<1>(value) == "world") ); } + + s.reset(); + ftl::protocol::reset(); } diff --git a/test/socket_mock.cpp b/test/socket_mock.cpp index b9e71e893afe951c1c91d92ac4b8db2c7155a73a..08177a7a1003a964c9d113c476867692964e226a 100644 --- a/test/socket_mock.cpp +++ b/test/socket_mock.cpp @@ -1,7 +1,7 @@ /** no-op socket for unit tests. Simulated data transfer implemented in * Connection_Mock */ -#include "../src/socket.hpp" +#include "../src/socketImpl.hpp" #include <string> @@ -13,7 +13,7 @@ bool ftl::net::internal::resolve_inet_address(const std::string &hostname, int p } Socket::Socket(int domain, int type, int protocol) : - status_(STATUS::UNCONNECTED), fd_(-1), family_(domain) { + status_(STATUS::UNCONNECTED), fd_(-1), family_(domain), err_(0) { } bool Socket::is_valid() { return true; } @@ -90,7 +90,7 @@ bool Socket::is_blocking() { return true; } -std::string Socket::get_error_string() { +std::string Socket::get_error_string(int code) { return "not real socket"; } diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index a9baf1bf8aff7842379e85b4c2d065c966a48afb..6168147721ddf38cfece97ccc9b65acd0cc7d95b 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -7,6 +7,11 @@ #include <ftl/protocol/node.hpp> using ftl::protocol::FrameID; +using ftl::protocol::StreamProperty; + +// --- Mock -------------------------------------------------------------------- + + // --- Tests ------------------------------------------------------------------- @@ -86,6 +91,8 @@ TEST_CASE("TCP Stream", "[net]") { REQUIRE( rpkt.bitrate == 10 ); REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG ); REQUIRE( rpkt.frame_count == 1 ); + + REQUIRE( std::any_cast<size_t>(s1->getProperty(StreamProperty::kObservers)) == 1 ); } p.reset();