Skip to content
Snippets Groups Projects
filestream.cpp 18 KiB
Newer Older
Nicolas Pope's avatar
Nicolas Pope committed
/**
 * @file filestream.cpp
 * @copyright Copyright (c) 2022 University of Turku, MIT License
 * @author Nicolas Pope
 */

#include <fstream>
#include <unordered_set>
#include <string>
#include <utility>
#include <limits>
#include <algorithm>
#include <filesystem>
Nicolas Pope's avatar
Nicolas Pope committed
#include <thread>
#include <chrono>
#include "filestream.hpp"
#include <ftl/time.hpp>
#include <ftl/counter.hpp>
Nicolas Pope's avatar
Nicolas Pope committed
#include "packetMsgpack.hpp"

#define LOGURU_REPLACE_GLOG 1
#include <loguru.hpp>

using ftl::protocol::File;
using ftl::protocol::StreamPacket;
using ftl::protocol::DataPacket;
using ftl::protocol::Packet;
using std::get;
using ftl::protocol::Channel;
using ftl::protocol::StreamPacketMSGPACK;
using ftl::protocol::PacketMSGPACK;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
using ftl::protocol::StreamProperty;

File::File(const std::string &uri, bool writeable) :
        Stream(),
        uri_(uri),
        ostream_(nullptr),
        istream_(nullptr),
        active_(false) {
    mode_ = (writeable) ? Mode::Write : Mode::Read;
}

File::File(std::ifstream *is) : Stream(), ostream_(nullptr), istream_(is), active_(false) {
    mode_ = Mode::Read;
}

File::File(std::ofstream *os) : Stream(), ostream_(os), istream_(nullptr), active_(false) {
    mode_ = Mode::Write;
}

File::~File() {
    end();
}

bool File::_checkFile() {
    if (!_open()) return false;

    // Read some packets to identify frame rate.
    int count = 1000;
    int64_t ts = -1000;
    int min_ts_diff = 1000;
    first_ts_ = 10000000000000ll;

    std::unordered_set<ftl::protocol::Codec> codecs_found;

    while (count > 0) {
        Packet data;
        if (!readPacket(data)) {
            break;
        }

        StreamPacket &spkt = data;
        Packet &pkt = data;

        seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel);

        // TODO(Nick): Extract metadata

        auto &fsdata = framesets_[spkt.streamID];

        codecs_found.emplace(pkt.codec);

        if (fsdata.first_ts < 0) fsdata.first_ts = spkt.timestamp;

        if (spkt.timestamp > 0 && static_cast<int>(spkt.channel) < 32) {
            if (spkt.timestamp > ts) {
                --count;
                auto d = spkt.timestamp - ts;
                if (d < min_ts_diff && d > 0) {
                    min_ts_diff = d;
                }
                ts = spkt.timestamp;
            }
        }
    }

    buffer_in_.reset();
    buffer_in_.remove_nonparsed_buffer();

    checked_ = true;

    is_video_ = count < 9;

    framerate_ = 1000 / min_ts_diff;
    if (!is_video_) {
        looping_ = false;
    }

    interval_ = min_ts_diff;
    for (auto &f : framesets_) {
        f.second.interval = interval_;
    }
    return true;
}

bool File::isValid() {
    return _checkFile();
}

bool File::post(const StreamPacket &s, const DataPacket &p) {
    if (!active_) return false;
    if (mode_ != Mode::Write) {
        // LOG(WARNING) << "Cannot write to read-only ftl file";
        return false;
    }

    // LOG(INFO) << "WRITE: " << s.timestamp << " " << (int)s.channel << " " << p.data.size();

    // Don't write dummy packets to files.
    if (p.data.size() == 0) return true;

    // Discard all data channel packets for now
    // if (!save_data_ && static_cast<int>(s.channel) >= static_cast<int>(ftl::codecs::Channel::Data)) return true;

    StreamPacket s2 = s;

    auto data = std::tie(
        *reinterpret_cast<const StreamPacketMSGPACK*>(&s2),
        *reinterpret_cast<const PacketMSGPACK*>(&p));
    msgpack::sbuffer buffer;
    msgpack::pack(buffer, data);

    UNIQUE_LOCK(mutex_, lk);
    ostream_->write(buffer.data(), buffer.size());
    return ostream_->good();
}

bool File::readPacket(Packet &data) {
    bool partial = false;
    ftl::protocol::Packer pack;

    while ((istream_->good()) || buffer_in_.nonparsed_size() > 0u) {
        if (buffer_in_.nonparsed_size() == 0 || (partial && buffer_in_.nonparsed_size() < 10000000)) {
            buffer_in_.reserve_buffer(10000000);
            istream_->read(buffer_in_.buffer(), buffer_in_.buffer_capacity());
            // if (stream_->bad()) return false;

            int bytes = istream_->gcount();
            if (bytes == 0) return false;
            buffer_in_.buffer_consumed(bytes);
            partial = false;
        }

        msgpack::object_handle msg;
        if (!buffer_in_.next(msg)) {
            partial = true;
            continue;
        }

        msgpack::object obj = msg.get();

        try {
            // Older versions have a different SPKT structure.
            if (version_ < 5) {
                /*std::tuple<StreamPacketV4MSGPACK, PacketMSGPACK> datav4;
                obj.convert(datav4);

                auto &spkt = std::get<0>(data);
                auto &spktv4 = std::get<0>(datav4);
                spkt.version = 4;
                spkt.streamID = spktv4.streamID;
                spkt.channel = spktv4.channel;
                spkt.frame_number = spktv4.frame_number;
                spkt.timestamp = spktv4.timestamp;
                spkt.flags = 0;

                std::get<1>(data) = std::move(std::get<1>(datav4));*/
                error(ftl::protocol::Error::kBadVersion, "Version too old");
                return false;
            } else {
                pack.set(&data);
                obj.convert(pack);
            }
        } catch (std::exception &e) {
Nicolas Pope's avatar
Nicolas Pope committed
            DLOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what();
Nicolas Pope's avatar
Nicolas Pope committed
            // active_ = false;
            return false;
        }

        // Correct for older version differences.
        // _patchPackets(&std::get<0>(data), &std::get<1>(data));

        return true;
    }

    return false;
}

void File::_patchPackets(StreamPacket *spkt, DataPacket *pkt) {
    // Fix to clear flags for version 2.
    /*if (version_ <= 2) {
        pkt.flags = 0;
    }
    if (version_ < 4) {
        spkt.frame_number = spkt.streamID;
        spkt.streamID = 0;
        if (isFloatChannel(spkt.channel)) pkt.flags |= ftl::protocol::kFlagFloat;

        auto codec = pkt.codec;
        if (codec == ftl::codecs::codec_t::HEVC) pkt.codec = ftl::codecs::codec_t::HEVC_LOSSLESS;
    }*/

    spkt->version = ftl::protocol::kCurrentFTLVersion;

    // Fix for flags corruption
    if (pkt->data.size() == 0) {
        pkt->dataFlags = 0;
    }
}

bool File::tick(int64_t ts) {
    if (!active_) return false;
    if (mode_ != Mode::Read) {
Nicolas Pope's avatar
Nicolas Pope committed
        DLOG(ERROR) << "Cannot read from a write only file";
Nicolas Pope's avatar
Nicolas Pope committed
        return false;
    }

    // Skip if paused
    // if (value("paused", false)) return true;

    #ifdef DEBUG_MUTEX
    UNIQUE_LOCK(mutex_, lk);
    #else
    std::unique_lock<std::mutex> lk(mutex_, std::defer_lock);
    if (!lk.try_lock()) return true;
    #endif

    if (jobs_ > 0) {
        return true;
    }

    // Check buffer first for frames already read
    size_t complete_count = 0;

    for (auto i = data_.begin(); i != data_.end(); ) {
        auto &fsdata = framesets_[i->streamID];
        if (fsdata.timestamp == 0) fsdata.timestamp = i->timestamp;

        // Limit to file framerate
        if (i->timestamp > ts) {
            break;
        }

        // Is the packet too old?
        if (i->timestamp < fsdata.timestamp) {
            i = data_.erase(i);
            continue;
        }

        if (i->timestamp <= fsdata.timestamp) {
            StreamPacket &spkt = *i;

            if (spkt.channel == Channel::kEndFrame) {
                fsdata.needs_endframe = false;
            }

            if (fsdata.needs_endframe) {
                if (spkt.frame_number < 255) {
                    Packet &pkt = *i;

                    fsdata.frame_count = std::max(
                        fsdata.frame_count,
                        static_cast<size_t>(spkt.frame_number + pkt.frame_count));
                    while (fsdata.packet_counts.size() <= spkt.frame_number) fsdata.packet_counts.push_back(0);
                    ++fsdata.packet_counts[spkt.frame_number];
                } else {
                    // Add frameset packets to frame 0 counts
                    fsdata.frame_count = std::max(fsdata.frame_count, size_t(1));
                    while (fsdata.packet_counts.size() < fsdata.frame_count) fsdata.packet_counts.push_back(0);
                    ++fsdata.packet_counts[0];
                }
            }

            auto j = i;
            ++i;

            // TODO(Nick): Probably better not to do a thread per packet
            ftl::pool.push([this, c = std::move(ftl::Counter(&jobs_)), i = j](int id) {
Nicolas Pope's avatar
Nicolas Pope committed
                StreamPacket &spkt = *i;
                Packet &pkt = *i;

                spkt.localTimestamp = spkt.timestamp;

                trigger(spkt, pkt);

                UNIQUE_LOCK(data_mutex_, dlk);
                data_.erase(i);
            });
        } else {
            ++complete_count;

            if (fsdata.needs_endframe) {
                for (size_t j = 0; j < fsdata.frame_count; ++j) {
                    auto timestamp = fsdata.timestamp;
                    auto sid = i->streamID;
                    auto pcount = fsdata.packet_counts[j];

                    ftl::pool.push([this, timestamp, sid, j, pcount](int id) {
                        // Send final frame packet.
                        StreamPacket spkt;
                        spkt.timestamp = timestamp;
                        spkt.streamID = sid;
                        spkt.flags = 0;
                        spkt.channel = Channel::kEndFrame;

                        DataPacket pkt;
                        pkt.bitrate = 255;
                        pkt.codec = Codec::kInvalid;
                        pkt.packet_count = 1;
                        pkt.frame_count = 1;

                        spkt.frame_number = j;
                        pkt.packet_count = pcount+1;

                        trigger(spkt, pkt);
                    });

                    fsdata.packet_counts[j] = 0;
                }
            } else {
            }

            fsdata.timestamp = i->timestamp;
            if (complete_count == framesets_.size()) break;
        }
    }

    int64_t max_ts = std::numeric_limits<int64_t>::min();
    for (auto &fsd : framesets_) {
        max_ts = std::max(max_ts, (fsd.second.timestamp <= 0) ? timestart_ : fsd.second.timestamp);
    }
    int64_t extended_ts = max_ts + 200;  // Buffer 200ms ahead

    while (!read_error_ && ((active_ && istream_->good()) || buffer_in_.nonparsed_size() > 0u)) {
        UNIQUE_LOCK(data_mutex_, dlk);
        auto &data = data_.emplace_back();
        dlk.unlock();

        bool res = readPacket(data);
        if (!res) {
            dlk.lock();
            data_.pop_back();
            read_error_ = true;
            break;
        }

        auto &fsdata = framesets_[data.streamID];

        if (fsdata.first_ts < 0) {
Nicolas Pope's avatar
Nicolas Pope committed
            DLOG(WARNING) << "Bad first timestamp " << fsdata.first_ts << ", " << data.timestamp;
Nicolas Pope's avatar
Nicolas Pope committed
        }

        // Adjust timestamp
        // FIXME: A potential bug where multiple times are merged into one?
        data.timestamp = (((data.timestamp) - fsdata.first_ts)) + timestart_;
        data.hint_capability =
            ((is_video_) ? 0 : ftl::protocol::kStreamCap_Static) | ftl::protocol::kStreamCap_Recorded;

        if (data.timestamp > extended_ts) {
            break;
        }
    }

    // Force send end frames for static files
    if (data_.size() == 0 && !is_video_) {
        for (auto &fsix : framesets_) {
            auto &fsdata = fsix.second;
            if (fsdata.needs_endframe) {
                fsdata.needs_endframe = false;
                // Send final frame packet.
                StreamPacket spkt;
                spkt.timestamp = fsdata.timestamp;
                spkt.streamID = fsix.first;
                spkt.flags = 0;
                spkt.channel = Channel::kEndFrame;

                DataPacket pkt;
                pkt.bitrate = 255;
                pkt.codec = Codec::kInvalid;
                pkt.packet_count = 1;
                pkt.frame_count = 1;

                for (size_t i = 0; i < fsdata.frame_count; ++i) {
                    spkt.frame_number = i;
                    pkt.packet_count = fsdata.packet_counts[i]+1;
                    fsdata.packet_counts[i] = 0;

                    trigger(spkt, pkt);
                }
            }
        }
    }

    if (data_.size() == 0 && looping_) {
        buffer_in_.reset();
        buffer_in_.remove_nonparsed_buffer();
        _open();

        read_error_ = false;
        timestart_ = ftl::time::get_time();
        for (auto &fsd : framesets_) fsd.second.timestamp = 0;
        return true;
    }

    return data_.size() > 0;
}

bool File::_open() {
    if (istream_ && istream_->is_open()) {
        istream_->clear();
        istream_->seekg(0);
    } else {
        if (!istream_) istream_ = new std::ifstream;
        istream_->open(uri_.toFilePath(), std::ifstream::in | std::ifstream::binary);

        if (!istream_->good()) {
Nicolas Pope's avatar
Nicolas Pope committed
            DLOG(ERROR) << "Could not open file: " << uri_.toFilePath();
Nicolas Pope's avatar
Nicolas Pope committed
            return false;
        }
    }

    ftl::protocol::Header h;
    (*istream_).read(reinterpret_cast<char*>(&h), sizeof(h));
    if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false;

    if (h.version >= 2) {
        ftl::protocol::IndexHeader ih;
        (*istream_).read(reinterpret_cast<char*>(&ih), sizeof(ih));
    }

    version_ = h.version;
    return true;
}

bool File::run() {
    thread_ = std::thread([this]() {
        while (active_) {
            auto now = ftl::time::get_time();
            tick(now);
            auto used = ftl::time::get_time() - now;
            int64_t spare = interval_ - used;
            // LOG(INFO) << "SLEEP = " << spare;
            sleep_for(milliseconds(std::max(int64_t(1), spare)));
        }
    });

    #ifndef WIN32
    sched_param p;
    p.sched_priority = sched_get_priority_max(SCHED_RR);
    pthread_setschedparam(thread_.native_handle(), SCHED_RR, &p);
    #endif

    // TODO(Nick): Windows thread priority

    return true;
}

bool File::_validateFilename() const {
    std::filesystem::path file = std::filesystem::u8path(uri_.toFilePath());
    if (!std::filesystem::exists(file)) return true;
    if (std::string(file.extension().u8string().c_str()) == ".ftl") return true;
    // TODO(Nick): Could also check directory path
    return false;
}

