diff --git a/src/quic/src/quic_peer.cpp b/src/quic/src/quic_peer.cpp index d6e54839c0ce7a6a1f2ba05560e5dbdc871a1183..499a1ca6da7a3cd769c4a37afca82747316c11ec 100644 --- a/src/quic/src/quic_peer.cpp +++ b/src/quic/src/quic_peer.cpp @@ -353,16 +353,16 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) while (buffer_in.Length > 0) { // Complete frame has not yet been received - if (ws_recv_param.ws_payload_remaining_ > 0) + /*if (ws_recv_struct.ws_payload_remaining_ > 0) { - auto recv_size = std::min<int32_t>(buffer_in.Length, ws_recv_param.ws_payload_remaining_); + auto recv_size = std::min<int32_t>(buffer_in.Length, ws_recv_struct.ws_payload_remaining_); auto ws_buffer = nonstd::span<char>((char*)buffer_in.Buffer, recv_size); - if (ws_recv_param.ws_mask_) { Mask(ws_buffer.data(), ws_buffer.size(), ws_recv_param.ws_mask_key_ - , ws_recv_param.ws_payload_recvd_); } + if (ws_recv_struct.ws_mask_) { Mask(ws_buffer.data(), ws_buffer.size(), ws_recv_struct.ws_mask_key_ + , ws_recv_struct.ws_payload_recvd_); } - ws_recv_param.ws_payload_recvd_ += recv_size; - ws_recv_param.ws_payload_remaining_ -= recv_size; + ws_recv_struct.ws_payload_recvd_ += recv_size; + ws_recv_struct.ws_payload_remaining_ -= recv_size; // Maryam: recv_buffer as argument (char*) memcpy(recv_buffer_.buffer() + size_consumed, ws_buffer.data(), ws_buffer.size()); @@ -373,19 +373,22 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) // Process any remaining buffer continue; - } + }*/ + size_t buffer_in_consumed = 0; + ws_recv_struct.websocket_recv_(buffer_in, buffer_in_consumed, recv_buffer_.buffer() + , recv_buffer_.buffer_capacity(), size_consumed); // No previous frames or previous frame is complete, this buffer must contain header WebSocketHeader header; WsParseHeaderStatus status = INVALID; - if (ws_recv_param.ws_partial_header_recvd_ > 0) + if (ws_recv_struct.ws_partial_header_recvd_ > 0) { // Read remaining header - CHECK(ws_recv_param.ws_partial_header_recvd_ < (int)ws_recv_param.ws_header_.size()); - size_t cpy_size = std::min<size_t>(ws_recv_param.ws_header_.size() - ws_recv_param.ws_partial_header_recvd_, buffer_in.Length); - memcpy(ws_recv_param.ws_header_.data() + ws_recv_param.ws_partial_header_recvd_, buffer_in.Buffer, cpy_size); - status = WsParseHeader(ws_recv_param.ws_header_.data(), ws_recv_param.ws_partial_header_recvd_ + cpy_size, header); + CHECK(ws_recv_struct.ws_partial_header_recvd_ < (int)ws_recv_struct.ws_header_.size()); + size_t cpy_size = std::min<size_t>(ws_recv_struct.ws_header_.size() - ws_recv_struct.ws_partial_header_recvd_, buffer_in.Length); + memcpy(ws_recv_struct.ws_header_.data() + ws_recv_struct.ws_partial_header_recvd_, buffer_in.Buffer, cpy_size); + status = WsParseHeader(ws_recv_struct.ws_header_.data(), ws_recv_struct.ws_partial_header_recvd_ + cpy_size, header); // FIXME: In principle it is possible that the header still is not complete, // but rest of the code has to be checked for correctness (unit tests) @@ -403,10 +406,10 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) } else if (status == NOT_ENOUGH_DATA) { - CHECK(buffer_in.Length < ws_recv_param.ws_header_.size() && ws_recv_param.ws_partial_header_recvd_ == 0); - size_t cpy_size = std::min<size_t>(ws_recv_param.ws_header_.size() - ws_recv_param.ws_partial_header_recvd_, buffer_in.Length); - memcpy(ws_recv_param.ws_header_.data() + ws_recv_param.ws_partial_header_recvd_, buffer_in.Buffer, cpy_size); - ws_recv_param.ws_partial_header_recvd_ = std::min<uint32_t>(ws_recv_param.ws_header_.size(), ws_recv_param.ws_partial_header_recvd_ + buffer_in.Length); + CHECK(buffer_in.Length < ws_recv_struct.ws_header_.size() && ws_recv_struct.ws_partial_header_recvd_ == 0); + size_t cpy_size = std::min<size_t>(ws_recv_struct.ws_header_.size() - ws_recv_struct.ws_partial_header_recvd_, buffer_in.Length); + memcpy(ws_recv_struct.ws_header_.data() + ws_recv_struct.ws_partial_header_recvd_, buffer_in.Buffer, cpy_size); + ws_recv_struct.ws_partial_header_recvd_ = std::min<uint32_t>(ws_recv_struct.ws_header_.size(), ws_recv_struct.ws_partial_header_recvd_ + buffer_in.Length); size_ws += cpy_size; buffer_in.Length -= cpy_size; @@ -420,17 +423,17 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) close(); } - auto offset = header.HeaderSize - ws_recv_param.ws_partial_header_recvd_; + auto offset = header.HeaderSize - ws_recv_struct.ws_partial_header_recvd_; size_ws += offset; buffer_in.Length -= offset; buffer_in.Buffer += offset; - ws_recv_param.ws_mask_ = header.Mask; - ws_recv_param.ws_mask_key_ = header.MaskingKey; + ws_recv_struct.ws_mask_ = header.Mask; + ws_recv_struct.ws_mask_key_ = header.MaskingKey; - ws_recv_param.ws_partial_header_recvd_ = 0; - ws_recv_param.ws_payload_recvd_ = 0; - ws_recv_param.ws_payload_remaining_ = header.PayloadLength; + ws_recv_struct.ws_partial_header_recvd_ = 0; + ws_recv_struct.ws_payload_recvd_ = 0; + ws_recv_struct.ws_payload_remaining_ = header.PayloadLength; if (header.OpCode != OpCodeType::BINARY_FRAME) { @@ -444,10 +447,10 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) } // Maryam: Wrong (recv buffer may be smaller than payload) // BufferIn.Length == ws_size + size_consumed (at call) does not hold - size_ws += ws_recv_param.ws_payload_remaining_; - buffer_in.Length -= ws_recv_param.ws_payload_remaining_; - buffer_in.Buffer += ws_recv_param.ws_payload_remaining_; - ws_recv_param.ws_payload_remaining_ = 0; + size_ws += ws_recv_struct.ws_payload_remaining_; + buffer_in.Length -= ws_recv_struct.ws_payload_remaining_; + buffer_in.Buffer += ws_recv_struct.ws_payload_remaining_; + ws_recv_struct.ws_payload_remaining_ = 0; } } } diff --git a/src/quic/src/quic_peer.hpp b/src/quic/src/quic_peer.hpp index c8e1865cb57457c552c0a59ee9bfcbd2add37e6f..a00e710b4290e6e8007a007190736b581ab691c2 100644 --- a/src/quic/src/quic_peer.hpp +++ b/src/quic/src/quic_peer.hpp @@ -110,7 +110,7 @@ private: // Maryam: size_consumed_out and sum of them output and move the function out - WebSocketRecv ws_recv_param; + WebSocketRecv ws_recv_struct; size_t ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed_out); // Send limits. Send methods will block if limits are exceeded (with warning message). diff --git a/src/quic/src/websocket.cpp b/src/quic/src/websocket.cpp index a2661690d8ce865491980f2e7f6840f7a75c714f..bda5f07e59d93565211a572af9d2ad5e05929db3 100644 --- a/src/quic/src/websocket.cpp +++ b/src/quic/src/websocket.cpp @@ -137,3 +137,33 @@ WsParseHeaderStatus WsParseHeader(uint8_t* const Data, size_t Length, WebSocketH return OK; } + +void WebSocketRecv::websocket_recv_(QUIC_BUFFER buffer_in, size_t& buffer_in_consumed, char* buffer_out + , size_t buffer_out_size, size_t& buffer_out_consumed){ + // Amount of bytes for websocket header consumed + while (buffer_in.Length > 0 && buffer_out_consumed < buffer_out_size) + { + // Complete frame has not yet been received + if (ws_payload_remaining_ > 0) + { + auto recv_size = std::min<int32_t>(buffer_in.Length, ws_payload_remaining_); + auto ws_buffer = nonstd::span<char>((char*)buffer_in.Buffer, recv_size); + + if (ws_mask_) { Mask(ws_buffer.data(), ws_buffer.size(), ws_mask_key_ + , ws_payload_recvd_); } + + ws_payload_recvd_ += recv_size; + ws_payload_remaining_ -= recv_size; + + memcpy(buffer_out + buffer_out_consumed, ws_buffer.data(), ws_buffer.size()); + buffer_out_consumed += ws_buffer.size(); + + buffer_in.Length -= recv_size; + buffer_in.Buffer += recv_size; + buffer_in_consumed += recv_size; + + // Process any remaining buffer + continue; + } + } +} diff --git a/src/quic/src/websocket.hpp b/src/quic/src/websocket.hpp index e9785b8e69c20978f888c0f02edc3a042d8fdefc..ed5efa0b7ef0abef3cf258f37ae6e85ed545ab73 100644 --- a/src/quic/src/websocket.hpp +++ b/src/quic/src/websocket.hpp @@ -6,6 +6,8 @@ #include <deque> #include <cstring> #include <cstdlib> +#include "quic.hpp" +#include "loguru.hpp" struct WebSocketRecv { int ws_payload_recvd_ = 0; @@ -15,7 +17,7 @@ struct WebSocketRecv { std::array<unsigned char, 4> ws_mask_key_; std::array<uint8_t, 14> ws_header_; - size_t ws_recv_(QUIC_BUFFER buffer_in, size_t& buffer_in_consumed, char* buffer_out + void websocket_recv_(QUIC_BUFFER buffer_in, size_t& buffer_in_consumed, char* buffer_out , size_t buffer_out_size, size_t& buffer_out_consumed); };