diff --git a/applications/gui2/src/inputoutput.cpp b/applications/gui2/src/inputoutput.cpp index 9349b232997ec283651c748ad9b62b8b69421e35..5ee5e03f8fd748580e2c8eec760419e063d27dc0 100644 --- a/applications/gui2/src/inputoutput.cpp +++ b/applications/gui2/src/inputoutput.cpp @@ -35,8 +35,7 @@ InputOutput::InputOutput(ftl::Configurable *root, ftl::net::Universe *net) : feed_ = std::unique_ptr<ftl::stream::Feed> (ftl::create<ftl::stream::Feed>(root, "feed", net)); - speaker_ = std::unique_ptr<ftl::audio::Speaker> - (ftl::create<ftl::audio::Speaker>(root, "speaker")); + speaker_ = feed_->speaker(); //auto* f = feed_->filter({ftl::codecs::Channel::Colour, ftl::codecs::Channel::Depth}); //feed_->render(f, Eigen::Matrix4d::Identity()); diff --git a/applications/gui2/src/inputoutput.hpp b/applications/gui2/src/inputoutput.hpp index e80b7546157092e60f82091de64d198492ee775f..98d6a0dbef6b5205b2b97a57ad324fabbd86741d 100644 --- a/applications/gui2/src/inputoutput.hpp +++ b/applications/gui2/src/inputoutput.hpp @@ -35,13 +35,13 @@ public: ftl::net::Universe* net() const; ftl::ctrl::Master* master() const { return master_.get(); } ftl::stream::Feed* feed() const { return feed_.get(); } - ftl::audio::Speaker* speaker() const { return speaker_.get(); } + ftl::audio::Speaker* speaker() const { return speaker_; } private: ftl::net::Universe* net_; std::unique_ptr<ftl::stream::Feed> feed_; std::unique_ptr<ftl::ctrl::Master> master_; - std::unique_ptr<ftl::audio::Speaker> speaker_; + ftl::audio::Speaker *speaker_; }; diff --git a/applications/gui2/src/modules/camera.cpp b/applications/gui2/src/modules/camera.cpp index 3d787aaae181efe55fbb852b82426945ba0a84cf..779c2631f2e4d598e7a4012123d36e2f36805513 100644 --- a/applications/gui2/src/modules/camera.cpp +++ b/applications/gui2/src/modules/camera.cpp @@ -251,17 +251,26 @@ void Camera::activate(ftl::data::FrameID id) { //std::mutex m; //std::condition_variable cv; + io->speaker()->reset(); + io->feed()->mixer().reset(); + filter_ = io->feed()->filter(std::unordered_set<unsigned int>{id.frameset()}, {Channel::Left}); filter_->on( - [this, speaker = io->speaker()](ftl::data::FrameSetPtr fs){ + [this, feed = io->feed(), speaker = io->speaker()](ftl::data::FrameSetPtr fs){ if (paused_) return true; std::atomic_store(¤t_fs_, fs); std::atomic_store(&latest_, fs); // Deal with audio - if (fs->frames[frame_idx].hasOwn(Channel::AudioStereo)) { - speaker->queue(fs->timestamp(), fs->frames[frame_idx]); + //if (fs->frames[frame_idx].hasOwn(Channel::AudioStereo)) { + // speaker->queue(fs->timestamp(), fs->frames[frame_idx]); + //} + + if (feed->mixer().frames() > 0) { + ftl::audio::Audio aframe; + feed->mixer().read(aframe.data(), feed->mixer().frames()); + speaker->queue(fs->timestamp(), aframe); } // Need to notify GUI thread when first data comes @@ -348,15 +357,7 @@ void Camera::toggleOverlay() { } ftl::audio::StereoMixerF<100> *Camera::mixer() { - if (mixer_) return mixer_; - if (movable_) { - auto *rend = io->feed()->getRenderer(frame_id_); - if (rend) { - mixer_ = &(rend->mixer()); - return mixer_; - } - } - return nullptr; + return &io->feed()->mixer(); } bool Camera::isRecording() { diff --git a/applications/gui2/src/views/thumbnails.cpp b/applications/gui2/src/views/thumbnails.cpp index b04a6428da734a5d6bda323c6c9526d5971cc7e2..712264c355fb59aae868564b04b12a11ec4ab91d 100644 --- a/applications/gui2/src/views/thumbnails.cpp +++ b/applications/gui2/src/views/thumbnails.cpp @@ -165,7 +165,7 @@ void Thumbnails::updateThumbnails() { continue; } - auto* tab = tabwidget_->createTab(framesets[fsid]->name()); + auto* tab = tabwidget_->createTab(fs->name()); tab->setLayout(new nanogui::BoxLayout (nanogui::Orientation::Vertical, nanogui::Alignment::Middle, 40)); auto* panel = new nanogui::Widget(tab); diff --git a/applications/tools/CMakeLists.txt b/applications/tools/CMakeLists.txt index 8a6d662a3de011e2a0a2670716ddb5a924fe9ddd..3ff204be172c27bd48b25bcdfba1c53379903bed 100644 --- a/applications/tools/CMakeLists.txt +++ b/applications/tools/CMakeLists.txt @@ -6,3 +6,4 @@ add_subdirectory(middlebury_gen) add_subdirectory(simple_viewer) +add_subdirectory(recorder) diff --git a/applications/tools/recorder/CMakeLists.txt b/applications/tools/recorder/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..398d5c3307a68429091cbc579dfbfebbb4418ccb --- /dev/null +++ b/applications/tools/recorder/CMakeLists.txt @@ -0,0 +1,21 @@ +# Need to include staged files and libs +#include_directories(${PROJECT_SOURCE_DIR}/reconstruct/include) +#include_directories(${PROJECT_BINARY_DIR}) + +set(RECSRC + src/main.cpp +) + +add_executable(ftl-recorder ${RECSRC}) + +#target_include_directories(ftl-reconstruct PUBLIC +# $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> +# $<INSTALL_INTERFACE:include> +# PRIVATE src) + +if (CUDA_FOUND) +set_property(TARGET ftl-recorder PROPERTY CUDA_SEPARABLE_COMPILATION ON) +endif() + +#target_include_directories(cv-node PUBLIC ${PROJECT_SOURCE_DIR}/include) +target_link_libraries(ftl-recorder ftlcommon ftlrgbd Threads::Threads ${OpenCV_LIBS} ftlctrl ftlnet ftlrender ftloperators ftlstreams ftlaudio) diff --git a/applications/tools/recorder/src/main.cpp b/applications/tools/recorder/src/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..89b24b6e8086a4e0f15891fbb4d9ddd81ce85ddc --- /dev/null +++ b/applications/tools/recorder/src/main.cpp @@ -0,0 +1,147 @@ +#include <ftl/configuration.hpp> +#include <ftl/net.hpp> +#include <ftl/master.hpp> +#include <nlohmann/json.hpp> +#include <loguru.hpp> + +#include <ftl/streams/filestream.hpp> +#include <ftl/streams/netstream.hpp> + +#include <unordered_set> + +using ftl::net::Universe; +using ftl::codecs::Channel; +using std::vector; +using std::string; + + +static std::atomic_int src_count = 0; + + +static void run(ftl::Configurable *root) { + + Universe *net = ftl::create<Universe>(root, "net"); + ftl::ctrl::Master ctrl(root, net); + + ftl::stream::Muxer *mux_in = ftl::create<ftl::stream::Muxer>(root, "muxer"); + ftl::stream::File *file_out = ftl::create<ftl::stream::File>(root, "output"); + + std::unordered_set<ftl::codecs::Channel> channels; + channels.insert(Channel::Colour); + + if (root->value("depth", false)) channels.insert(Channel::Depth); + if (root->value("right", false)) channels.insert(Channel::Right); + if (root->value("audio", false)) channels.insert(Channel::Audio); + + file_out->set("filename", root->value("filename", std::string("out.ftl"))); + file_out->setMode(ftl::stream::File::Mode::Write); + file_out->begin(); + + auto h1 = mux_in->onPacket([file_out](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + file_out->post(spkt, pkt); + return true; + }); + + mux_in->begin(); + + net->onConnect([mux_in,net,root,&channels](ftl::net::Peer *p) { + ftl::pool.push([mux_in,root,net,p,&channels](int id) { + try { + auto peerstreams = p->call<std::vector<std::string>>("list_streams"); + + for (const auto &s : peerstreams) { + int fsid = src_count++; + + auto *ns = ftl::create<ftl::stream::Net>(root, std::string("input") + std::to_string(fsid), net); + ns->set("uri", s); + mux_in->add(ns, fsid); + mux_in->begin(); + mux_in->select(fsid, channels, true); + + LOG(INFO) << "Recording: " << s; + } + } catch (...) { + + } + }); + }); + + if (net->isBound("add_stream")) net->unbind("add_stream"); + net->bind("add_stream", [mux_in,root,net,&channels](ftl::net::Peer &p, std::string uri){ + int fsid = src_count++; + + auto *ns = ftl::create<ftl::stream::Net>(root, std::string("input") + std::to_string(fsid), net); + ns->set("uri", uri); + mux_in->add(ns, fsid); + mux_in->begin(); + mux_in->select(fsid, channels, true); + + LOG(INFO) << "Recording: " << uri; + }); + + net->start(); + net->waitConnections(); + + // Add sources here + if (root->getConfig().contains("sources")) { + for (const auto &s : root->getConfig()["sources"]) { + ftl::URI uri(s); + auto scheme = uri.getScheme(); + if (scheme == ftl::URI::scheme_t::SCHEME_TCP || scheme == ftl::URI::scheme_t::SCHEME_WS) { + net->connect(s); + } else { + LOG(ERROR) << "Unsupported URI: " << s; + } + } + } + + // Add sources from command line as well + auto paths = root->get<vector<string>>("paths"); + + for (auto &x : *paths) { + if (x != "") { + ftl::URI uri(x); + auto scheme = uri.getScheme(); + if (scheme == ftl::URI::scheme_t::SCHEME_TCP || scheme == ftl::URI::scheme_t::SCHEME_WS) { + net->connect(x); + } else { + LOG(ERROR) << "Unsupported URI: " << x; + } + } + } + + // Just do whatever jobs are available + while (ftl::running) { + auto f = ftl::pool.pop(); + if (f) { + f(-1); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + + mux_in->end(); + file_out->end(); + delete mux_in; + delete file_out; + + ftl::config::save(); + + net->shutdown(); + LOG(INFO) << "Stopping..."; + //ftl::timer::stop(true); + //LOG(INFO) << "Timer stopped..."; + ftl::pool.stop(true); + LOG(INFO) << "All threads stopped."; + + delete net; +} + +int main(int argc, char **argv) { + run(ftl::configure(argc, argv, "recorder_default")); + + // Save config changes and delete final objects + ftl::config::cleanup(); + + return ftl::exit_code; +} diff --git a/components/audio/include/ftl/audio/mixer.hpp b/components/audio/include/ftl/audio/mixer.hpp index 5d496217a87696ed402fd5f8dca57aab6a6c44d1..231b399850481a697d13cbf4a7e8c2edc8ab5378 100644 --- a/components/audio/include/ftl/audio/mixer.hpp +++ b/components/audio/include/ftl/audio/mixer.hpp @@ -59,6 +59,7 @@ class FixedMixer : public ftl::audio::Buffer<T> { Buffer<T>::reset(); write_position_ = 0; read_position_ = 0; + for (auto &t : tracks_) t.reset(); } inline int writePosition() const { return write_position_; } diff --git a/components/audio/include/ftl/audio/speaker.hpp b/components/audio/include/ftl/audio/speaker.hpp index 89a890af63e77127b8f83926cc24ac51321ede94..d03e552761f0512e2b3f982d68ef852460f9cd8b 100644 --- a/components/audio/include/ftl/audio/speaker.hpp +++ b/components/audio/include/ftl/audio/speaker.hpp @@ -19,11 +19,14 @@ class Speaker : public ftl::Configurable { ~Speaker(); void queue(int64_t ts, ftl::audio::Frame &fs); + void queue(int64_t ts, const ftl::audio::Audio &af); void setDelay(int64_t ms); void setVolume(float value); float volume(); + void reset() { if (buffer_) buffer_->reset(); } + private: ftl::audio::Buffer<float> *buffer_; bool active_; diff --git a/components/audio/src/speaker.cpp b/components/audio/src/speaker.cpp index d83c67b232648024404dcada14303a83042bd127..61935c31519b10ce7f6605513325dbfef83ab0ef 100644 --- a/components/audio/src/speaker.cpp +++ b/components/audio/src/speaker.cpp @@ -144,13 +144,24 @@ void Speaker::queue(int64_t ts, ftl::audio::Frame &frame) { //LOG(INFO) << "Audio delay: " << buffer_.delay() << "s"; } +void Speaker::queue(int64_t ts, const ftl::audio::Audio &d) { + if (!buffer_) { + _open(960, 48000, 2); + } + if (!buffer_) return; + + //LOG(INFO) << "Buffer Fullness (" << ts << "): " << buffer_->size() << " - " << audio.size(); + buffer_->write(d.data()); + //LOG(INFO) << "Audio delay: " << buffer_.delay() << "s"; +} + void Speaker::setDelay(int64_t ms) { ms -= latency_; float d = static_cast<float>(ms) / 1000.0f + extra_delay_; if (d < 0.0f) d = 0.001f; // Clamp to 0 delay (not ideal to be exactly 0) if (buffer_) { buffer_->setDelay(d); - //LOG(INFO) << "Audio delay: " << buffer_->delay(); + LOG(INFO) << "Audio delay: " << buffer_->delay(); } } diff --git a/components/common/cpp/include/ctpl_stl.h b/components/common/cpp/include/ctpl_stl.h index ebc0a8a1b1035a1d069dd92cb90a4c6851bf4516..245c62425fb638553cbdd02244f8deac0625bab5 100644 --- a/components/common/cpp/include/ctpl_stl.h +++ b/components/common/cpp/include/ctpl_stl.h @@ -81,7 +81,7 @@ namespace ctpl { // the destructor waits for all the functions in the queue to be finished ~thread_pool() { - this->stop(false); + this->stop(true); } // get the number of running threads in the pool diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp index 1e60a20077da11dedc9fc026f6f9932a2c2d9609..c40ed095b5075afe0b4df7409c48ace45b8328cc 100644 --- a/components/common/cpp/include/ftl/threads.hpp +++ b/components/common/cpp/include/ftl/threads.hpp @@ -7,7 +7,7 @@ #define POOL_SIZE 10 -#define DEBUG_MUTEX +//#define DEBUG_MUTEX #define MUTEX_TIMEOUT 2 #if defined DEBUG_MUTEX diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 8e3e73151f80bb3bee610f0beb99aa11757cb596..1bf4b3d7e340c19afe40be40d4045a9ddc83dbd4 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -80,6 +80,8 @@ static SOCKET tcpConnect(URI &uri, size_t ssize, size_t rsize) { int flags =1; if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; + LOG(INFO) << "TcpConnect buffers: " << ssize << ", " << rsize; + int a = static_cast<int>(rsize); if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); diff --git a/components/operators/src/weighting.cu b/components/operators/src/weighting.cu index 90aa5fd494f19e20a28eb7f6dc3ff822bd5c05ce..b9c74ff89d689657d6bfdb8ff5452e94f25377af 100644 --- a/components/operators/src/weighting.cu +++ b/components/operators/src/weighting.cu @@ -26,6 +26,8 @@ __global__ void pixel_weight_kernel( if (x < size.width && y < size.height) { Mask mask(mask_out(x,y)); + if (normals_out.isValid()) normals_out(x,y) = make_half4(0.0f); + const float d = depth.tex2D((int)x, (int)y); // Multiples of pixel size at given depth //const float threshold = (depthCoef / ((depthCoef / d) - (radius+disconDisparities-1))) - d; diff --git a/components/streams/include/ftl/streams/feed.hpp b/components/streams/include/ftl/streams/feed.hpp index 4aab46ca29d8106d955022529b7699e152511c22..7a5d05794dcc53b7764db81ccbdbfa313a858a1f 100644 --- a/components/streams/include/ftl/streams/feed.hpp +++ b/components/streams/include/ftl/streams/feed.hpp @@ -9,6 +9,8 @@ #include <ftl/rgbd/source.hpp> #include <ftl/data/framepool.hpp> +#include <ftl/audio/mixer.hpp> +#include <ftl/audio/speaker.hpp> #include <ftl/streams/stream.hpp> #include <ftl/streams/receiver.hpp> @@ -138,6 +140,10 @@ public: */ void render(); + inline ftl::audio::StereoMixerF<100> &mixer() { return mixer_; } + + ftl::audio::Speaker *speaker() { return speaker_; } + void startRecording(Filter *, const std::string &filename); void startStreaming(Filter *, const std::string &filename); void startStreaming(Filter *); @@ -178,6 +184,7 @@ private: std::condition_variable cv_net_connect_; ftl::net::Universe* const net_; + ftl::audio::Speaker *speaker_; std::unique_ptr<ftl::data::Pool> pool_; std::unique_ptr<ftl::stream::Intercept> interceptor_; // multiple streams to single fs @@ -186,6 +193,7 @@ private: // streams to fs std::unique_ptr<ftl::stream::Receiver> receiver_; ftl::Handle handle_receiver_; + ftl::Handle handle_rec_error_; // framesets to stream std::unique_ptr<ftl::stream::Sender> sender_; @@ -221,6 +229,14 @@ private: uint32_t fs_counter_ = 0; uint32_t file_counter_ = 0; + struct AudioMixerMapping { + int64_t last_timestamp=0; + int track=-1; + }; + + std::unordered_map<uint32_t, AudioMixerMapping> mixmap_; + ftl::audio::StereoMixerF<100> mixer_; + uint32_t allocateFrameSetId(const std::string &group); void add(uint32_t fsid, const std::string &uri, ftl::stream::Stream *s); @@ -236,6 +252,7 @@ private: void _createPipeline(uint32_t fsid); ftl::operators::Graph* _addPipeline(uint32_t fsid); void _dispatch(const ftl::data::FrameSetPtr &fs); + void _processAudio(const ftl::data::FrameSetPtr &fs); void _beginRecord(Filter *f); void _stopRecording(); diff --git a/components/streams/include/ftl/streams/filestream.hpp b/components/streams/include/ftl/streams/filestream.hpp index 27bee2cdee78401d93104aec99977e7c8ad651b9..a2888c86411328afbd850a5a26c3e71f37e257be 100644 --- a/components/streams/include/ftl/streams/filestream.hpp +++ b/components/streams/include/ftl/streams/filestream.hpp @@ -75,8 +75,16 @@ class File : public Stream { ftl::Handle timer_; bool is_video_; bool save_data_; - bool needs_endframe_ = true; - std::vector<int> packet_counts_; + + struct FramesetData { + size_t frame_count=0; + bool needs_endframe = true; + std::vector<int> packet_counts; + int64_t timestamp = 0; + int64_t first_ts=-1; + int interval=50; + }; + std::unordered_map<int,FramesetData> framesets_; //StreamCallback cb_; MUTEX mutex_; diff --git a/components/streams/include/ftl/streams/receiver.hpp b/components/streams/include/ftl/streams/receiver.hpp index 07f90efeec27fb76a24f43aa09532038e4c19b67..090014243695a08956eeaf12aade4ba50ac30a5d 100644 --- a/components/streams/include/ftl/streams/receiver.hpp +++ b/components/streams/include/ftl/streams/receiver.hpp @@ -37,6 +37,8 @@ class Receiver : public ftl::Configurable, public ftl::data::Generator { */ ftl::Handle onFrameSet(const ftl::data::FrameSetCallback &cb) override; + ftl::Handle onError(const std::function<bool(ftl::data::FrameID)> &cb) { return error_cb_.on(cb); } + ftl::streams::BaseBuilder &builder(uint32_t id); void registerBuilder(const std::shared_ptr<ftl::streams::BaseBuilder> &b); @@ -49,6 +51,7 @@ class Receiver : public ftl::Configurable, public ftl::data::Generator { ftl::stream::Stream *stream_; ftl::data::Pool *pool_; ftl::SingletonHandler<const ftl::data::FrameSetPtr&> callback_; + ftl::Handler<ftl::data::FrameID> error_cb_; std::unordered_map<uint32_t, std::shared_ptr<ftl::streams::BaseBuilder>> builders_; std::unordered_map<uint32_t, ftl::Handle> handles_; ftl::codecs::Channel second_channel_; @@ -60,11 +63,10 @@ class Receiver : public ftl::Configurable, public ftl::data::Generator { struct InternalVideoStates { InternalVideoStates(); - int64_t timestamp; - //ftl::rgbd::Frame frame; + int64_t timestamps[32]; ftl::codecs::Decoder* decoders[32]; cv::cuda::GpuMat surface[32]; - MUTEX mutex; + RECURSIVE_MUTEX mutex; ftl::codecs::Channels<0> completed; int width=0; int height=0; diff --git a/components/streams/include/ftl/streams/stream.hpp b/components/streams/include/ftl/streams/stream.hpp index f67b6fd84d0f856fe80238cf5f6ca79070736fa2..3ddc80149c64ace933ac30bce0fa8cfbbce45ac4 100644 --- a/components/streams/include/ftl/streams/stream.hpp +++ b/components/streams/include/ftl/streams/stream.hpp @@ -118,7 +118,7 @@ class Muxer : public Stream { explicit Muxer(nlohmann::json &config); virtual ~Muxer(); - void add(Stream *, size_t fsid=0); + void add(Stream *, size_t fsid=0, const std::function<int()> &cb=nullptr); void remove(Stream *); //bool onPacket(const StreamCallback &) override; @@ -136,9 +136,10 @@ class Muxer : public Stream { private: struct StreamEntry { Stream *stream; - std::vector<int> maps; + std::unordered_map<int, std::vector<int>> maps; uint32_t original_fsid = 0; ftl::Handle handle; + std::vector<int> ids; }; std::list<StreamEntry> streams_; diff --git a/components/streams/src/baserender.hpp b/components/streams/src/baserender.hpp index 55082964fbc0b6461d2ec59202b199e6ed3a2661..72a51e1336693e7b54e92d47868580178acad0f7 100644 --- a/components/streams/src/baserender.hpp +++ b/components/streams/src/baserender.hpp @@ -34,7 +34,7 @@ class BaseSourceImpl { protected: ftl::render::Source *host_; - ftl::audio::StereoMixerF<100> mixer_; + ftl::audio::StereoMixerF<100> mixer_; // TODO: Remove }; } diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index e279b70ccb3520e219aa7a5c2960701104ce9ba8..620225c427516e26ccc60311de80e2c83d390fe5 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -392,7 +392,7 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() { } else { // Force complete of old frame if (framesets_.size() >= completion_size_) { - LOG(WARNING) << "Forced completion: " << framesets_.back()->timestamp(); + LOG(WARNING) << "Forced completion (" << framesets_.back()->frameset() << "): " << framesets_.back()->timestamp(); framesets_.back()->mask = 0xFF; } diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp index 11df5649776ff63bb1de281687011d4de5abb434..5377ca6c4c914ee8b88028e5a5a8fdeb6c7710e8 100644 --- a/components/streams/src/feed.cpp +++ b/components/streams/src/feed.cpp @@ -102,6 +102,8 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) : "recorder" }); + speaker_ = ftl::create<ftl::audio::Speaker>(this, "speaker"); + pool_ = std::make_unique<ftl::data::Pool>(3,5); stream_ = std::unique_ptr<ftl::stream::Muxer> @@ -166,6 +168,14 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) : //std::unique_lock<std::mutex> lk(mtx_); }); + handle_rec_error_ = receiver_->onError([this](ftl::data::FrameID fid) { + LOG(WARNING) << "Receiver error: resetting"; + stream_->reset(); + speaker_->reset(); + mixer_.reset(); + return true; + }); + handle_receiver_ = receiver_->onFrameSet( [this](const ftl::data::FrameSetPtr& fs) { if (value("drop_partial_framesets", false)) { @@ -192,12 +202,15 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) : }); if (!did_pipe) { - LOG(WARNING) << "Feed Pipeline dropped"; + LOG(WARNING) << "Feed Pipeline dropped (" << fs->frameset() << ")"; ftl::pool.push([this,fs](int id) { _dispatch(fs); }); } + + _processAudio(fs); } else { + _processAudio(fs); _dispatch(fs); } @@ -245,11 +258,47 @@ Feed::~Feed() { interceptor_.reset(); stream_.reset(); + + std::unordered_set<ftl::stream::Stream*> garbage; + for (auto &ls : streams_) { for (auto *s : ls.second) { - delete s; + //delete s; + garbage.insert(s); } } + + for (auto *s : garbage) { + delete s; + } + + delete speaker_; +} + +void Feed::_processAudio(const ftl::data::FrameSetPtr &fs) { + if (mixer_.frames() > 50) { + mixer_.reset(); + } + + for (auto &f : fs->frames) { + // If audio is present, mix with the other frames + if (f.hasChannel(Channel::AudioStereo)) { + // Map a mixer track to this frame + auto &mixmap = mixmap_[f.id().id]; + if (mixmap.track == -1) { + mixmap.track = mixer_.add(f.name()); + } + + // Do mix but must not mix same frame multiple times + if (mixmap.last_timestamp != f.timestamp()) { + const auto &audio = f.get<std::list<ftl::audio::Audio>>(Channel::AudioStereo).front(); + mixer_.write(mixmap.track, audio.data()); + mixmap.last_timestamp = f.timestamp(); + } + } + } + + mixer_.mix(); } void Feed::_dispatch(const ftl::data::FrameSetPtr &fs) { @@ -446,13 +495,13 @@ ftl::operators::Graph* Feed::_addPipeline(uint32_t fsid) { if (devices_.count(fsid)) { pre_pipelines_[fsid] = ftl::config::create<ftl::operators::Graph> - (devices_[fsid], std::string("pipeline")); + (devices_[fsid], std::string("pipeline_")+std::to_string(fsid)); } else if (renderers_.count(fsid)) { pre_pipelines_[fsid] = ftl::config::create<ftl::operators::Graph> - (renderers_[fsid], std::string("pipeline")); + (renderers_[fsid], std::string("pipeline")+std::to_string(fsid)); } else if (streams_.count(fsid)) { pre_pipelines_[fsid] = ftl::config::create<ftl::operators::Graph> - (streams_[fsid].front(), std::string("pipeline")); + (streams_[fsid].front(), std::string("pipeline")+std::to_string(fsid)); } //pre_pipelines_[fsid] = ftl::config::create<ftl::operators::Graph> @@ -786,7 +835,21 @@ void Feed::add(uint32_t fsid, const std::string &uri, ftl::stream::Stream* strea _createPipeline(fsid); - stream_->add(stream, fsid); + stream_->add(stream, fsid, [this,stream]() { + int fsid = 0; + { + UNIQUE_LOCK(mtx_, lk); + fsid = allocateFrameSetId(""); + latest_[fsid] = nullptr; + streams_[fsid].push_back(stream); + _createPipeline(fsid); + + stream_->begin(); + stream_->select(fsid, {Channel::Colour}, true); + } + add_src_cb_.trigger(fsid); + return fsid; + }); stream_->begin(); stream_->select(fsid, {Channel::Colour}, true); } diff --git a/components/streams/src/filestream.cpp b/components/streams/src/filestream.cpp index 35b48d217d4fceb7c289646c90af376e78365d14..a49b743e799413123178a71483a6782988b76c63 100644 --- a/components/streams/src/filestream.cpp +++ b/components/streams/src/filestream.cpp @@ -50,7 +50,7 @@ bool File::_checkFile() { LOG(INFO) << "FTL format version " << version_; // Read some packets to identify frame rate. - int count = 10; + int count = 1000; int64_t ts = -1000; int min_ts_diff = 1000; first_ts_ = 10000000000000ll; @@ -66,9 +66,11 @@ bool File::_checkFile() { auto &spkt = std::get<0>(data); auto &pkt = std::get<1>(data); + auto &fsdata = framesets_[spkt.streamID]; + codecs_found.emplace(pkt.codec); - if (spkt.timestamp < first_ts_) first_ts_ = spkt.timestamp; + if (fsdata.first_ts < 0) fsdata.first_ts = spkt.timestamp; //LOG(INFO) << "TIMESTAMP: " << spkt.timestamp; @@ -101,6 +103,9 @@ bool File::_checkFile() { LOG(INFO) << " -- Codecs:" << codec_str; interval_ = min_ts_diff; + for (auto &f : framesets_) { + f.second.interval = interval_; + } return true; } @@ -238,33 +243,52 @@ bool File::tick(int64_t ts) { //UNIQUE_LOCK(data_mutex_, dlk); if (data_.size() > 0) has_data = true; - if (needs_endframe_) { + /*if (needs_endframe_) { // Reset packet counts for (auto &p : packet_counts_) p = 0; - } + }*/ - size_t frame_count = 0; + size_t complete_count = 0; for (auto i = data_.begin(); i != data_.end(); ) { - if (std::get<0>(*i).timestamp <= timestamp_) { + auto &fsdata = framesets_[std::get<0>(*i).streamID]; + if (fsdata.timestamp == 0) fsdata.timestamp = std::get<0>(*i).timestamp; + + // Limit to file framerate + if (std::get<0>(*i).timestamp > ts) { + break; + } + + if (std::get<0>(*i).timestamp < fsdata.timestamp) { + LOG(WARNING) << "Received old packet: " << std::get<0>(*i).timestamp << " vs " << fsdata.timestamp << " ( channel = " << int(std::get<0>(*i).channel) << " )"; + i = data_.erase(i); + continue; + } + + if (std::get<0>(*i).timestamp <= fsdata.timestamp) { auto &spkt = std::get<0>(*i); auto &pkt = std::get<1>(*i); + //LOG(INFO) << "PACKET: " << spkt.timestamp << ", " << fsdata.timestamp << ", " << int(spkt.streamID) << ", " << int(spkt.channel); + + ++jobs_; - spkt.timestamp = ts; + //spkt.timestamp = ts; - if (spkt.channel == Channel::EndFrame) needs_endframe_ = false; + if (spkt.channel == Channel::EndFrame) { + fsdata.needs_endframe = false; + } - if (needs_endframe_) { + if (fsdata.needs_endframe) { if (spkt.frame_number < 255) { - frame_count = std::max(frame_count, static_cast<size_t>(spkt.frame_number + pkt.frame_count)); - while (packet_counts_.size() < frame_count) packet_counts_.push_back(0); - ++packet_counts_[spkt.frame_number]; + fsdata.frame_count = std::max(fsdata.frame_count, static_cast<size_t>(spkt.frame_number + pkt.frame_count)); + while (fsdata.packet_counts.size() < fsdata.frame_count) fsdata.packet_counts.push_back(0); + ++fsdata.packet_counts[spkt.frame_number]; } else { // Add frameset packets to frame 0 counts - frame_count = std::max(frame_count, size_t(1)); - while (packet_counts_.size() < frame_count) packet_counts_.push_back(0); - ++packet_counts_[0]; + fsdata.frame_count = std::max(fsdata.frame_count, size_t(1)); + while (fsdata.packet_counts.size() < fsdata.frame_count) fsdata.packet_counts.push_back(0); + ++fsdata.packet_counts[0]; } } @@ -290,11 +314,13 @@ bool File::tick(int64_t ts) { --jobs_; }); } else { - if (needs_endframe_) { + ++complete_count; + + if (fsdata.needs_endframe) { // Send final frame packet. StreamPacket spkt; - spkt.timestamp = ts; - spkt.streamID = 0; // FIXME: Allow for non-zero framesets. + spkt.timestamp = fsdata.timestamp; + spkt.streamID = std::get<0>(*i).streamID; spkt.flags = 0; spkt.channel = Channel::EndFrame; @@ -304,9 +330,10 @@ bool File::tick(int64_t ts) { pkt.packet_count = 1; pkt.frame_count = 1; - for (size_t i=0; i<frame_count; ++i) { + for (size_t i=0; i<fsdata.frame_count; ++i) { spkt.frame_number = i; - pkt.packet_count = packet_counts_[i]+1; + pkt.packet_count = fsdata.packet_counts[i]+1; + fsdata.packet_counts[i] = 0; try { cb_.trigger(spkt, pkt); @@ -316,14 +343,18 @@ bool File::tick(int64_t ts) { LOG(ERROR) << "Exception in packet callback: " << e.what(); } } + } else { } - break; + fsdata.timestamp = std::get<0>(*i).timestamp; //fsdata.interval; + if (complete_count == framesets_.size()) break; } } } - int64_t extended_ts = timestamp_ + 200; // Buffer 200ms ahead + int64_t max_ts = 0; + for (auto &fsd : framesets_) max_ts = std::max(max_ts, (fsd.second.timestamp == 0) ? timestart_ : fsd.second.timestamp); + int64_t extended_ts = max_ts + 200; // Buffer 200ms ahead while ((active_ && istream_->good()) || buffer_in_.nonparsed_size() > 0u) { UNIQUE_LOCK(data_mutex_, dlk); @@ -337,9 +368,13 @@ bool File::tick(int64_t ts) { break; } + auto &fsdata = framesets_[std::get<0>(data).streamID]; + + if (fsdata.first_ts < 0) LOG(WARNING) << "Bad first timestamp"; + // Adjust timestamp // FIXME: A potential bug where multiple times are merged into one? - std::get<0>(data).timestamp = (((std::get<0>(data).timestamp) - first_ts_) / interval_) * interval_ + timestart_; + std::get<0>(data).timestamp = (((std::get<0>(data).timestamp) - fsdata.first_ts)) + timestart_; std::get<0>(data).hint_capability = ((is_video_) ? 0 : ftl::codecs::kStreamCap_Static) | ftl::codecs::kStreamCap_Recorded; // Maintain availability of channels. @@ -371,15 +406,18 @@ bool File::tick(int64_t ts) { } } - if (has_data) timestamp_ += interval_; + //if (has_data) { + // for (auto &fsd : framesets_) fsd.second.timestamp += interval_; + //} if (data_.size() == 0 && value("looping", true)) { buffer_in_.reset(); buffer_in_.remove_nonparsed_buffer(); _open(); - timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); - timestamp_ = timestart_; + timestart_ = ftl::timer::get_time(); // (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); + //timestamp_ = timestart_; + for (auto &fsd : framesets_) fsd.second.timestamp = 0; return true; } @@ -428,10 +466,11 @@ bool File::begin(bool dorun) { _open(); // Capture current time to adjust timestamps - timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); + timestart_ = ftl::timer::get_time(); //(ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); active_ = true; //interval_ = 40; - timestamp_ = timestart_; + //timestamp_ = timestart_; + //for (auto &fsd : framesets_) fsd.second.timestamp = timestart_; tick(timestart_); // Do some now! if (dorun) run(); @@ -456,7 +495,9 @@ bool File::begin(bool dorun) { timestart_ = ftl::timer::get_time(); active_ = true; interval_ = ftl::timer::getInterval(); - timestamp_ = timestart_; + //timestamp_ = timestart_; + + //for (auto &fsd : framesets_) fsd.second.timestamp = timestart_; } return true; @@ -491,7 +532,7 @@ bool File::end() { } void File::reset() { - UNIQUE_LOCK(mutex_, lk); + /*UNIQUE_LOCK(mutex_, lk); // TODO: Find a better solution while (jobs_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2)); @@ -502,7 +543,8 @@ void File::reset() { _open(); timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); - timestamp_ = timestart_; + //timestamp_ = timestart_; + for (auto &fsd : framesets_) fsd.second.timestamp = timestart_;*/ } bool File::active() { diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index b53fc49a7beea2f4521dbc60757c02fbdf1cafde..97c58c7dcf8f727a8b8dd58321caf15e33170b88 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -116,8 +116,10 @@ void Receiver::_createDecoder(InternalVideoStates &frame, int chan, const ftl::c } Receiver::InternalVideoStates::InternalVideoStates() { - for (int i=0; i<32; ++i) decoders[i] = nullptr; - timestamp = -1; + for (int i=0; i<32; ++i) { + decoders[i] = nullptr; + timestamps[i] = 0; + } } Receiver::InternalVideoStates &Receiver::_getVideoFrame(const StreamPacket &spkt, int ix) { @@ -125,7 +127,8 @@ Receiver::InternalVideoStates &Receiver::_getVideoFrame(const StreamPacket &spkt UNIQUE_LOCK(mutex_, lk); while (video_frames_[spkt.streamID].size() <= fn) { - video_frames_[spkt.streamID].push_back(new InternalVideoStates); + auto *ns = new InternalVideoStates; + video_frames_[spkt.streamID].push_back(ns); } auto &f = *video_frames_[spkt.streamID][fn]; @@ -270,6 +273,38 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { return; } + { + UNIQUE_LOCK(ividstate.mutex, lk); + + /*if (ividstate.timestamps[int(spkt.channel)] != 0 && spkt.timestamp > ividstate.timestamps[int(spkt.channel)] + ividstate.interval) { + if (spkt.timestamp > ividstate.timestamps[int(spkt.channel)] + 3*ividstate.interval) { + LOG(ERROR) << "Multiple frame delays or drops, discarding totally"; + ividstate.timestamps[int(spkt.channel)] = std::max(ividstate.timestamps[int(spkt.channel)], spkt.timestamp); + return; + } else if (spkt.timestamp > ividstate.timestamps[int(spkt.channel)] + 2*ividstate.interval) { + LOG(WARNING) << "Frame was dropped, do the todo item now: " << ividstate.interval << "," << spkt.timestamp << "," << ividstate.timestamps[int(spkt.channel)]; + if (ividstate.todos[int(spkt.channel)].first.timestamp > ividstate.timestamps[int(spkt.channel)]) { + _processVideo(ividstate.todos[int(spkt.channel)].first, ividstate.todos[int(spkt.channel)].second); + } + } else { + LOG(WARNING) << "Future frame received early: " << (spkt.timestamp - ividstate.timestamps[int(spkt.channel)] + ividstate.interval); + ividstate.todos[int(spkt.channel)].first = spkt; + ividstate.todos[int(spkt.channel)].second = pkt; + return; + } + }*/ + if (spkt.timestamp < ividstate.timestamps[int(spkt.channel)]) { + lk.unlock(); + LOG(ERROR) << "Out-of-order decode"; + _terminateVideoPacket(spkt, pkt); + error_cb_.trigger(ftl::data::FrameID(spkt.streamID, spkt.frame_number)); + return; + }// else if (spkt.timestamp > ividstate.timestamps[int(spkt.channel)]) { + // ividstate.interval = std::min(ividstate.interval, spkt.timestamp - ividstate.timestamps[int(spkt.channel)]); + //} + ividstate.timestamps[int(spkt.channel)] = std::max(ividstate.timestamps[int(spkt.channel)], spkt.timestamp); + } + cv::cuda::GpuMat surface; //bool is_static = ividstate.decoders[channum] && (spkt.hint_capability & ftl::codecs::kStreamCap_Static); @@ -315,58 +350,61 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { // Get the frameset auto &build = builder(spkt.streamID); - auto fs = build.get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); - if (!fs) { - LOG(WARNING) << "Dropping a video frame"; - return; - } + { + auto fs = build.get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); + + if (!fs) { + LOG(WARNING) << "Dropping a video frame"; + return; + } - auto cvstream = cv::cuda::StreamAccessor::wrapStream(decoder->stream()); + auto cvstream = cv::cuda::StreamAccessor::wrapStream(decoder->stream()); - // Mark a frameset as being partial - if (pkt.flags & ftl::codecs::kFlagPartial) { - fs->markPartial(); - } + // Mark a frameset as being partial + if (pkt.flags & ftl::codecs::kFlagPartial) { + fs->markPartial(); + } - // Now split the tiles from surface into frames, doing colour conversions - // at the same time. - // Note: Done in reverse to allocate correct number of frames first time round - // FIXME: Don't do this copy for single tiles - for (int i=pkt.frame_count-1; i>=0; --i) { - //InternalVideoStates &vidstate = _getVideoFrame(spkt,i); - auto &frame = fs->frames[spkt.frame_number+i]; + // Now split the tiles from surface into frames, doing colour conversions + // at the same time. + // Note: Done in reverse to allocate correct number of frames first time round + // FIXME: Don't do this copy for single tiles + for (int i=pkt.frame_count-1; i>=0; --i) { + //InternalVideoStates &vidstate = _getVideoFrame(spkt,i); + auto &frame = fs->frames[spkt.frame_number+i]; - //if (!frame.origin()) frame.setOrigin(&vidstate.state); + //if (!frame.origin()) frame.setOrigin(&vidstate.state); - if (frame.hasChannel(spkt.channel)) { - LOG(WARNING) << "Previous frame not complete: " << spkt.timestamp; - } + if (frame.hasChannel(spkt.channel)) { + LOG(WARNING) << "Previous frame not complete: " << spkt.timestamp; + } - // Add channel to frame and allocate memory if required - const cv::Size size = cv::Size(width, height); - auto &buf = frame.createChange<ftl::rgbd::VideoFrame>(spkt.channel, build.changeType(), pkt).createGPU(); - buf.create(size, ftl::codecs::type(spkt.channel)); //(isFloatChannel(rchan) ? CV_32FC1 : CV_8UC4)); + // Add channel to frame and allocate memory if required + const cv::Size size = cv::Size(width, height); + auto &buf = frame.createChange<ftl::rgbd::VideoFrame>(spkt.channel, build.changeType(), pkt).createGPU(); + buf.create(size, ftl::codecs::type(spkt.channel)); //(isFloatChannel(rchan) ? CV_32FC1 : CV_8UC4)); - cv::Rect roi((i % tx)*width, (i / tx)*height, width, height); - cv::cuda::GpuMat sroi = surface(roi); - sroi.copyTo(buf, cvstream); - } + cv::Rect roi((i % tx)*width, (i / tx)*height, width, height); + cv::cuda::GpuMat sroi = surface(roi); + sroi.copyTo(buf, cvstream); + } - // Must ensure all processing is finished before completing a frame. - //cudaSafeCall(cudaStreamSynchronize(decoder->stream())); + // Must ensure all processing is finished before completing a frame. + //cudaSafeCall(cudaStreamSynchronize(decoder->stream())); - cudaSafeCall(cudaEventRecord(decoder->event(), decoder->stream())); - //for (int i=0; i<pkt.frame_count; ++i) { - // cudaSafeCall(cudaStreamWaitEvent(fs->frames[spkt.frame_number+i].stream(), decoder->event(), 0)); - //} + cudaSafeCall(cudaEventRecord(decoder->event(), decoder->stream())); + //for (int i=0; i<pkt.frame_count; ++i) { + // cudaSafeCall(cudaStreamWaitEvent(fs->frames[spkt.frame_number+i].stream(), decoder->event(), 0)); + //} - // For now, always add to frame 0 stream - cudaSafeCall(cudaStreamWaitEvent(fs->frames[0].stream(), decoder->event(), 0)); + // For now, always add to frame 0 stream + cudaSafeCall(cudaStreamWaitEvent(fs->frames[0].stream(), decoder->event(), 0)); - fs->localTimestamp = spkt.localTimestamp; + fs->localTimestamp = spkt.localTimestamp; - _finishPacket(fs, spkt.frame_number); + _finishPacket(fs, spkt.frame_number); + } } void Receiver::_finishPacket(ftl::streams::LockedFrameSet &fs, size_t fix) { diff --git a/components/streams/src/renderers/openvr_render.cpp b/components/streams/src/renderers/openvr_render.cpp index 5c299083ebb2a8d421e674ac31b725014e80e117..1c90b830110b9125966a01e654dceea7677b8edc 100644 --- a/components/streams/src/renderers/openvr_render.cpp +++ b/components/streams/src/renderers/openvr_render.cpp @@ -420,7 +420,7 @@ bool OpenVRRender::retrieve(ftl::data::Frame &frame_out) { auto &f = s->frames[i]; // If audio is present, mix with the other frames - if (f.hasChannel(Channel::AudioStereo)) { + /*if (f.hasChannel(Channel::AudioStereo)) { // Map a mixer track to this frame auto &mixmap = mixmap_[f.id().id]; if (mixmap.track == -1) { @@ -433,7 +433,7 @@ bool OpenVRRender::retrieve(ftl::data::Frame &frame_out) { mixer_.write(mixmap.track, audio.data()); mixmap.last_timestamp = f.timestamp(); } - } + }*/ // Add pose as a camera shape auto &shape = shapes.list.emplace_back(); @@ -496,16 +496,16 @@ bool OpenVRRender::retrieve(ftl::data::Frame &frame_out) { //renderer2_->render(); - mixer_.mix(); + //mixer_.mix(); // Write mixed audio to frame. - if (mixer_.frames() > 0) { + /*if (mixer_.frames() > 0) { auto &list = frame_out.create<std::list<ftl::audio::Audio>>(Channel::AudioStereo).list; list.clear(); int fcount = mixer_.frames(); mixer_.read(list.emplace_front().data(), fcount); - } + }*/ // TODO: Blend option diff --git a/components/streams/src/renderers/screen_render.cpp b/components/streams/src/renderers/screen_render.cpp index 4556cfaec9505e9924b3d49c0013e2630920a2e3..a530750cce22f889814929507de54790bc435d28 100644 --- a/components/streams/src/renderers/screen_render.cpp +++ b/components/streams/src/renderers/screen_render.cpp @@ -167,7 +167,7 @@ bool ScreenRender::retrieve(ftl::data::Frame &frame_out) { auto &f = s->frames[i]; // If audio is present, mix with the other frames - if (f.hasChannel(Channel::AudioStereo)) { + /*if (f.hasChannel(Channel::AudioStereo)) { // Map a mixer track to this frame auto &mixmap = mixmap_[f.id().id]; if (mixmap.track == -1) { @@ -180,7 +180,7 @@ bool ScreenRender::retrieve(ftl::data::Frame &frame_out) { mixer_.write(mixmap.track, audio.data()); mixmap.last_timestamp = f.timestamp(); } - } + }*/ // Add pose as a camera shape auto &shape = shapes.list.emplace_back(); @@ -198,16 +198,17 @@ bool ScreenRender::retrieve(ftl::data::Frame &frame_out) { } } - mixer_.mix(); + //mixer_.mix(); // Write mixed audio to frame. - if (mixer_.frames() > 0) { - auto &list = frame_out.create<std::list<ftl::audio::Audio>>(Channel::AudioStereo).list; - list.clear(); - - int fcount = mixer_.frames(); - mixer_.read(list.emplace_front().data(), fcount); - } + //if (feed_->mixer().frames() > 0) { + //LOG(INFO) << "Render copy of " << feed_->mixer().frames() << " audio frames"; + //auto &list = frame_out.create<std::list<ftl::audio::Audio>>(Channel::AudioStereo).list; + //list.clear(); + + //int fcount = mixer_.frames(); + //mixer_.read(list.emplace_front().data(), fcount); + //} // This waits for GPU also if (!data_only) renderer_->end(); diff --git a/components/streams/src/renderers/screen_render.hpp b/components/streams/src/renderers/screen_render.hpp index fedc65ad48278fb4cc73840ebffb2146ec2e98cb..89320aed4b6e45a5cb73e693b25901470060b41b 100644 --- a/components/streams/src/renderers/screen_render.hpp +++ b/components/streams/src/renderers/screen_render.hpp @@ -38,12 +38,12 @@ class ScreenRender : public ftl::render::BaseSourceImpl { ftl::operators::Graph *post_pipe_; std::atomic_flag calibration_uptodate_; - struct AudioMixerMapping { + /*struct AudioMixerMapping { int64_t last_timestamp=0; int track=-1; }; - std::unordered_map<uint32_t, AudioMixerMapping> mixmap_; + std::unordered_map<uint32_t, AudioMixerMapping> mixmap_;*/ }; } diff --git a/components/streams/src/stream.cpp b/components/streams/src/stream.cpp index 72a8524e97d538ad1f407bcad15ebf76cf3b2517..b68bc01862d7b98bb5c903f7244444d70e035898 100644 --- a/components/streams/src/stream.cpp +++ b/components/streams/src/stream.cpp @@ -86,16 +86,17 @@ Muxer::~Muxer() { } -void Muxer::add(Stream *s, size_t fsid) { +void Muxer::add(Stream *s, size_t fsid, const std::function<int()> &cb) { UNIQUE_LOCK(mutex_,lk); if (fsid < 0u || fsid >= ftl::stream::kMaxStreams) return; auto &se = streams_.emplace_back(); //int i = streams_.size()-1; se.stream = s; + se.ids.push_back(fsid); ftl::stream::Muxer::StreamEntry *ptr = &se; - se.handle = std::move(s->onPacket([this,s,fsid,ptr](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + se.handle = std::move(s->onPacket([this,s,ptr,cb](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { //TODO: Allow input streams to have other streamIDs // Same fsid means same streamIDs map together in the end @@ -105,8 +106,27 @@ void Muxer::add(Stream *s, size_t fsid) { ptr = &streams_[i]; }*/ + if (!cb && spkt.streamID > 0) { + LOG(WARNING) << "Multiple framesets in stream"; + return true; + } + + if (ptr->ids.size() <= spkt.streamID) { + UNIQUE_LOCK(mutex_,lk); + if (ptr->ids.size() <= spkt.streamID) { + ptr->ids.resize(spkt.streamID + 1); + ptr->ids[spkt.streamID] = cb(); + } + } + + int fsid; + { + SHARED_LOCK(mutex_, lk); + fsid = ptr->ids[spkt.streamID]; + } + ftl::codecs::StreamPacket spkt2 = spkt; - ptr->original_fsid = spkt.streamID; + ptr->original_fsid = spkt.streamID; // FIXME: Multiple originals needed spkt2.streamID = fsid; if (spkt2.frame_number < 255) { @@ -115,7 +135,7 @@ void Muxer::add(Stream *s, size_t fsid) { } _notify(spkt2, pkt); - s->select(spkt.streamID, selected(fsid)); + s->select(spkt.streamID, selected(fsid), true); return true; })); } @@ -161,7 +181,7 @@ bool Muxer::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packe //LOG(INFO) << "POST " << spkt.frame_number; ftl::codecs::StreamPacket spkt2 = spkt; - spkt2.streamID = se->original_fsid; + spkt2.streamID = se->original_fsid; // FIXME: Multiple possible originals spkt2.frame_number = ssid; se->stream->select(spkt2.streamID, selected(spkt.frameSetID())); return se->stream->post(spkt2, pkt); @@ -202,25 +222,40 @@ void Muxer::reset() { int Muxer::_lookup(size_t fsid, ftl::stream::Muxer::StreamEntry *se, int ssid, int count) { SHARED_LOCK(mutex_, lk); - //auto &se = streams_[sid]; - if (static_cast<uint32_t>(ssid) >= se->maps.size()) { + + auto i = se->maps.find(fsid); + if (i == se->maps.end()) { + lk.unlock(); + { + UNIQUE_LOCK(mutex_, lk2); + if (se->maps.count(fsid) == 0) { + se->maps[fsid] = {}; + } + i = se->maps.find(fsid); + } + lk.lock(); + } + + auto &map = i->second; + + if (static_cast<uint32_t>(ssid) >= map.size()) { lk.unlock(); { UNIQUE_LOCK(mutex_, lk2); - while (static_cast<uint32_t>(ssid) >= se->maps.size()) { + while (static_cast<uint32_t>(ssid) >= map.size()) { int nid = nid_[fsid]++; - revmap_[fsid].push_back({se, static_cast<uint32_t>(se->maps.size())}); - se->maps.push_back(nid); + revmap_[fsid].push_back({se, static_cast<uint32_t>(map.size())}); + map.push_back(nid); for (int i=1; i<count; ++i) { int nid = nid_[fsid]++; - revmap_[fsid].push_back({se, static_cast<uint32_t>(se->maps.size())}); - se->maps.push_back(nid); + revmap_[fsid].push_back({se, static_cast<uint32_t>(map.size())}); + map.push_back(nid); } } } lk.lock(); } - return se->maps[ssid]; + return map[ssid]; } void Muxer::_notify(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { diff --git a/components/streams/test/filestream_unit.cpp b/components/streams/test/filestream_unit.cpp index 53287ae95f110e37cc500f8313692f332caee738..12a43b621f848efce6e4a43faac48c0c8b77ef88 100644 --- a/components/streams/test/filestream_unit.cpp +++ b/components/streams/test/filestream_unit.cpp @@ -44,7 +44,7 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { }); REQUIRE( reader->begin(false) ); - reader->tick(100); + reader->tick(ftl::timer::get_time()+10); reader->end(); //REQUIRE( tspkt.timestamp == 0 ); @@ -58,16 +58,17 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { REQUIRE( writer->begin() ); - REQUIRE( writer->post({4,0,0,1,ftl::codecs::Channel::Confidence},{ftl::codecs::codec_t::Any, 0, 0, 0, 0, {'f'}}) ); - REQUIRE( writer->post({4,0,1,1,ftl::codecs::Channel::Depth},{ftl::codecs::codec_t::Any, 0, 0, 0, 0, {'f'}}) ); - REQUIRE( writer->post({4,0,2,1,ftl::codecs::Channel::Screen},{ftl::codecs::codec_t::Any, 0, 0, 0, 0, {'f'}}) ); + REQUIRE( writer->post({5,10,0,1,ftl::codecs::Channel::Confidence},{ftl::codecs::codec_t::Any, 0, 0, 0, 0, {'f'}}) ); + REQUIRE( writer->post({5,10,1,1,ftl::codecs::Channel::Depth},{ftl::codecs::codec_t::Any, 0, 0, 0, 0, {'f'}}) ); + REQUIRE( writer->post({5,10,2,1,ftl::codecs::Channel::Screen},{ftl::codecs::codec_t::Any, 0, 0, 0, 0, {'f'}}) ); writer->end(); reader->set("filename", (std::filesystem::temp_directory_path() / "ftl_file_stream_test.ftl").string()); - ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; - int count = 0; + ftl::codecs::StreamPacket tspkt = {5,0,0,1,ftl::codecs::Channel::Colour}; + std::atomic_int count = 0; + auto h = reader->onPacket([&tspkt,&count](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; ++count; @@ -75,7 +76,7 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { }); REQUIRE( reader->begin(false) ); - reader->tick(100); + reader->tick(ftl::timer::get_time()+10); reader->end(); REQUIRE( count == 3 ); @@ -108,7 +109,7 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { }); REQUIRE( reader->begin(false) ); - reader->tick(100); + reader->tick(ftl::timer::get_time()+ftl::timer::getInterval()); std::this_thread::sleep_for(std::chrono::milliseconds(10)); REQUIRE( count == 2 ); @@ -116,14 +117,14 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { //auto itime = tspkt.timestamp; count = 0; - reader->tick(101); + reader->tick(ftl::timer::get_time()+2*ftl::timer::getInterval()); std::this_thread::sleep_for(std::chrono::milliseconds(10)); REQUIRE( count == 2 ); //REQUIRE( tspkt.timestamp == itime+ftl::timer::getInterval() ); count = 0; - reader->tick(102); + reader->tick(ftl::timer::get_time()+3*ftl::timer::getInterval()); std::this_thread::sleep_for(std::chrono::milliseconds(10)); REQUIRE( count == 1 );