Nicolas Pope's avatar
Nicolas Pope committed
bool File::begin() {
    if (active_) return true;
    if (mode_ == Mode::Read) {
        if (!checked_) {
            if (!_checkFile()) {
Nicolas Pope's avatar
Nicolas Pope committed
                DLOG(ERROR) << "Could not open file: " << uri_.toFilePath();
                return false;
            }
        }
Nicolas Pope's avatar
Nicolas Pope committed
        _open();

        // Capture current time to adjust timestamps
        timestart_ = ftl::time::get_time();
        active_ = true;
        read_error_ = false;

        tick(timestart_);  // Do some now!
        run();
    } else if (mode_ == Mode::Write) {
        if (!ostream_) ostream_ = new std::ofstream;
        if (!_validateFilename()) return false;
Nicolas Pope's avatar
Nicolas Pope committed
        ostream_->open(uri_.toFilePath(), std::ifstream::out | std::ifstream::binary);

        if (!ostream_->good()) {
Nicolas Pope's avatar
Nicolas Pope committed
            DLOG(ERROR) << "Could not open file: '" << uri_.toFilePath() << "'";
Nicolas Pope's avatar
Nicolas Pope committed
            return false;
        }

        ftl::protocol::Header h;
        (*ostream_).write((const char*)&h, sizeof(h));

        ftl::protocol::IndexHeader ih;
        ih.reserved[0] = -1;
        (*ostream_).write((const char*)&ih, sizeof(ih));

        // Capture current time to adjust timestamps
        timestart_ = ftl::time::get_time();
        active_ = true;
        interval_ = 50;  // TODO(Nick): Where to get this from?
    }

    return true;
}

bool File::end() {
    if (!active_) return false;
    active_ = false;

    if (thread_.joinable()) thread_.join();

    UNIQUE_LOCK(mutex_, lk);

    while (jobs_ > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    if (mode_ == Mode::Read) {
        if (istream_) {
            istream_->close();
            delete istream_;
            istream_ = nullptr;
        }
    } else if (mode_ == Mode::Write) {
        if (ostream_) {
            ostream_->close();
            delete ostream_;
            ostream_ = nullptr;
        }
    }
    return true;
}

void File::reset() {
    /*UNIQUE_LOCK(mutex_, lk);

    // TODO: Find a better solution
    while (jobs_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2));

    data_.clear();
    buffer_in_.reset();
    buffer_in_.remove_nonparsed_buffer();
    _open();

    timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval();
    //timestamp_ = timestart_;
    for (auto &fsd : framesets_) fsd.second.timestamp = timestart_;*/
}

bool File::active() {
    return active_;
}

void File::refresh() {}

bool File::enable(FrameID id) {
    return Stream::enable(id);
}

bool File::enable(FrameID id, ftl::protocol::Channel c) {
    return Stream::enable(id, c);
}

bool File::enable(FrameID id, const ftl::protocol::ChannelSet &channels) {
    return Stream::enable(id, channels);
}

void File::setProperty(StreamProperty opt, std::any value) {
    switch (opt) {
    case StreamProperty::kFrameRate         :
    case StreamProperty::kURI               :   throw FTL_Error("Readonly property");
    case StreamProperty::kLooping           :   looping_ = std::any_cast<bool>(value); break;
    case StreamProperty::kSpeed             :   speed_ = std::any_cast<int>(value); break;
    default                                 :   throw FTL_Error("Property not supported");
    }
}

std::any File::getProperty(StreamProperty opt) {
    switch (opt) {
    case StreamProperty::kSpeed             :   return speed_;
    case StreamProperty::kFrameRate         :   return framerate_;
    case StreamProperty::kLooping           :   return looping_;
    case StreamProperty::kURI               :   return uri_.getBaseURI();
    default                                 :   throw FTL_Error("Property not supported");
    }
}

bool File::supportsProperty(StreamProperty opt) {
    switch (opt) {
    case StreamProperty::kSpeed             :
    case StreamProperty::kFrameRate         :
    case StreamProperty::kLooping           :
    case StreamProperty::kURI               :   return true;
    default                                 :   return false;
    }
}