From 90c8459698c79c959e672dee959185868b74a1df Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nicolas.pope@utu.fi> Date: Mon, 16 May 2022 08:26:12 +0000 Subject: [PATCH] #10 add a file stream --- CMakeLists.txt | 5 + examples/CMakeLists.txt | 2 + examples/read-ftl-file/main.cpp | 39 ++ include/ftl/protocol/broadcaster.hpp | 2 +- include/ftl/protocol/codecs.hpp | 11 - include/ftl/protocol/error.hpp | 3 +- include/ftl/protocol/interceptor.hpp | 2 +- include/ftl/protocol/muxer.hpp | 2 +- include/ftl/protocol/packet.hpp | 23 +- include/ftl/protocol/streams.hpp | 8 +- include/ftl/uri.hpp | 2 + src/self.cpp | 11 +- src/streams/broadcaster.cpp | 6 +- src/streams/filestream.cpp | 621 ++++++++++++++++++++++++++- src/streams/filestream.hpp | 131 ++++++ src/streams/muxer.cpp | 4 +- src/streams/netstream.cpp | 12 +- src/streams/netstream.hpp | 10 +- src/streams/packetMsgpack.hpp | 50 ++- src/streams/streams.cpp | 2 +- src/uri.cpp | 40 +- test/CMakeLists.txt | 11 + test/broadcast_unit.cpp | 8 +- test/filestream_unit.cpp | 122 ++++++ test/muxer_unit.cpp | 24 +- test/netstream_unit.cpp | 6 +- test/stream_integration.cpp | 6 +- 27 files changed, 1081 insertions(+), 82 deletions(-) create mode 100644 examples/CMakeLists.txt create mode 100644 examples/read-ftl-file/main.cpp create mode 100644 src/streams/filestream.hpp create mode 100644 test/filestream_unit.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f8daf9f..8050737 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,7 @@ enable_testing() option(WITH_GNUTLS "Enable TLS support" ON) option(USE_CPPCHECK "Apply cppcheck during build" ON) option(BUILD_TESTS "Compile all unit and integration tests" ON) +option(BUILD_EXAMPLES "Compile the examples" ON) if (NOT WIN32) option(WITH_PYTHON "Enable python support" ON) @@ -206,3 +207,7 @@ install(DIRECTORY include/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) if (BUILD_TESTS) add_subdirectory(test) endif() + +if (BUILD_EXAMPLES) + add_subdirectory(examples) +endif() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 0000000..c2703d8 --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,2 @@ +add_executable(read-ftl-file ./read-ftl-file/main.cpp) +target_link_libraries(read-ftl-file beyond-protocol Threads::Threads ${OS_LIBS} ${URIPARSER_LIBRARIES} ${UUID_LIBRARIES}) \ No newline at end of file diff --git a/examples/read-ftl-file/main.cpp b/examples/read-ftl-file/main.cpp new file mode 100644 index 0000000..e166a29 --- /dev/null +++ b/examples/read-ftl-file/main.cpp @@ -0,0 +1,39 @@ +/** + * @file main.cpp + * @copyright Copyright (c) 2022 University of Turku, MIT License + * @author Nicolas Pope + */ + +#include <chrono> +#include <ftl/protocol.hpp> +#include <ftl/protocol/streams.hpp> +#include <ftl/lib/loguru.hpp> + +using ftl::protocol::StreamPacket; +using ftl::protocol::DataPacket; +using std::this_thread::sleep_for; +using std::chrono::seconds; +using ftl::protocol::StreamProperty; + +int main(int argc, char *argv[]) { + if (argc != 2) return -1; + + auto stream = ftl::getStream(argv[1]); + + auto h = stream->onPacket([](const StreamPacket &spkt, const DataPacket &pkt) { + LOG(INFO) << "Packet: " + << static_cast<int>(spkt.streamID) << "," + << static_cast<int>(spkt.frame_number) << "," + << static_cast<int>(spkt.channel); + return true; + }); + + stream->setProperty(StreamProperty::kLooping, true); + stream->setProperty(StreamProperty::kSpeed, 1); + + if (!stream->begin()) return -1; + sleep_for(seconds(5)); + stream->end(); + + return 0; +} diff --git a/include/ftl/protocol/broadcaster.hpp b/include/ftl/protocol/broadcaster.hpp index ec768bc..a514299 100644 --- a/include/ftl/protocol/broadcaster.hpp +++ b/include/ftl/protocol/broadcaster.hpp @@ -27,7 +27,7 @@ class Broadcast : public Stream { void remove(const std::shared_ptr<Stream> &); void clear(); - bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override; + bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) override; bool begin() override; bool end() override; diff --git a/include/ftl/protocol/codecs.hpp b/include/ftl/protocol/codecs.hpp index 67762b2..4de382f 100644 --- a/include/ftl/protocol/codecs.hpp +++ b/include/ftl/protocol/codecs.hpp @@ -11,16 +11,8 @@ namespace ftl { -/** - * Video and data encoding / decoding components are located in this namespace. - * Audio codecs are for now in `ftl::audio` namespace. - */ namespace protocol { -static constexpr uint8_t kFlagRequest = 0x01; ///< Used for empty data packets to mark a request for data -static constexpr uint8_t kFlagCompleted = 0x02; ///< Last packet for timestamp -static constexpr uint8_t kFlagReset = 0x04; - /** * Compression format used. */ @@ -49,8 +41,5 @@ enum struct Codec : uint8_t { kAny = 255 }; -/** Given a frame count, return a width x height tile configuration. */ -std::pair<int, int> chooseTileConfig(int size); - } // namespace protocol } // namespace ftl diff --git a/include/ftl/protocol/error.hpp b/include/ftl/protocol/error.hpp index 3f17202..537987c 100644 --- a/include/ftl/protocol/error.hpp +++ b/include/ftl/protocol/error.hpp @@ -29,7 +29,8 @@ enum struct Error { kListen, kURIAlreadyExists, kURIDoesNotExist, - kBadURI + kBadURI, + kBadVersion }; } // namespace protocol diff --git a/include/ftl/protocol/interceptor.hpp b/include/ftl/protocol/interceptor.hpp index 1a2ecc9..199768f 100644 --- a/include/ftl/protocol/interceptor.hpp +++ b/include/ftl/protocol/interceptor.hpp @@ -26,7 +26,7 @@ class Intercept : public Stream { //bool onPacket(const StreamCallback &) override; bool onIntercept(const StreamCallback &); - bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override; + bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) override; bool begin() override; bool end() override; diff --git a/include/ftl/protocol/muxer.hpp b/include/ftl/protocol/muxer.hpp index 5d934b3..7a6c791 100644 --- a/include/ftl/protocol/muxer.hpp +++ b/include/ftl/protocol/muxer.hpp @@ -32,7 +32,7 @@ class Muxer : public Stream { void add(const std::shared_ptr<Stream> &, int fsid = -1); void remove(const std::shared_ptr<Stream> &); - bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override; + bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) override; bool begin() override; bool end() override; diff --git a/include/ftl/protocol/packet.hpp b/include/ftl/protocol/packet.hpp index d75b79c..e493c04 100644 --- a/include/ftl/protocol/packet.hpp +++ b/include/ftl/protocol/packet.hpp @@ -9,12 +9,17 @@ #include <cstdint> #include <vector> #include <string> +#include <tuple> #include <ftl/protocol/codecs.hpp> #include <ftl/protocol/channels.hpp> namespace ftl { namespace protocol { +static constexpr uint8_t kFlagRequest = 0x01; ///< Used for empty data packets to mark a request for data +static constexpr uint8_t kFlagCompleted = 0x02; ///< Last packet for timestamp +static constexpr uint8_t kFlagReset = 0x04; + static constexpr uint8_t kAllFrames = 255; static constexpr uint8_t kAllFramesets = 255; static constexpr uint8_t kCurrentFTLVersion = 5; @@ -40,7 +45,7 @@ struct IndexHeader { * codec may use its own blocks and packets, in which case this is essentially * an empty wrapper around that. It is used in the encoding callback. */ -struct Packet { +struct DataPacket { ftl::protocol::Codec codec = ftl::protocol::Codec::kInvalid; uint8_t reserved = 0; uint8_t frame_count = 1; // v4+ Frames included in this packet @@ -48,7 +53,7 @@ struct Packet { uint8_t bitrate = 0; // v4+ For multi-bitrate encoding, 0=highest union { - uint8_t flags = 0; // Codec dependent flags (eg. I-Frame or P-Frame) + uint8_t dataFlags = 0; // Codec dependent flags (eg. I-Frame or P-Frame) uint8_t packet_count; }; std::vector<uint8_t> data; @@ -59,7 +64,7 @@ static constexpr unsigned int kStreamCap_Recorded = 0x02; static constexpr unsigned int kStreamCap_NewConnection = 0x04; /** V4 packets have no stream flags field */ -struct StreamPacketV4 { +/*struct StreamPacketV4 { int version = 4; // FTL version, Not encoded into stream int64_t timestamp; @@ -75,7 +80,7 @@ struct StreamPacketV4 { size_t hint_source_total; // Number of tracks per frame to expect operator std::string() const; -}; +};*/ /** * Add timestamp and channel information to a raw encoded frame packet. This @@ -103,15 +108,7 @@ struct StreamPacket { operator std::string() const; }; -/** - * Combine both packet types into a single packet unit. This pair is always - * saved or transmitted in a stream together. - */ -struct PacketPair { - PacketPair(const StreamPacket &s, const Packet &p) : spkt(s), pkt(p) {} - const StreamPacket &spkt; - const Packet &pkt; -}; +struct Packet : public StreamPacket, public DataPacket {}; } // namespace protocol } // namespace ftl diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp index 2e298de..e5bd060 100644 --- a/include/ftl/protocol/streams.hpp +++ b/include/ftl/protocol/streams.hpp @@ -33,7 +33,7 @@ struct Request { using RequestCallback = std::function<bool(const ftl::protocol::Request&)>; -using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &)>; +using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &)>; /** * @brief Enumeration of possible stream properties. Not all properties are supported @@ -104,7 +104,7 @@ class Stream { * @return true if sent * @return false if dropped */ - virtual bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) = 0; + virtual bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) = 0; // TODO(Nick): Add methods for: pause, paused, statistics @@ -311,7 +311,7 @@ class Stream { protected: /** Dispatch packets to callbacks */ - void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); + void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt); /** Mark the channel and frame as available */ void seen(FrameID id, ftl::protocol::Channel channel); @@ -332,7 +332,7 @@ class Stream { // TODO(Nick): Add a name and metadata }; - ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::Packet&> cb_; + ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::DataPacket&> cb_; ftl::Handler<const Request &> request_cb_; ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_; ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_; diff --git a/include/ftl/uri.hpp b/include/ftl/uri.hpp index c1b8abe..2beb386 100644 --- a/include/ftl/uri.hpp +++ b/include/ftl/uri.hpp @@ -81,6 +81,8 @@ class URI { std::string to_string() const; + std::string toFilePath() const; + void to_json(nlohmann::json &) const; private: diff --git a/src/self.cpp b/src/self.cpp index 4aff76a..47a339e 100644 --- a/src/self.cpp +++ b/src/self.cpp @@ -7,6 +7,7 @@ #include "universe.hpp" #include <ftl/protocol/self.hpp> #include "./streams/netstream.hpp" +#include "./streams/filestream.hpp" using ftl::protocol::Self; @@ -21,13 +22,13 @@ std::shared_ptr<ftl::protocol::Node> Self::connectNode(const std::string &uri) { std::shared_ptr<ftl::protocol::Stream> Self::createStream(const std::string &uri) { ftl::URI u(uri); - if (!u.isValid()) throw FTL_Error("Invalid Stream URI"); + if (!u.isValid()) throw FTL_Error("Invalid Stream URI: " << uri); switch (u.getScheme()) { case ftl::URI::SCHEME_FTL : return std::make_shared<ftl::protocol::Net>(uri, universe_.get(), true); case ftl::URI::SCHEME_FILE : - case ftl::URI::SCHEME_NONE : - default : throw FTL_Error("Invalid Stream URI"); + case ftl::URI::SCHEME_NONE : return std::make_shared<ftl::protocol::File>(uri, true); + default : throw FTL_Error("Invalid Stream URI: " << uri); } } @@ -39,8 +40,8 @@ std::shared_ptr<ftl::protocol::Stream> Self::getStream(const std::string &uri) { switch (u.getScheme()) { case ftl::URI::SCHEME_FTL : return std::make_shared<ftl::protocol::Net>(uri, universe_.get(), false); case ftl::URI::SCHEME_FILE : - case ftl::URI::SCHEME_NONE : - default : throw FTL_Error("Invalid Stream URI"); + case ftl::URI::SCHEME_NONE : return std::make_shared<ftl::protocol::File>(uri, false); + default : throw FTL_Error("Invalid Stream URI: " << uri); } } diff --git a/src/streams/broadcaster.cpp b/src/streams/broadcaster.cpp index 5afb6c3..8380357 100644 --- a/src/streams/broadcaster.cpp +++ b/src/streams/broadcaster.cpp @@ -8,7 +8,7 @@ using ftl::protocol::Broadcast; using ftl::protocol::StreamPacket; -using ftl::protocol::Packet; +using ftl::protocol::DataPacket; using ftl::protocol::Channel; using ftl::protocol::FrameID; @@ -22,7 +22,7 @@ void Broadcast::add(const std::shared_ptr<Stream> &s) { auto &entry = streams_.emplace_back(); entry.stream = s; - entry.handle = std::move(s->onPacket([this, s](const StreamPacket &spkt, const Packet &pkt) { + entry.handle = std::move(s->onPacket([this, s](const StreamPacket &spkt, const DataPacket &pkt) { trigger(spkt, pkt); return true; })); @@ -56,7 +56,7 @@ void Broadcast::clear() { streams_.clear(); } -bool Broadcast::post(const StreamPacket &spkt, const Packet &pkt) { +bool Broadcast::post(const StreamPacket &spkt, const DataPacket &pkt) { bool status = true; for (auto &s : streams_) { status = s.stream->post(spkt, pkt) && status; diff --git a/src/streams/filestream.cpp b/src/streams/filestream.cpp index c94e373..bc09295 100644 --- a/src/streams/filestream.cpp +++ b/src/streams/filestream.cpp @@ -1 +1,620 @@ -/* Empty */ \ No newline at end of file +/** + * @file filestream.cpp + * @copyright Copyright (c) 2022 University of Turku, MIT License + * @author Nicolas Pope + */ + +#include <fstream> +#include <unordered_set> +#include <string> +#include <utility> +#include <limits> +#include <algorithm> +#include <thread> +#include <chrono> +#include "filestream.hpp" +#include <ftl/time.hpp> +#include "packetMsgpack.hpp" + +#define LOGURU_REPLACE_GLOG 1 +#include <loguru.hpp> + +using ftl::protocol::File; +using ftl::protocol::StreamPacket; +using ftl::protocol::DataPacket; +using ftl::protocol::Packet; +using std::get; +using ftl::protocol::Channel; +using ftl::protocol::StreamPacketMSGPACK; +using ftl::protocol::PacketMSGPACK; +using std::this_thread::sleep_for; +using std::chrono::milliseconds; +using ftl::protocol::StreamProperty; + +File::File(const std::string &uri, bool writeable) : + Stream(), + uri_(uri), + ostream_(nullptr), + istream_(nullptr), + active_(false) { + mode_ = (writeable) ? Mode::Write : Mode::Read; + + // Open the file + if (!writeable) { + if (!_checkFile()) { + throw FTL_Error("Could not open file"); + } + } +} + +File::File(std::ifstream *is) : Stream(), ostream_(nullptr), istream_(is), active_(false) { + mode_ = Mode::Read; + + if (!_checkFile()) { + throw FTL_Error("Could not open file"); + } +} + +File::File(std::ofstream *os) : Stream(), ostream_(os), istream_(nullptr), active_(false) { + mode_ = Mode::Write; +} + +File::~File() { + end(); +} + +bool File::_checkFile() { + if (!_open()) return false; + + // Read some packets to identify frame rate. + int count = 1000; + int64_t ts = -1000; + int min_ts_diff = 1000; + first_ts_ = 10000000000000ll; + + std::unordered_set<ftl::protocol::Codec> codecs_found; + + while (count > 0) { + Packet data; + if (!readPacket(data)) { + break; + } + + StreamPacket &spkt = data; + Packet &pkt = data; + + seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel); + + // TODO(Nick): Extract metadata + + auto &fsdata = framesets_[spkt.streamID]; + + codecs_found.emplace(pkt.codec); + + if (fsdata.first_ts < 0) fsdata.first_ts = spkt.timestamp; + + if (spkt.timestamp > 0 && static_cast<int>(spkt.channel) < 32) { + if (spkt.timestamp > ts) { + --count; + auto d = spkt.timestamp - ts; + if (d < min_ts_diff && d > 0) { + min_ts_diff = d; + } + ts = spkt.timestamp; + } + } + } + + buffer_in_.reset(); + buffer_in_.remove_nonparsed_buffer(); + + checked_ = true; + + is_video_ = count < 9; + + framerate_ = 1000 / min_ts_diff; + if (!is_video_) { + looping_ = false; + } + + interval_ = min_ts_diff; + for (auto &f : framesets_) { + f.second.interval = interval_; + } + return true; +} + +bool File::isValid() { + return _checkFile(); +} + +bool File::post(const StreamPacket &s, const DataPacket &p) { + if (!active_) return false; + if (mode_ != Mode::Write) { + // LOG(WARNING) << "Cannot write to read-only ftl file"; + return false; + } + + // LOG(INFO) << "WRITE: " << s.timestamp << " " << (int)s.channel << " " << p.data.size(); + + // Don't write dummy packets to files. + if (p.data.size() == 0) return true; + + // Discard all data channel packets for now + // if (!save_data_ && static_cast<int>(s.channel) >= static_cast<int>(ftl::codecs::Channel::Data)) return true; + + StreamPacket s2 = s; + + auto data = std::tie( + *reinterpret_cast<const StreamPacketMSGPACK*>(&s2), + *reinterpret_cast<const PacketMSGPACK*>(&p)); + msgpack::sbuffer buffer; + msgpack::pack(buffer, data); + + UNIQUE_LOCK(mutex_, lk); + ostream_->write(buffer.data(), buffer.size()); + return ostream_->good(); +} + +bool File::readPacket(Packet &data) { + bool partial = false; + ftl::protocol::Packer pack; + + while ((istream_->good()) || buffer_in_.nonparsed_size() > 0u) { + if (buffer_in_.nonparsed_size() == 0 || (partial && buffer_in_.nonparsed_size() < 10000000)) { + buffer_in_.reserve_buffer(10000000); + istream_->read(buffer_in_.buffer(), buffer_in_.buffer_capacity()); + // if (stream_->bad()) return false; + + int bytes = istream_->gcount(); + if (bytes == 0) return false; + buffer_in_.buffer_consumed(bytes); + partial = false; + } + + msgpack::object_handle msg; + if (!buffer_in_.next(msg)) { + partial = true; + continue; + } + + msgpack::object obj = msg.get(); + + try { + // Older versions have a different SPKT structure. + if (version_ < 5) { + /*std::tuple<StreamPacketV4MSGPACK, PacketMSGPACK> datav4; + obj.convert(datav4); + + auto &spkt = std::get<0>(data); + auto &spktv4 = std::get<0>(datav4); + spkt.version = 4; + spkt.streamID = spktv4.streamID; + spkt.channel = spktv4.channel; + spkt.frame_number = spktv4.frame_number; + spkt.timestamp = spktv4.timestamp; + spkt.flags = 0; + + std::get<1>(data) = std::move(std::get<1>(datav4));*/ + error(ftl::protocol::Error::kBadVersion, "Version too old"); + return false; + } else { + pack.set(&data); + obj.convert(pack); + } + } catch (std::exception &e) { + LOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what(); + // active_ = false; + return false; + } + + // Correct for older version differences. + // _patchPackets(&std::get<0>(data), &std::get<1>(data)); + + return true; + } + + return false; +} + +void File::_patchPackets(StreamPacket *spkt, DataPacket *pkt) { + // Fix to clear flags for version 2. + /*if (version_ <= 2) { + pkt.flags = 0; + } + if (version_ < 4) { + spkt.frame_number = spkt.streamID; + spkt.streamID = 0; + if (isFloatChannel(spkt.channel)) pkt.flags |= ftl::protocol::kFlagFloat; + + auto codec = pkt.codec; + if (codec == ftl::codecs::codec_t::HEVC) pkt.codec = ftl::codecs::codec_t::HEVC_LOSSLESS; + }*/ + + spkt->version = ftl::protocol::kCurrentFTLVersion; + + // Fix for flags corruption + if (pkt->data.size() == 0) { + pkt->dataFlags = 0; + } +} + +bool File::tick(int64_t ts) { + if (!active_) return false; + if (mode_ != Mode::Read) { + LOG(ERROR) << "Cannot read from a write only file"; + return false; + } + + // Skip if paused + // if (value("paused", false)) return true; + + #ifdef DEBUG_MUTEX + UNIQUE_LOCK(mutex_, lk); + #else + std::unique_lock<std::mutex> lk(mutex_, std::defer_lock); + if (!lk.try_lock()) return true; + #endif + + if (jobs_ > 0) { + return true; + } + + // Check buffer first for frames already read + size_t complete_count = 0; + + for (auto i = data_.begin(); i != data_.end(); ) { + auto &fsdata = framesets_[i->streamID]; + if (fsdata.timestamp == 0) fsdata.timestamp = i->timestamp; + + // Limit to file framerate + if (i->timestamp > ts) { + break; + } + + // Is the packet too old? + if (i->timestamp < fsdata.timestamp) { + i = data_.erase(i); + continue; + } + + if (i->timestamp <= fsdata.timestamp) { + StreamPacket &spkt = *i; + + ++jobs_; + + if (spkt.channel == Channel::kEndFrame) { + fsdata.needs_endframe = false; + } + + if (fsdata.needs_endframe) { + if (spkt.frame_number < 255) { + Packet &pkt = *i; + + fsdata.frame_count = std::max( + fsdata.frame_count, + static_cast<size_t>(spkt.frame_number + pkt.frame_count)); + while (fsdata.packet_counts.size() <= spkt.frame_number) fsdata.packet_counts.push_back(0); + ++fsdata.packet_counts[spkt.frame_number]; + } else { + // Add frameset packets to frame 0 counts + fsdata.frame_count = std::max(fsdata.frame_count, size_t(1)); + while (fsdata.packet_counts.size() < fsdata.frame_count) fsdata.packet_counts.push_back(0); + ++fsdata.packet_counts[0]; + } + } + + auto j = i; + ++i; + + // TODO(Nick): Probably better not to do a thread per packet + ftl::pool.push([this, i = j](int id) { + StreamPacket &spkt = *i; + Packet &pkt = *i; + + spkt.localTimestamp = spkt.timestamp; + + trigger(spkt, pkt); + + UNIQUE_LOCK(data_mutex_, dlk); + data_.erase(i); + --jobs_; + }); + } else { + ++complete_count; + + if (fsdata.needs_endframe) { + for (size_t j = 0; j < fsdata.frame_count; ++j) { + auto timestamp = fsdata.timestamp; + auto sid = i->streamID; + auto pcount = fsdata.packet_counts[j]; + + ftl::pool.push([this, timestamp, sid, j, pcount](int id) { + // Send final frame packet. + StreamPacket spkt; + spkt.timestamp = timestamp; + spkt.streamID = sid; + spkt.flags = 0; + spkt.channel = Channel::kEndFrame; + + DataPacket pkt; + pkt.bitrate = 255; + pkt.codec = Codec::kInvalid; + pkt.packet_count = 1; + pkt.frame_count = 1; + + spkt.frame_number = j; + pkt.packet_count = pcount+1; + + trigger(spkt, pkt); + }); + + fsdata.packet_counts[j] = 0; + } + } else { + } + + fsdata.timestamp = i->timestamp; + if (complete_count == framesets_.size()) break; + } + } + + int64_t max_ts = std::numeric_limits<int64_t>::min(); + for (auto &fsd : framesets_) { + max_ts = std::max(max_ts, (fsd.second.timestamp <= 0) ? timestart_ : fsd.second.timestamp); + } + int64_t extended_ts = max_ts + 200; // Buffer 200ms ahead + + while (!read_error_ && ((active_ && istream_->good()) || buffer_in_.nonparsed_size() > 0u)) { + UNIQUE_LOCK(data_mutex_, dlk); + auto &data = data_.emplace_back(); + dlk.unlock(); + + bool res = readPacket(data); + if (!res) { + dlk.lock(); + data_.pop_back(); + read_error_ = true; + break; + } + + auto &fsdata = framesets_[data.streamID]; + + if (fsdata.first_ts < 0) { + LOG(WARNING) << "Bad first timestamp " << fsdata.first_ts << ", " << data.timestamp; + } + + // Adjust timestamp + // FIXME: A potential bug where multiple times are merged into one? + data.timestamp = (((data.timestamp) - fsdata.first_ts)) + timestart_; + data.hint_capability = + ((is_video_) ? 0 : ftl::protocol::kStreamCap_Static) | ftl::protocol::kStreamCap_Recorded; + + if (data.timestamp > extended_ts) { + break; + } + } + + // Force send end frames for static files + if (data_.size() == 0 && !is_video_) { + for (auto &fsix : framesets_) { + auto &fsdata = fsix.second; + if (fsdata.needs_endframe) { + fsdata.needs_endframe = false; + // Send final frame packet. + StreamPacket spkt; + spkt.timestamp = fsdata.timestamp; + spkt.streamID = fsix.first; + spkt.flags = 0; + spkt.channel = Channel::kEndFrame; + + DataPacket pkt; + pkt.bitrate = 255; + pkt.codec = Codec::kInvalid; + pkt.packet_count = 1; + pkt.frame_count = 1; + + for (size_t i = 0; i < fsdata.frame_count; ++i) { + spkt.frame_number = i; + pkt.packet_count = fsdata.packet_counts[i]+1; + fsdata.packet_counts[i] = 0; + + trigger(spkt, pkt); + } + } + } + } + + if (data_.size() == 0 && looping_) { + buffer_in_.reset(); + buffer_in_.remove_nonparsed_buffer(); + _open(); + + read_error_ = false; + timestart_ = ftl::time::get_time(); + for (auto &fsd : framesets_) fsd.second.timestamp = 0; + return true; + } + + return data_.size() > 0; +} + +bool File::_open() { + if (istream_ && istream_->is_open()) { + istream_->clear(); + istream_->seekg(0); + } else { + if (!istream_) istream_ = new std::ifstream; + istream_->open(uri_.toFilePath(), std::ifstream::in | std::ifstream::binary); + + if (!istream_->good()) { + LOG(ERROR) << "Could not open file: " << uri_.toFilePath(); + return false; + } + } + + ftl::protocol::Header h; + (*istream_).read(reinterpret_cast<char*>(&h), sizeof(h)); + if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false; + + if (h.version >= 2) { + ftl::protocol::IndexHeader ih; + (*istream_).read(reinterpret_cast<char*>(&ih), sizeof(ih)); + } + + version_ = h.version; + return true; +} + +bool File::run() { + thread_ = std::thread([this]() { + while (active_) { + auto now = ftl::time::get_time(); + tick(now); + auto used = ftl::time::get_time() - now; + int64_t spare = interval_ - used; + // LOG(INFO) << "SLEEP = " << spare; + sleep_for(milliseconds(std::max(int64_t(1), spare))); + } + }); + + #ifndef WIN32 + sched_param p; + p.sched_priority = sched_get_priority_max(SCHED_RR); + pthread_setschedparam(thread_.native_handle(), SCHED_RR, &p); + #endif + + // TODO(Nick): Windows thread priority + + return true; +} + +bool File::begin() { + if (active_) return true; + if (mode_ == Mode::Read) { + if (!checked_) _checkFile(); + _open(); + + // Capture current time to adjust timestamps + timestart_ = ftl::time::get_time(); + active_ = true; + read_error_ = false; + + tick(timestart_); // Do some now! + run(); + } else if (mode_ == Mode::Write) { + if (!ostream_) ostream_ = new std::ofstream; + ostream_->open(uri_.toFilePath(), std::ifstream::out | std::ifstream::binary); + + if (!ostream_->good()) { + LOG(ERROR) << "Could not open file: '" << uri_.toFilePath() << "'"; + return false; + } + + ftl::protocol::Header h; + (*ostream_).write((const char*)&h, sizeof(h)); + + ftl::protocol::IndexHeader ih; + ih.reserved[0] = -1; + (*ostream_).write((const char*)&ih, sizeof(ih)); + + // Capture current time to adjust timestamps + timestart_ = ftl::time::get_time(); + active_ = true; + interval_ = 50; // TODO(Nick): Where to get this from? + } + + return true; +} + +bool File::end() { + if (!active_) return false; + active_ = false; + + if (thread_.joinable()) thread_.join(); + + UNIQUE_LOCK(mutex_, lk); + + while (jobs_ > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + if (mode_ == Mode::Read) { + if (istream_) { + istream_->close(); + delete istream_; + istream_ = nullptr; + } + } else if (mode_ == Mode::Write) { + if (ostream_) { + ostream_->close(); + delete ostream_; + ostream_ = nullptr; + } + } + return true; +} + +void File::reset() { + /*UNIQUE_LOCK(mutex_, lk); + + // TODO: Find a better solution + while (jobs_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2)); + + data_.clear(); + buffer_in_.reset(); + buffer_in_.remove_nonparsed_buffer(); + _open(); + + timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); + //timestamp_ = timestart_; + for (auto &fsd : framesets_) fsd.second.timestamp = timestart_;*/ +} + +bool File::active() { + return active_; +} + +void File::refresh() {} + +bool File::enable(FrameID id) { + return Stream::enable(id); +} + +bool File::enable(FrameID id, ftl::protocol::Channel c) { + return Stream::enable(id, c); +} + +bool File::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { + return Stream::enable(id, channels); +} + +void File::setProperty(StreamProperty opt, std::any value) { + switch (opt) { + case StreamProperty::kFrameRate : + case StreamProperty::kURI : throw FTL_Error("Readonly property"); + case StreamProperty::kLooping : looping_ = std::any_cast<bool>(value); break; + case StreamProperty::kSpeed : speed_ = std::any_cast<int>(value); break; + default : throw FTL_Error("Property not supported"); + } +} + +std::any File::getProperty(StreamProperty opt) { + switch (opt) { + case StreamProperty::kSpeed : return speed_; + case StreamProperty::kFrameRate : return framerate_; + case StreamProperty::kLooping : return looping_; + case StreamProperty::kURI : return uri_.getBaseURI(); + default : throw FTL_Error("Property not supported"); + } +} + +bool File::supportsProperty(StreamProperty opt) { + switch (opt) { + case StreamProperty::kSpeed : + case StreamProperty::kFrameRate : + case StreamProperty::kLooping : + case StreamProperty::kURI : return true; + default : return false; + } +} diff --git a/src/streams/filestream.hpp b/src/streams/filestream.hpp new file mode 100644 index 0000000..269b866 --- /dev/null +++ b/src/streams/filestream.hpp @@ -0,0 +1,131 @@ +/** + * @file filestream.hpp + * @copyright Copyright (c) 2022 University of Turku, MIT License + * @author Nicolas Pope + */ + +#pragma once + +#include <string> +#include <list> +#include <tuple> +#include <vector> +#include <unordered_map> +#include <thread> +#include <ftl/protocol/packet.hpp> +#include <ftl/protocol/streams.hpp> +#include <ftl/handle.hpp> +#include <ftl/uri.hpp> +#include <msgpack.hpp> + +namespace ftl { +namespace protocol { + +/** + * Provide a packet stream to/from a file. If the file already exists it is + * opened readonly, if not it is created write only. A mode to support both + * reading and writing (to re code it) could be supported by using a temp file + * for writing and swapping files when finished. It must be possible to control + * streaming rate from the file. + */ +class File : public Stream { + public: + explicit File(const std::string &uri, bool writeable = false); + explicit File(std::ifstream *); + explicit File(std::ofstream *); + ~File(); + + bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) override; + + bool begin() override; + bool end() override; + bool active() override; + + void reset() override; + void refresh() override; + + bool enable(FrameID id) override; + bool enable(FrameID id, ftl::protocol::Channel c) override; + bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override; + + void setProperty(ftl::protocol::StreamProperty opt, std::any value) override; + std::any getProperty(ftl::protocol::StreamProperty opt) override; + bool supportsProperty(ftl::protocol::StreamProperty opt) override; + + StreamType type() const override { return StreamType::kRecorded; } + + /** + * Automatically tick through the frames using a timer. Threads are used. + */ + bool run(); + + /** + * Manually tick through the frames one per call. + */ + bool tick(int64_t); + + /** + * Directly read a packet. Returns false if no more packets exist, true + * otherwise. The callback is called when a packet is read. + */ + bool readPacket(ftl::protocol::Packet &); + + enum class Mode { + Read, + Write, + ReadWrite + }; + + inline void setMode(Mode m) { mode_ = m; } + inline void setStart(int64_t ts) { timestamp_ = ts; } + + // TODO(Nick): have standalone function to for validating the file + /// check if valid file/stream + bool isValid(); + + private: + ftl::URI uri_; + std::ofstream *ostream_; + std::ifstream *istream_; + std::thread thread_; + + bool checked_ = false; + Mode mode_; + msgpack::sbuffer buffer_out_; + msgpack::unpacker buffer_in_; + std::list<ftl::protocol::Packet> data_; + int64_t timestart_ = 0; + int64_t timestamp_ = 0; + int64_t interval_ = 50; + int64_t first_ts_ = 0; + bool active_ = false; + int version_ = 0; + bool is_video_ = true; + bool read_error_ = false; + bool looping_ = false; + int framerate_ = 0; + int speed_ = 1; + + struct FramesetData { + size_t frame_count = 0; + bool needs_endframe = true; + std::vector<int> packet_counts; + int64_t timestamp = 0; + int64_t first_ts = -1; + int interval = 50; + }; + std::unordered_map<int, FramesetData> framesets_; + + MUTEX mutex_; + MUTEX data_mutex_; + std::atomic<int> jobs_ = 0; + + bool _open(); + bool _checkFile(); + + /* Apply version patches etc... */ + void _patchPackets(ftl::protocol::StreamPacket *spkt, ftl::protocol::DataPacket *pkt); +}; + +} // namespace protocol +} // namespace ftl diff --git a/src/streams/muxer.cpp b/src/streams/muxer.cpp index a3ec9ad..74fa38b 100644 --- a/src/streams/muxer.cpp +++ b/src/streams/muxer.cpp @@ -72,7 +72,7 @@ void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) { se.fixed_fs = fsid; Muxer::StreamEntry *ptr = &se; - se.handle = std::move(s->onPacket([this, ptr](const StreamPacket &spkt, const Packet &pkt) { + se.handle = std::move(s->onPacket([this, ptr](const StreamPacket &spkt, const DataPacket &pkt) { FrameID newID = _mapFromInput(ptr, FrameID(spkt.streamID, spkt.frame_number)); StreamPacket spkt2 = spkt; @@ -138,7 +138,7 @@ std::shared_ptr<Stream> Muxer::originStream(FrameID id) const { return (p.second) ? p.second->stream : nullptr; } -bool Muxer::post(const StreamPacket &spkt, const Packet &pkt) { +bool Muxer::post(const StreamPacket &spkt, const DataPacket &pkt) { auto p = _mapToOutput(FrameID(spkt.streamID, spkt.frame_number)); if (!p.second) return false; StreamPacket spkt2 = spkt; diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index a780bfc..1097676 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -23,7 +23,7 @@ using ftl::protocol::NetStats; using ftl::protocol::StreamPacket; using ftl::protocol::PacketMSGPACK; using ftl::protocol::StreamPacketMSGPACK; -using ftl::protocol::Packet; +using ftl::protocol::DataPacket; using ftl::protocol::Channel; using ftl::protocol::Codec; using ftl::protocol::FrameID; @@ -114,7 +114,7 @@ Net::~Net() { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } -bool Net::post(const StreamPacket &spkt, const Packet &pkt) { +bool Net::post(const StreamPacket &spkt, const DataPacket &pkt) { if (!active_) return false; if (paused_) return true; bool hasStale = false; @@ -127,7 +127,7 @@ bool Net::post(const StreamPacket &spkt, const Packet &pkt) { pkt_strip.codec = pkt.codec; pkt_strip.bitrate = pkt.bitrate; pkt_strip.frame_count = pkt.frame_count; - pkt_strip.flags = pkt.flags; + pkt_strip.dataFlags = pkt.dataFlags; if (host_) { SHARED_LOCK(mutex_, lk); @@ -191,7 +191,7 @@ bool Net::post(const StreamPacket &spkt, const Packet &pkt) { return true; } -void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, const Packet &pkt) { +void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, const DataPacket &pkt) { int64_t now = time_point_cast<milliseconds>(high_resolution_clock::now()).time_since_epoch().count(); if (!active_) return; @@ -252,7 +252,7 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp); } -void Net::inject(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { +void Net::inject(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { _processPacket(nullptr, 0, spkt, pkt); } @@ -417,7 +417,7 @@ void Net::_cleanUp() { * batches (max 255 unique frames by timestamp). Requests are in the form * of packets that match the request except the data component is empty. */ -bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const Packet &pkt) { +bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacket &pkt) { bool found = false; DLOG(INFO) << "processing request: " << int(spkt->streamID) << ", " << int(spkt->channel); diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index c861077..dde225e 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -47,7 +47,7 @@ class Net : public Stream { Net(const std::string &uri, ftl::net::Universe *net, bool host = false); virtual ~Net(); - bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override; + bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) override; bool begin() override; bool end() override; @@ -84,8 +84,8 @@ class Net : public Stream { static constexpr int kFramesToRequest = 30; // Unit test support - virtual void hasPosted(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) {} - void inject(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &); + virtual void hasPosted(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) {} + void inject(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &); private: SHARED_MUTEX mutex_; @@ -127,7 +127,7 @@ class Net : public Stream { std::list<detail::StreamClient> clients_; bool _enable(FrameID id); - bool _processRequest(ftl::net::Peer *p, ftl::protocol::StreamPacket *spkt, const ftl::protocol::Packet &pkt); + bool _processRequest(ftl::net::Peer *p, ftl::protocol::StreamPacket *spkt, const ftl::protocol::DataPacket &pkt); void _checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts); void _checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts); bool _sendRequest( @@ -138,7 +138,7 @@ class Net : public Stream { uint8_t bitrate, bool doreset = false); void _cleanUp(); - void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, const Packet &pkt); + void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, const DataPacket &pkt); }; } // namespace protocol diff --git a/src/streams/packetMsgpack.hpp b/src/streams/packetMsgpack.hpp index f8aca04..170e180 100644 --- a/src/streams/packetMsgpack.hpp +++ b/src/streams/packetMsgpack.hpp @@ -19,12 +19,56 @@ struct StreamPacketMSGPACK : ftl::protocol::StreamPacket { MSGPACK_DEFINE(timestamp, streamID, frame_number, channel, flags); }; -struct PacketMSGPACK : ftl::protocol::Packet { - MSGPACK_DEFINE(codec, reserved, frame_count, bitrate, flags, data); +struct PacketMSGPACK : ftl::protocol::DataPacket { + MSGPACK_DEFINE(codec, reserved, frame_count, bitrate, dataFlags, data); +}; + +class StreamPacker { + public: + explicit StreamPacker(StreamPacket *p) : packet(p) {} + + MSGPACK_DEFINE( + packet->timestamp, + packet->streamID, + packet->frame_number, + packet->channel, + packet->flags); + + StreamPacket *packet; +}; + +class DataPacker { + public: + explicit DataPacker(DataPacket *p) : packet(p) {} + + MSGPACK_DEFINE( + packet->codec, + packet->reserved, + packet->frame_count, + packet->bitrate, + packet->dataFlags, + packet->data); + + DataPacket *packet; +}; + +class Packer { + public: + Packer() : spack_(nullptr), dpack_(nullptr) {} + explicit Packer(Packet *p) : spack_(p), dpack_(p) {} + void set(Packet *p) { + spack_.packet = p; + dpack_.packet = p; + } + + MSGPACK_DEFINE_ARRAY(spack_, dpack_); + private: + StreamPacker spack_; + DataPacker dpack_; }; static_assert(sizeof(StreamPacketMSGPACK) == sizeof(StreamPacket)); -static_assert(sizeof(PacketMSGPACK) == sizeof(Packet)); +static_assert(sizeof(PacketMSGPACK) == sizeof(DataPacket)); } // namespace protocol } // namespace ftl diff --git a/src/streams/streams.cpp b/src/streams/streams.cpp index bf5ba8c..15c6b1d 100644 --- a/src/streams/streams.cpp +++ b/src/streams/streams.cpp @@ -128,7 +128,7 @@ void Stream::reset() { void Stream::refresh() {} -void Stream::trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { +void Stream::trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { cb_.trigger(spkt, pkt); } diff --git a/src/uri.cpp b/src/uri.cpp index fcb90df..af8347c 100644 --- a/src/uri.cpp +++ b/src/uri.cpp @@ -5,16 +5,22 @@ */ #include <cstdlib> +#include <regex> +#include <string> #include <ftl/uri.hpp> #include <nlohmann/json.hpp> // #include <filesystem> TODO When available #include <ftl/lib/loguru.hpp> +#include <ftl/exception.hpp> #ifndef WIN32 #include <unistd.h> #else #include <direct.h> +#include <shlwapi.h> + +#pragma comment(lib, "Shlwapi.lib") #endif using ftl::URI; @@ -63,9 +69,20 @@ void URI::_parse(uri_t puri) { #endif } +#ifdef WIN32 + if (std::regex_match(puri, std::regex("^[A-Z]:.*"))) { + suri.resize(1024); + DWORD size = suri.size(); + if (UrlCreateFromPathA(puri, suri.data(), &size, NULL) != S_OK) { + m_valid = false; + return; + } + } +#endif + #ifdef HAVE_URIPARSESINGLE const char *errpos; - if (uriParseSingleUriA(&uri, puri, &errpos) != URI_SUCCESS) { + if (uriParseSingleUriA(&uri, suri.c_str(), &errpos) != URI_SUCCESS) { #else UriParserStateA uris; uris.uri = &uri; @@ -161,9 +178,28 @@ string URI::to_string() const { return (m_qmap.size() > 0) ? m_base + "?" + getQuery() : m_base; } +std::string URI::toFilePath() const { + if (getScheme() != scheme_t::SCHEME_FILE && getScheme() != scheme_t::SCHEME_NONE) { + throw FTL_Error("Not a file URI"); + } + +#ifdef WIN32 + std::string result; + result.resize(1024); + DWORD size = result.size(); + auto base = getBaseURI(); + if (PathCreateFromUrlA(base.c_str(), result.data(), &size, NULL) != S_OK) { + throw FTL_Error("Not a file URI"); + } + return result; +#else + return getPath(); +#endif +} + string URI::getPathSegment(int n) const { size_t N = (n < 0) ? m_pathseg.size()+n : n; - if (N < 0 || N >= m_pathseg.size()) return ""; + if (N >= m_pathseg.size()) return ""; else return m_pathseg[N]; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 824c7ef..0cee8f7 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -95,6 +95,17 @@ target_link_libraries(netstream_unit add_test(NetStreamTest netstream_unit) +### File Stream Unit ########################################################### +add_executable(filestream_unit + $<TARGET_OBJECTS:CatchTestFTL> + ./filestream_unit.cpp +) +target_include_directories(filestream_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(filestream_unit + beyond-protocol Threads::Threads ${URIPARSER_LIBRARIES} ${UUID_LIBRARIES} ${OS_LIBS}) + +add_test(FileStreamTest filestream_unit) + ### Webservice E2E ############################################################# add_executable(webservice_e2e $<TARGET_OBJECTS:CatchTestFTL> diff --git a/test/broadcast_unit.cpp b/test/broadcast_unit.cpp index 56f012e..ec7b5ba 100644 --- a/test/broadcast_unit.cpp +++ b/test/broadcast_unit.cpp @@ -9,7 +9,7 @@ using ftl::protocol::Muxer; using ftl::protocol::Broadcast; using ftl::protocol::Stream; using ftl::protocol::StreamPacket; -using ftl::protocol::Packet; +using ftl::protocol::DataPacket; using ftl::protocol::Channel; using ftl::protocol::ChannelSet; using ftl::protocol::FrameID; @@ -19,7 +19,7 @@ class BTestStream : public ftl::protocol::Stream { BTestStream() {}; ~BTestStream() {}; - bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { + bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel); trigger(spkt, pkt); return true; @@ -56,11 +56,11 @@ TEST_CASE("ftl::stream::Broadcast()::write", "[stream]") { StreamPacket tspkt1 = {4,0,0,1,Channel::kColour}; StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; - auto h1 = s1->onPacket([&tspkt1](const StreamPacket &spkt, const Packet &pkt) { + auto h1 = s1->onPacket([&tspkt1](const StreamPacket &spkt, const DataPacket &pkt) { tspkt1 = spkt; return true; }); - auto h2 = s2->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { + auto h2 = s2->onPacket([&tspkt2](const StreamPacket &spkt, const DataPacket &pkt) { tspkt2 = spkt; return true; }); diff --git a/test/filestream_unit.cpp b/test/filestream_unit.cpp new file mode 100644 index 0000000..685fa4b --- /dev/null +++ b/test/filestream_unit.cpp @@ -0,0 +1,122 @@ +#include "catch.hpp" + +#include <filesystem> +#include <ftl/protocol/streams.hpp> +#include <ftl/protocol.hpp> +#include <ftl/time.hpp> + +using ftl::protocol::Channel; +using ftl::protocol::Codec; +using ftl::protocol::StreamPacket; +using ftl::protocol::DataPacket; + +static int ctr = 0; + +TEST_CASE("File write and read", "[stream]") { + std::string file = "ftl_file_stream_test" + std::to_string(ctr++) + ".ftl"; + std::string filename = (std::filesystem::temp_directory_path() / file).string(); + + SECTION("write read single packet") { + auto writer = ftl::createStream(filename); + + REQUIRE( writer->begin() ); + + REQUIRE( writer->post({4,ftl::time::get_time(),2,1, Channel::kConfidence},{Codec::kAny, 0, 0, 0, 0, {'f'}}) ); + + writer->end(); + + auto reader = ftl::getStream(filename); + + StreamPacket tspkt = {4,0,0,1, Channel::kColour}; + auto h = reader->onPacket([&tspkt](const StreamPacket &spkt, const DataPacket &pkt) { + if (spkt.channel == Channel::kEndFrame) return true; + tspkt = spkt; + return true; + }); + REQUIRE( reader->begin() ); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + reader->end(); + + //REQUIRE( tspkt.timestamp == 0 ); + REQUIRE( tspkt.streamID == (uint8_t)2 ); + REQUIRE( tspkt.channel == Channel::kConfidence ); + } + + SECTION("write read multiple packets at same timestamp") { + auto writer = ftl::createStream(filename); + + REQUIRE( writer->begin() ); + + REQUIRE( writer->post({5,10,0,1, Channel::kConfidence},{Codec::kAny, 0, 0, 0, 0, {'f'}}) ); + REQUIRE( writer->post({5,10,1,1, Channel::kDepth},{Codec::kAny, 0, 0, 0, 0, {'f'}}) ); + REQUIRE( writer->post({5,10,2,1, Channel::kScreen},{Codec::kAny, 0, 0, 0, 0, {'f'}}) ); + + writer->end(); + + auto reader = ftl::getStream(filename); + + StreamPacket tspkt = {5,0,0,1, Channel::kColour}; + std::atomic_int count = 0; + + auto h = reader->onPacket([&tspkt,&count](const StreamPacket &spkt, const DataPacket &pkt) { + if (spkt.channel == Channel::kEndFrame) return true; + tspkt = spkt; + ++count; + return true; + }); + REQUIRE( reader->begin() ); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + reader->end(); + + REQUIRE( count == 3 ); + REQUIRE( tspkt.timestamp > 0 ); + REQUIRE( tspkt.streamID == 2 ); + REQUIRE( tspkt.channel == Channel::kScreen ); + } + + SECTION("write read multiple packets at different timestamps") { + auto writer = ftl::createStream(filename); + + REQUIRE( writer->begin() ); + + auto time = ftl::time::get_time(); + REQUIRE( writer->post({4,time,0,0, Channel::kConfidence},{Codec::kAny, 0, 1, 0, 0, {'f'}}) ); + REQUIRE( writer->post({4,time+50,0,0,Channel::kDepth},{Codec::kAny, 0, 1, 0, 0, {'f'}}) ); + REQUIRE( writer->post({4,time+2*50,0,0,Channel::kScreen},{Codec::kAny, 0, 1, 0, 0, {'f'}}) ); + + writer->end(); + + auto reader = ftl::getStream(filename); + + StreamPacket tspkt = {0,0,0,1,Channel::kColour}; + int count = 0; + int avgDiff = -1; + std::vector<Channel> channels; + channels.reserve(3); + + auto h = reader->onPacket([&tspkt,&count,&avgDiff, &channels](const StreamPacket &spkt, const DataPacket &pkt) { + if (spkt.channel == Channel::kEndFrame) return true; + + if (tspkt.timestamp > 0) { + avgDiff += spkt.timestamp - tspkt.timestamp; + } + tspkt = spkt; + ++count; + channels.push_back(spkt.channel); + return true; + }); + REQUIRE( reader->begin() ); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + avgDiff = avgDiff / (count - 1); + REQUIRE( count == 3 ); + REQUIRE( avgDiff <= 50 ); + REQUIRE( avgDiff >= 48 ); + REQUIRE( channels[0] == Channel::kConfidence ); + REQUIRE( channels[1] == Channel::kDepth ); + REQUIRE( channels[2] == Channel::kScreen ); + } +} diff --git a/test/muxer_unit.cpp b/test/muxer_unit.cpp index 4e099fd..e1ab51f 100644 --- a/test/muxer_unit.cpp +++ b/test/muxer_unit.cpp @@ -9,7 +9,7 @@ using ftl::protocol::Muxer; using ftl::protocol::Broadcast; using ftl::protocol::Stream; using ftl::protocol::StreamPacket; -using ftl::protocol::Packet; +using ftl::protocol::DataPacket; using ftl::protocol::Channel; using ftl::protocol::ChannelSet; using ftl::protocol::FrameID; @@ -19,7 +19,7 @@ class TestStream : public ftl::protocol::Stream { TestStream() {}; ~TestStream() {}; - bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { + bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel); trigger(spkt, pkt); return true; @@ -57,7 +57,7 @@ TEST_CASE("Muxer post, distinct framesets", "[stream]") { ftl::protocol::StreamPacket tspkt = {4,0,0,1, Channel::kColour}; - auto h = s->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + auto h = s->onPacket([&tspkt](const StreamPacket &spkt, const DataPacket &pkt) { tspkt = spkt; return true; }); @@ -77,7 +77,7 @@ TEST_CASE("Muxer post, distinct framesets", "[stream]") { mux->add(s2); ftl::protocol::StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const DataPacket &pkt) { tspkt = spkt; return true; }); @@ -94,11 +94,11 @@ TEST_CASE("Muxer post, distinct framesets", "[stream]") { StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; StreamPacket tspkt3 = {4,0,0,1,Channel::kColour}; - auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { + auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const DataPacket &pkt) { tspkt2 = spkt; return true; }); - auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const Packet &pkt) { + auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const DataPacket &pkt) { tspkt3 = spkt; return true; }); @@ -125,7 +125,7 @@ TEST_CASE("Muxer post, single frameset", "[stream]") { mux->add(s2,1); StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const DataPacket &pkt) { tspkt = spkt; return true; }); @@ -140,11 +140,11 @@ TEST_CASE("Muxer post, single frameset", "[stream]") { StreamPacket tspkt2 = {4,0,4,4,Channel::kColour}; StreamPacket tspkt3 = {4,0,4,4,Channel::kColour}; - auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { + auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const DataPacket &pkt) { tspkt2 = spkt; return true; }); - auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const Packet &pkt) { + auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const DataPacket &pkt) { tspkt3 = spkt; return true; }); @@ -175,7 +175,7 @@ TEST_CASE("Muxer read", "[stream]") { mux->add(s2, 0); StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const DataPacket &pkt) { tspkt = spkt; return true; }); @@ -207,7 +207,7 @@ TEST_CASE("Muxer read", "[stream]") { mux->add(s2, 0); StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const DataPacket &pkt) { tspkt = spkt; return true; }); @@ -251,7 +251,7 @@ TEST_CASE("Muxer read multi-frameset", "[stream]") { mux->add(s4,1); StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const DataPacket &pkt) { tspkt = spkt; return true; }); diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp index 5cd782c..168b754 100644 --- a/test/netstream_unit.cpp +++ b/test/netstream_unit.cpp @@ -15,7 +15,7 @@ using ftl::protocol::FrameID; using ftl::protocol::StreamProperty; using ftl::protocol::StreamPacket; -using ftl::protocol::Packet; +using ftl::protocol::DataPacket; using ftl::protocol::Channel; using std::this_thread::sleep_for; using std::chrono::milliseconds; @@ -26,7 +26,7 @@ class MockNetStream : public ftl::protocol::Net { public: MockNetStream(const std::string &uri, ftl::net::Universe *net, bool host=false): Net(uri, net, host) {}; - void hasPosted(const StreamPacket &spkt, const Packet &pkt) override { + void hasPosted(const StreamPacket &spkt, const DataPacket &pkt) override { lastSpkt = spkt; } @@ -64,7 +64,7 @@ TEST_CASE("Net stream options") { spkt.frame_number = 2; spkt.channel = Channel::kColour; - Packet pkt; + DataPacket pkt; pkt.frame_count = 1; s1->lastSpkt.timestamp = 0; diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index 6168147..9eeb565 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -51,10 +51,10 @@ TEST_CASE("TCP Stream", "[net]") { auto s2 = self->getStream("ftl://mystream"); REQUIRE( s2 ); - ftl::protocol::Packet rpkt; + ftl::protocol::DataPacket rpkt; rpkt.bitrate = 20; - auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { + auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { rpkt = pkt; cv.notify_one(); return true; @@ -80,7 +80,7 @@ TEST_CASE("TCP Stream", "[net]") { spkt.streamID = 0; spkt.frame_number = 0; spkt.channel = ftl::protocol::Channel::kColour; - ftl::protocol::Packet pkt; + ftl::protocol::DataPacket pkt; pkt.bitrate = 10; pkt.codec = ftl::protocol::Codec::kJPG; pkt.frame_count = 1; -- GitLab