diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index 7a7403f0fda20010e2ea559cb4946092fbe77e93..590b265b236f043e2d5bd63e0ca4307a89cf9d38 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -211,9 +211,13 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket spkt.version = 4; if (p) spkt.hint_peerid = p->localID(); + bool isRequest = host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest); + FrameID localFrame(spkt.streamID, spkt.frame_number); - seen(localFrame, spkt.channel); + if (!isRequest) { + seen(localFrame, spkt.channel); + } if (paused_) return; @@ -252,7 +256,7 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket // If hosting and no data then it is a request for data // Note: a non host can receive empty data, meaning data is available // but that you did not request it - if (host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest)) { + if (isRequest) { _processRequest(p, &spkt, pkt); } @@ -293,8 +297,8 @@ bool Net::begin() { net_streams.push_back(uri_); } - net_->broadcast("add_stream", uri_); active_ = true; + net_->broadcast("add_stream", uri_); } else { tally_ = frames_to_request_; @@ -437,6 +441,18 @@ void Net::_cleanUp() { */ bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacket &pkt) { bool found = false; + + if (spkt->streamID == 255 || spkt->frame_number == 255) { + // Generate a batch of requests + ftl::protocol::StreamPacket spkt2 = *spkt; + for (const auto &i : frames()) { + spkt2.streamID = i.frameset(); + spkt2.frame_number = i.source(); + _processRequest(p, &spkt2, pkt); + } + return false; + } + DLOG(INFO) << "processing request: " << int(spkt->streamID) << ", " << int(spkt->channel); const FrameID frameId(spkt->streamID, spkt->frame_number); diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp index d2b15271f2724cfb6ce447d98e3941df9eda564b..06d5429f7e03a7ea4ba23140985019ec8aff4326 100644 --- a/test/netstream_unit.cpp +++ b/test/netstream_unit.cpp @@ -145,6 +145,32 @@ TEST_CASE("Net stream sending requests") { REQUIRE( seenReq ); } + SECTION("responds to 255 requests") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); + + REQUIRE( s1->begin() ); + s1->seen(FrameID(1, 0), Channel::kEndFrame); + + bool seenReq = false; + + auto h = s1->onRequest([&seenReq](const ftl::protocol::Request &req) { + if (req.id.frameset() == 1) seenReq = true; + return true; + }); + + ftl::protocol::StreamPacketMSGPACK spkt; + ftl::protocol::PacketMSGPACK pkt; + spkt.streamID = 255; + spkt.frame_number = 255; + spkt.channel = Channel::kColour; + spkt.flags = ftl::protocol::kFlagRequest; + writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); + p->data(); + + sleep_for(milliseconds(50)); + REQUIRE( seenReq ); + } + p.reset(); ftl::protocol::reset(); }