Skip to content
Snippets Groups Projects
Commit aa870aca authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Immediately process packets for tally

parent ccb0c3e1
No related branches found
No related tags found
No related merge requests found
......@@ -207,20 +207,9 @@ bool Net::post(const StreamPacket &spkt, const DataPacket &pkt) {
return true;
}
void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt) {
int64_t now = time_point_cast<milliseconds>(high_resolution_clock::now()).time_since_epoch().count();
void Net::_earlyProcessPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt, DataPacket &pkt) {
if (!active_) return;
ftl::protocol::PacketPair pair;
StreamPacket &spkt = pair.first;
spkt = spkt_raw;
spkt.localTimestamp = now - int64_t(ttimeoff);
spkt.hint_capability = 0;
spkt.hint_source_total = 0;
spkt.version = 4;
if (p) spkt.hint_peerid = p->localID();
bool isRequest = host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest);
FrameID localFrame(spkt.streamID, spkt.frame_number);
......@@ -247,6 +236,23 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
--tally_[localFrame.frameset()];
}
}
}
void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt) {
int64_t now = time_point_cast<milliseconds>(high_resolution_clock::now()).time_since_epoch().count();
if (!active_) return;
ftl::protocol::PacketPair pair;
StreamPacket &spkt = pair.first;
spkt = spkt_raw;
spkt.localTimestamp = now - int64_t(ttimeoff);
spkt.hint_capability = 0;
spkt.hint_source_total = 0;
spkt.version = 4;
if (p) spkt.hint_peerid = p->localID();
bool isRequest = host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest);
bytes_received_ += pkt.data.size();
......@@ -257,19 +263,8 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
_processRequest(p, &spkt, pkt);
}
/*if (!host_) {
pair.second = std::move(pkt);
mgr_.submit(pair, [this, now, ttimeoff, p](const ftl::protocol::PacketPair &pair) {
const StreamPacket &spkt = pair.first;
const DataPacket &pkt = pair.second;
trigger(spkt, pkt);
if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
});
} else {*/
trigger(spkt, pkt);
if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
//}
trigger(spkt, pkt);
if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
}
void Net::inject(const ftl::protocol::StreamPacket &spkt, ftl::protocol::DataPacket &pkt) {
......@@ -310,6 +305,7 @@ void Net::_run() {
if (state->active > 0) {
LOG(WARNING) << "Previous frame still processing: " << nextTs;
nextTs = now + 1;
hasNext = true;
continue;
}
......@@ -333,7 +329,7 @@ void Net::_run() {
}
ats = raw_ats - state->base_pkt_ts_ + buffering_;
} else {
LOG(WARNING) << "No packets to present: " << cts;
// LOG(WARNING) << "No packets to present: " << cts;
continue;
}
......@@ -365,7 +361,11 @@ void Net::_run() {
++state->active;
ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) {
_processPacket(buf->peer, 0, *spkt, *pkt);
try {
_processPacket(buf->peer, 0, *spkt, *pkt);
} catch (const std::exception &e) {
LOG(ERROR) << "Packet processing error: " << e.what();
}
buf->done = true;
--state->active;
});
......@@ -442,6 +442,8 @@ bool Net::begin() {
PacketMSGPACK &pkt) {
auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number));
_earlyProcessPacket(&p, ttimeoff, spkt_raw, pkt);
if (!host_) {
UNIQUE_LOCK(state->mtx, lk);
state->timestamps.insert(spkt_raw.timestamp);
......
......@@ -149,6 +149,7 @@ class Net : public Stream {
bool doreset = false);
void _cleanUp();
void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
void _earlyProcessPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
void _run();
};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment