Skip to content
Snippets Groups Projects
Commit 0680a05a authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/netstream-buf-requests' into 'main'

Immediately process packets for tally

See merge request beyondaka/beyond-protocol!68
parents ccb0c3e1 aa870aca
No related branches found
No related tags found
No related merge requests found
......@@ -207,20 +207,9 @@ bool Net::post(const StreamPacket &spkt, const DataPacket &pkt) {
return true;
}
void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt) {
int64_t now = time_point_cast<milliseconds>(high_resolution_clock::now()).time_since_epoch().count();
void Net::_earlyProcessPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt, DataPacket &pkt) {
if (!active_) return;
ftl::protocol::PacketPair pair;
StreamPacket &spkt = pair.first;
spkt = spkt_raw;
spkt.localTimestamp = now - int64_t(ttimeoff);
spkt.hint_capability = 0;
spkt.hint_source_total = 0;
spkt.version = 4;
if (p) spkt.hint_peerid = p->localID();
bool isRequest = host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest);
FrameID localFrame(spkt.streamID, spkt.frame_number);
......@@ -247,6 +236,23 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
--tally_[localFrame.frameset()];
}
}
}
void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt) {
int64_t now = time_point_cast<milliseconds>(high_resolution_clock::now()).time_since_epoch().count();
if (!active_) return;
ftl::protocol::PacketPair pair;
StreamPacket &spkt = pair.first;
spkt = spkt_raw;
spkt.localTimestamp = now - int64_t(ttimeoff);
spkt.hint_capability = 0;
spkt.hint_source_total = 0;
spkt.version = 4;
if (p) spkt.hint_peerid = p->localID();
bool isRequest = host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest);
bytes_received_ += pkt.data.size();
......@@ -257,19 +263,8 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
_processRequest(p, &spkt, pkt);
}
/*if (!host_) {
pair.second = std::move(pkt);
mgr_.submit(pair, [this, now, ttimeoff, p](const ftl::protocol::PacketPair &pair) {
const StreamPacket &spkt = pair.first;
const DataPacket &pkt = pair.second;
trigger(spkt, pkt);
if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
});
} else {*/
trigger(spkt, pkt);
if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
//}
trigger(spkt, pkt);
if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
}
void Net::inject(const ftl::protocol::StreamPacket &spkt, ftl::protocol::DataPacket &pkt) {
......@@ -310,6 +305,7 @@ void Net::_run() {
if (state->active > 0) {
LOG(WARNING) << "Previous frame still processing: " << nextTs;
nextTs = now + 1;
hasNext = true;
continue;
}
......@@ -333,7 +329,7 @@ void Net::_run() {
}
ats = raw_ats - state->base_pkt_ts_ + buffering_;
} else {
LOG(WARNING) << "No packets to present: " << cts;
// LOG(WARNING) << "No packets to present: " << cts;
continue;
}
......@@ -365,7 +361,11 @@ void Net::_run() {
++state->active;
ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) {
_processPacket(buf->peer, 0, *spkt, *pkt);
try {
_processPacket(buf->peer, 0, *spkt, *pkt);
} catch (const std::exception &e) {
LOG(ERROR) << "Packet processing error: " << e.what();
}
buf->done = true;
--state->active;
});
......@@ -442,6 +442,8 @@ bool Net::begin() {
PacketMSGPACK &pkt) {
auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number));
_earlyProcessPacket(&p, ttimeoff, spkt_raw, pkt);
if (!host_) {
UNIQUE_LOCK(state->mtx, lk);
state->timestamps.insert(spkt_raw.timestamp);
......
......@@ -149,6 +149,7 @@ class Net : public Stream {
bool doreset = false);
void _cleanUp();
void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
void _earlyProcessPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
void _run();
};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment