diff --git a/src/protocol/websocket.cpp b/src/protocol/websocket.cpp index 6a157f095cc33dfdb5038f044903c7352d8161cf..15a77cce8c9d1468e5017b473022399633098526 100644 --- a/src/protocol/websocket.cpp +++ b/src/protocol/websocket.cpp @@ -254,7 +254,7 @@ void WebSocketBase<SocketT>::connect(const ftl::URI& uri, int timeout) { } if (sscanf(line, "HTTP/1.1 %d", &status) != 1 || status != 101) { throw FTL_Error("ERROR: Got bad status connecting to: " - + uri.getHost() + ": " + line); + + uri.getBaseURI() + ": " + line); } std::unordered_map<std::string, std::string> headers; diff --git a/src/quic/src/proxy.cpp b/src/quic/src/proxy.cpp index 7f200ace7137765d11824aebdb8371234293cd28..48bd99f84cbff06c59215bdbbbab872a5eb2b867 100644 --- a/src/quic/src/proxy.cpp +++ b/src/quic/src/proxy.cpp @@ -40,7 +40,7 @@ struct ProxyClient::Writer : public beyond_impl::IMsQuicStreamHandler void OnWriteCancelled(int32_t id) override { - // FIXME return context + // FIXME return context auto Lk = std::unique_lock(mMtx); mBuffers.pop_front(); } @@ -102,6 +102,8 @@ void ProxyClient::OnCertificateReceived(beyond_impl::MsQuicConnection* Connectio beyond_impl::MsQuicStreamPtr ProxyClient::OpenStream(const std::string& name) { + // Proxy server expects msgpack format data in array [command, args], easiest to use std::tuple + // for sending and manually unpack on the server (via msgpack::object). Writer Handler; auto msg = std::make_tuple(MsgCtr++, "connect", name); diff --git a/src/quic/src/proxy.hpp b/src/quic/src/proxy.hpp index 936dba776b06be87041d0af17da480dfdddc325b..786bec9506139919d7bb7656964b19197c855626 100644 --- a/src/quic/src/proxy.hpp +++ b/src/quic/src/proxy.hpp @@ -10,6 +10,11 @@ namespace beyond_impl class QuicUniverseImpl; } +// Quic proxy client prototype. On creation of a Quic stream, server is expected to proxy the stream to a node +// (creating a new quic stream from server to target node) and then passing the data between the two nodes. +// after the stream is created on server, the communication can proceed as usual. Additional commands can be +// sent to the server via a dedicated control quic stream (prototype/proof of concept). + class ProxyClient final : public beyond_impl::IMsQuicConnectionHandler, private beyond_impl::IMsQuicStreamHandler { public: @@ -19,8 +24,12 @@ public: void SetConnection(beyond_impl::MsQuicConnectionPtr ConnectionIn) ; beyond_impl::MsQuicConnection* ConnectionHandle() { return mConnection.get(); }; + // Connect to another node via proxy using given name (set on remote using SetLocalName). + // New stream is created, msgpack message is sent telling the proxy the requested name. beyond_impl::MsQuicStreamPtr OpenStream(const std::string& name); + // Set local name, proxy must connect streams with this name to this instance, + // assuming the name was not already in use. bool SetLocalName(const std::string& Name); private: @@ -34,6 +43,10 @@ private: void OnConnect(beyond_impl::MsQuicConnection* Connection) override; void OnDisconnect(beyond_impl::MsQuicConnection* Connection) override; + + // Proxy created a new quic stream on behalf of another client. + // A new Peer instance is created and behaves exactly as directly connected Peer would + // (the call is passed to OnStreamCreate of QuicUniverse). void OnStreamCreate(beyond_impl::MsQuicConnection* Connection, std::unique_ptr<beyond_impl::MsQuicStream> Stream) override; void OnCertificateReceived(beyond_impl::MsQuicConnection* Connection, QUIC_BUFFER* Certificate, QUIC_BUFFER* Chain) override; diff --git a/src/quic/src/quic_peer.cpp b/src/quic/src/quic_peer.cpp index 1fb249ae207d435626902c57c6bee8a81cc4cb4a..e6fab1937f287f0d0710b9fcebbd64ebd93c49b8 100644 --- a/src/quic/src/quic_peer.cpp +++ b/src/quic/src/quic_peer.cpp @@ -93,6 +93,11 @@ void QuicPeerStream::close(bool reconnect) void QuicPeerStream::OnShutdown(MsQuicStream* stream) { LOG(WARNING) << "TODO: Event QuicStreamShutdown"; + std::unique_lock<MUTEX_T> lk_send(send_mtx_, std::defer_lock); + std::unique_lock<MUTEX_T> lk_recv(recv_mtx_, std::defer_lock); + std::lock(lk_send, lk_recv); + reset(); // discard any data not yet sent (further + // sends not possible) } void QuicPeerStream::OnShutdownComplete(MsQuicStream* stream) @@ -101,9 +106,9 @@ void QuicPeerStream::OnShutdownComplete(MsQuicStream* stream) std::unique_lock<MUTEX_T> lk_recv(recv_mtx_, std::defer_lock); std::lock(lk_send, lk_recv); - // MsQuic releases stream instanca after this callback. Any use of it later is a bug. + // MsQuic releases stream instance after this callback. stream_ = nullptr; - // notify... + // notify...? } #ifdef ENABLE_PROFILER @@ -386,7 +391,7 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) { if (header.OpCode == OpCodeType::CLOSE) { - LOG(WARNING) << "[Quic/WebSocket] Received close control frame. Closing connection."; + LOG(WARNING) << "[Quic] Received close control frame (websocket). Closing connection."; close(); } diff --git a/src/quic/src/quic_peer.hpp b/src/quic/src/quic_peer.hpp index 453996d5b3f69b8130eefcc1d53b9bf0e37cc106..90624293bd0583b316b121660f155bc12f97f479 100644 --- a/src/quic/src/quic_peer.hpp +++ b/src/quic/src/quic_peer.hpp @@ -49,13 +49,13 @@ protected: void OnWriteComplete(MsQuicStream* stream, void* Context, bool Cancelled) override; private: - // release buffer back to free list + // release buffer back to free queue void release_buffer_(msgpack_buffer_t&&); - // try to flush all pending sends to network + // try to flush all pending sends to msquic void flush_send_queue_(); - // discard all queued sends (not yet passed to network) + // discard all queued sends (not yet passed to msquic) void discard_queued_sends_(); // TODO: node should tell what type it is (if this code is used) @@ -73,10 +73,16 @@ private: struct SendEvent { SendEvent(msgpack_buffer_t buffer); + // Each event has its own buffer to simplify compatibility with old tcp peer. + // A simple buffer per peer is be possible, but has to allow certain concurrent + // modifications (free at head, reserve at tail) and likely needs a custom + // msgpack_buffer_t type and packer. The user of PeerBase must consider any + // ordering requirements if uses the send methods concurrently (must synchronize + // its own calls: next send can't begin before previous returns). msgpack_buffer_t buffer; - bool pending; - bool complete; + bool pending; // set to true after buffers are passed to msquic + bool complete; // set to true once msquic reports write complete int t;