Skip to content
Snippets Groups Projects
Commit 39c2520f authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Working sender

parent 0e626dbb
No related branches found
No related tags found
1 merge request!316Resolves #343 GUI and Frame Refactor
Pipeline #27443 failed
...@@ -143,11 +143,11 @@ static void run(ftl::Configurable *root) { ...@@ -143,11 +143,11 @@ static void run(ftl::Configurable *root) {
source->set("uri", uri.getBaseURI()); source->set("uri", uri.getBaseURI());
} }
//ftl::stream::Sender *sender = ftl::create<ftl::stream::Sender>(root, "sender"); ftl::stream::Sender *sender = ftl::create<ftl::stream::Sender>(root, "sender");
ftl::stream::Net *outstream = ftl::create<ftl::stream::Net>(root, "stream", net); ftl::stream::Net *outstream = ftl::create<ftl::stream::Net>(root, "stream", net);
outstream->set("uri", outstream->getID()); outstream->set("uri", outstream->getID());
outstream->begin(); outstream->begin();
//sender->setStream(outstream); sender->setStream(outstream);
/*auto *grp = new ftl::rgbd::Group(); /*auto *grp = new ftl::rgbd::Group();
source->setChannel(Channel::Depth); source->setChannel(Channel::Depth);
...@@ -157,7 +157,7 @@ static void run(ftl::Configurable *root) { ...@@ -157,7 +157,7 @@ static void run(ftl::Configurable *root) {
auto creator = pool.creator<ftl::data::IntervalFrameCreator>(0, source); auto creator = pool.creator<ftl::data::IntervalFrameCreator>(0, source);
// Listen for any flush events for frameset 0 // Listen for any flush events for frameset 0
auto flushh = pool.group(0).onFlush([](ftl::data::Frame &f, ftl::codecs::Channel c) { /*auto flushh = pool.group(0).onFlush([](ftl::data::Frame &f, ftl::codecs::Channel c) {
// Send to sender for encoding // Send to sender for encoding
if (c == ftl::codecs::Channel::Colour) { if (c == ftl::codecs::Channel::Colour) {
...@@ -167,12 +167,16 @@ static void run(ftl::Configurable *root) { ...@@ -167,12 +167,16 @@ static void run(ftl::Configurable *root) {
cv::waitKey(1); cv::waitKey(1);
} }
return true; return true;
}); });*/
// Callback for when a source has populated a frame with data // Callback for when a source has populated a frame with data
auto h = source->onFrame([](ftl::data::Frame &f) { auto h = source->onFrame([sender](ftl::data::Frame &f) {
// Make a frameset first
auto fs = ftl::data::FrameSet::fromFrame(f);
// Lock and send colour right now to encode in parallel // Lock and send colour right now to encode in parallel
f.flush(ftl::codecs::Channel::Colour); fs->flush(ftl::codecs::Channel::Colour);
sender->post(*fs, ftl::codecs::Channel::Colour);
// Do pipeline here... // Do pipeline here...
return true; return true;
......
...@@ -9,7 +9,7 @@ set(STREAMSRC ...@@ -9,7 +9,7 @@ set(STREAMSRC
src/stream.cpp src/stream.cpp
src/filestream.cpp src/filestream.cpp
#src/receiver.cpp #src/receiver.cpp
#src/sender.cpp src/sender.cpp
src/netstream.cpp src/netstream.cpp
src/injectors.cpp src/injectors.cpp
src/parsers.cpp src/parsers.cpp
......
...@@ -14,7 +14,7 @@ class Sender : public ftl::Configurable { ...@@ -14,7 +14,7 @@ class Sender : public ftl::Configurable {
void setStream(ftl::stream::Stream*); void setStream(ftl::stream::Stream*);
void post(ftl::data::Frame &f, ftl::codecs::Channel c); void post(ftl::data::FrameSet &fs, ftl::codecs::Channel c);
private: private:
std::unordered_map<uint32_t, ftl::codecs::Encoder*> encoders_; std::unordered_map<uint32_t, ftl::codecs::Encoder*> encoders_;
......
...@@ -27,12 +27,12 @@ class Sender : public ftl::Configurable { ...@@ -27,12 +27,12 @@ class Sender : public ftl::Configurable {
* Encode and transmit an entire frame set. Frames may already contain * Encode and transmit an entire frame set. Frames may already contain
* an encoded form, in which case that is used instead. * an encoded form, in which case that is used instead.
*/ */
void post(ftl::rgbd::FrameSet &fs); void post(ftl::data::FrameSet &fs, ftl::codecs::Channel c);
/** /**
* Encode and transmit a set of audio channels. * Encode and transmit a set of audio channels.
*/ */
void post(const ftl::audio::FrameSet &fs); //void post(const ftl::audio::FrameSet &fs);
//void onStateChange(const std::function<void(ftl::codecs::Channel, int, int)>&); //void onStateChange(const std::function<void(ftl::codecs::Channel, int, int)>&);
......
...@@ -74,7 +74,7 @@ ftl::audio::Encoder *Sender::_getAudioEncoder(int fsid, int sid, ftl::codecs::Ch ...@@ -74,7 +74,7 @@ ftl::audio::Encoder *Sender::_getAudioEncoder(int fsid, int sid, ftl::codecs::Ch
return state.encoder; return state.encoder;
} }
void Sender::post(const ftl::audio::FrameSet &fs) { /*void Sender::post(const ftl::audio::FrameSet &fs) {
if (!stream_) return; if (!stream_) return;
//if (fs.stale) return; //if (fs.stale) return;
...@@ -100,11 +100,7 @@ void Sender::post(const ftl::audio::FrameSet &fs) { ...@@ -100,11 +100,7 @@ void Sender::post(const ftl::audio::FrameSet &fs) {
pkt.codec = ftl::codecs::codec_t::OPUS; pkt.codec = ftl::codecs::codec_t::OPUS;
//pkt.definition = ftl::codecs::definition_t::Any; //pkt.definition = ftl::codecs::definition_t::Any;
/*switch (settings.sample_rate) {
case 48000 : pkt.definition = ftl::codecs::definition_t::hz48000; break;
case 44100 : pkt.definition = ftl::codecs::definition_t::hz44100; break;
default: break;
}*/
pkt.definition = ftl::codecs::definition_t::hz48000; pkt.definition = ftl::codecs::definition_t::hz48000;
...@@ -130,7 +126,7 @@ void Sender::post(const ftl::audio::FrameSet &fs) { ...@@ -130,7 +126,7 @@ void Sender::post(const ftl::audio::FrameSet &fs) {
//LOG(INFO) << "SENT AUDIO: " << fs.timestamp << " - " << pkt.data.size(); //LOG(INFO) << "SENT AUDIO: " << fs.timestamp << " - " << pkt.data.size();
} }
} }*/
template <typename T> template <typename T>
static void writeValue(std::vector<unsigned char> &data, T value) { static void writeValue(std::vector<unsigned char> &data, T value) {
...@@ -160,14 +156,10 @@ static void mergeNALUnits(const std::list<ftl::codecs::Packet> &pkts, ftl::codec ...@@ -160,14 +156,10 @@ static void mergeNALUnits(const std::list<ftl::codecs::Packet> &pkts, ftl::codec
} }
} }
void Sender::post(ftl::rgbd::FrameSet &fs) { void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) {
if (!stream_) return; if (!stream_) return;
Channels selected; //if (stream_->size() > 0) selected = stream_->selected(0);
Channels available; // but not selected and actually sent.
Channels needencoding;
if (stream_->size() > 0) selected = stream_->selected(0);
bool do_inject = !do_inject_.test_and_set(); bool do_inject = !do_inject_.test_and_set();
...@@ -194,6 +186,9 @@ void Sender::post(ftl::rgbd::FrameSet &fs) { ...@@ -194,6 +186,9 @@ void Sender::post(ftl::rgbd::FrameSet &fs) {
stream_->post(spkt, pkt); stream_->post(spkt, pkt);
}*/ }*/
bool available = false;
bool needs_encoding = true;
for (size_t i=0; i<fs.frames.size(); ++i) { for (size_t i=0; i<fs.frames.size(); ++i) {
const auto &frame = fs.frames[i]; const auto &frame = fs.frames[i];
...@@ -228,9 +223,9 @@ void Sender::post(ftl::rgbd::FrameSet &fs) { ...@@ -228,9 +223,9 @@ void Sender::post(ftl::rgbd::FrameSet &fs) {
stream_->post(spkt, pkt); stream_->post(spkt, pkt);
}*/ }*/
for (auto ic : frame.changed()) { //for (auto ic : frame.changed()) {
auto c = ic.first; //auto c = ic.first;
if (selected.has(c)) { if (true) { //if (selected.has(c)) {
// FIXME: Sends high res colour, but receive end currently broken // FIXME: Sends high res colour, but receive end currently broken
//auto cc = (c == Channel::Colour && frame.hasChannel(Channel::ColourHighRes)) ? Channel::ColourHighRes : c; //auto cc = (c == Channel::Colour && frame.hasChannel(Channel::ColourHighRes)) ? Channel::ColourHighRes : c;
auto cc = c; auto cc = c;
...@@ -245,6 +240,7 @@ void Sender::post(ftl::rgbd::FrameSet &fs) { ...@@ -245,6 +240,7 @@ void Sender::post(ftl::rgbd::FrameSet &fs) {
// Check if there are existing encoded packets // Check if there are existing encoded packets
const auto &packets = frame.getEncoded(cc); const auto &packets = frame.getEncoded(cc);
if (packets.size() > 0) { if (packets.size() > 0) {
needs_encoding = false;
if (packets.size() > 1) { if (packets.size() > 1) {
LOG(WARNING) << "Multi-packet send: " << (int)cc; LOG(WARNING) << "Multi-packet send: " << (int)cc;
ftl::codecs::Packet pkt; ftl::codecs::Packet pkt;
...@@ -256,17 +252,15 @@ void Sender::post(ftl::rgbd::FrameSet &fs) { ...@@ -256,17 +252,15 @@ void Sender::post(ftl::rgbd::FrameSet &fs) {
stream_->post(spkt, packets.front()); stream_->post(spkt, packets.front());
//} //}
} }
} else {
needencoding += c;
} }
} else { } else {
available += c; available = true;
}
} }
//}
} }
for (auto c : available) { if (available) {
// Not selected so send an empty packet... // Not selected so send an empty packet...
StreamPacket spkt; StreamPacket spkt;
spkt.version = 4; spkt.version = 4;
...@@ -282,7 +276,7 @@ void Sender::post(ftl::rgbd::FrameSet &fs) { ...@@ -282,7 +276,7 @@ void Sender::post(ftl::rgbd::FrameSet &fs) {
stream_->post(spkt, pkt); stream_->post(spkt, pkt);
} }
for (auto c : needencoding) { if (needs_encoding) {
// TODO: One thread per channel. // TODO: One thread per channel.
_encodeChannel(fs, c, do_inject || iframe_ == 0); _encodeChannel(fs, c, do_inject || iframe_ == 0);
} }
...@@ -290,7 +284,7 @@ void Sender::post(ftl::rgbd::FrameSet &fs) { ...@@ -290,7 +284,7 @@ void Sender::post(ftl::rgbd::FrameSet &fs) {
//do_inject_ = false; //do_inject_ = false;
} }
void Sender::_encodeChannel(ftl::rgbd::FrameSet &fs, Channel c, bool reset) { void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset) {
bool lossless = value("lossless", false); bool lossless = value("lossless", false);
int max_bitrate = std::max(0, std::min(255, value("max_bitrate", 255))); int max_bitrate = std::max(0, std::min(255, value("max_bitrate", 255)));
//int min_bitrate = std::max(0, std::min(255, value("min_bitrate", 0))); // TODO: Use this //int min_bitrate = std::max(0, std::min(255, value("min_bitrate", 0))); // TODO: Use this
......
...@@ -266,6 +266,8 @@ class Frame { ...@@ -266,6 +266,8 @@ class Frame {
*/ */
static Frame make_standalone(); static Frame make_standalone();
inline Pool *pool() const { return pool_; }
protected: protected:
std::any &createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t); std::any &createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t);
......
...@@ -77,6 +77,22 @@ class FrameSet : public ftl::data::Frame { ...@@ -77,6 +77,22 @@ class FrameSet : public ftl::data::Frame {
inline Frame &operator[](int ix) { return frames[ix]; } inline Frame &operator[](int ix) { return frames[ix]; }
inline const Frame &operator[](int ix) const { return frames[ix]; } inline const Frame &operator[](int ix) const { return frames[ix]; }
/**
* Flush all frames in the frameset.
*/
void flush();
/**
* Flush a channel for all frames in the frameset.
*/
void flush(ftl::codecs::Channel);
/**
* Make a frameset from a single frame. It borrows the pool, id and
* timestamp from the frame and creates a wrapping frameset instance.
*/
static std::shared_ptr<FrameSet> fromFrame(Frame &);
private: private:
std::atomic<int> flags_; std::atomic<int> flags_;
}; };
......
...@@ -45,3 +45,23 @@ const ftl::data::Frame &ftl::data::FrameSet::firstFrame() const { ...@@ -45,3 +45,23 @@ const ftl::data::Frame &ftl::data::FrameSet::firstFrame() const {
} }
throw FTL_Error("No frames in frameset"); throw FTL_Error("No frames in frameset");
} }
void FrameSet::flush() {
for (auto &f : frames) f.flush();
}
void FrameSet::flush(ftl::codecs::Channel c) {
for (auto &f : frames) f.flush(c);
}
/**
* Make a frameset from a single frame. It borrows the pool, id and
* timestamp from the frame and creates a wrapping frameset instance.
*/
std::shared_ptr<FrameSet> FrameSet::fromFrame(Frame &f) {
auto sptr = std::make_shared<FrameSet>(f.pool(), f.id()|0xFF, f.timestamp());
sptr->frames.push_back(std::move(f));
sptr->count = 1;
sptr->mask = 1;
return sptr;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment