Skip to content
Snippets Groups Projects
Commit 632f5637 authored by Sebastian Hahta's avatar Sebastian Hahta
Browse files

documentation

parent 1862c265
Branches
No related tags found
No related merge requests found
...@@ -254,7 +254,7 @@ void WebSocketBase<SocketT>::connect(const ftl::URI& uri, int timeout) { ...@@ -254,7 +254,7 @@ void WebSocketBase<SocketT>::connect(const ftl::URI& uri, int timeout) {
} }
if (sscanf(line, "HTTP/1.1 %d", &status) != 1 || status != 101) { if (sscanf(line, "HTTP/1.1 %d", &status) != 1 || status != 101) {
throw FTL_Error("ERROR: Got bad status connecting to: " throw FTL_Error("ERROR: Got bad status connecting to: "
+ uri.getHost() + ": " + line); + uri.getBaseURI() + ": " + line);
} }
std::unordered_map<std::string, std::string> headers; std::unordered_map<std::string, std::string> headers;
......
...@@ -102,6 +102,8 @@ void ProxyClient::OnCertificateReceived(beyond_impl::MsQuicConnection* Connectio ...@@ -102,6 +102,8 @@ void ProxyClient::OnCertificateReceived(beyond_impl::MsQuicConnection* Connectio
beyond_impl::MsQuicStreamPtr ProxyClient::OpenStream(const std::string& name) beyond_impl::MsQuicStreamPtr ProxyClient::OpenStream(const std::string& name)
{ {
// Proxy server expects msgpack format data in array [command, args], easiest to use std::tuple
// for sending and manually unpack on the server (via msgpack::object).
Writer Handler; Writer Handler;
auto msg = std::make_tuple(MsgCtr++, "connect", name); auto msg = std::make_tuple(MsgCtr++, "connect", name);
......
...@@ -10,6 +10,11 @@ namespace beyond_impl ...@@ -10,6 +10,11 @@ namespace beyond_impl
class QuicUniverseImpl; class QuicUniverseImpl;
} }
// Quic proxy client prototype. On creation of a Quic stream, server is expected to proxy the stream to a node
// (creating a new quic stream from server to target node) and then passing the data between the two nodes.
// after the stream is created on server, the communication can proceed as usual. Additional commands can be
// sent to the server via a dedicated control quic stream (prototype/proof of concept).
class ProxyClient final : public beyond_impl::IMsQuicConnectionHandler, private beyond_impl::IMsQuicStreamHandler class ProxyClient final : public beyond_impl::IMsQuicConnectionHandler, private beyond_impl::IMsQuicStreamHandler
{ {
public: public:
...@@ -19,8 +24,12 @@ public: ...@@ -19,8 +24,12 @@ public:
void SetConnection(beyond_impl::MsQuicConnectionPtr ConnectionIn) ; void SetConnection(beyond_impl::MsQuicConnectionPtr ConnectionIn) ;
beyond_impl::MsQuicConnection* ConnectionHandle() { return mConnection.get(); }; beyond_impl::MsQuicConnection* ConnectionHandle() { return mConnection.get(); };
// Connect to another node via proxy using given name (set on remote using SetLocalName).
// New stream is created, msgpack message is sent telling the proxy the requested name.
beyond_impl::MsQuicStreamPtr OpenStream(const std::string& name); beyond_impl::MsQuicStreamPtr OpenStream(const std::string& name);
// Set local name, proxy must connect streams with this name to this instance,
// assuming the name was not already in use.
bool SetLocalName(const std::string& Name); bool SetLocalName(const std::string& Name);
private: private:
...@@ -34,6 +43,10 @@ private: ...@@ -34,6 +43,10 @@ private:
void OnConnect(beyond_impl::MsQuicConnection* Connection) override; void OnConnect(beyond_impl::MsQuicConnection* Connection) override;
void OnDisconnect(beyond_impl::MsQuicConnection* Connection) override; void OnDisconnect(beyond_impl::MsQuicConnection* Connection) override;
// Proxy created a new quic stream on behalf of another client.
// A new Peer instance is created and behaves exactly as directly connected Peer would
// (the call is passed to OnStreamCreate of QuicUniverse).
void OnStreamCreate(beyond_impl::MsQuicConnection* Connection, std::unique_ptr<beyond_impl::MsQuicStream> Stream) override; void OnStreamCreate(beyond_impl::MsQuicConnection* Connection, std::unique_ptr<beyond_impl::MsQuicStream> Stream) override;
void OnCertificateReceived(beyond_impl::MsQuicConnection* Connection, QUIC_BUFFER* Certificate, QUIC_BUFFER* Chain) override; void OnCertificateReceived(beyond_impl::MsQuicConnection* Connection, QUIC_BUFFER* Certificate, QUIC_BUFFER* Chain) override;
......
...@@ -93,6 +93,11 @@ void QuicPeerStream::close(bool reconnect) ...@@ -93,6 +93,11 @@ void QuicPeerStream::close(bool reconnect)
void QuicPeerStream::OnShutdown(MsQuicStream* stream) void QuicPeerStream::OnShutdown(MsQuicStream* stream)
{ {
LOG(WARNING) << "TODO: Event QuicStreamShutdown"; LOG(WARNING) << "TODO: Event QuicStreamShutdown";
std::unique_lock<MUTEX_T> lk_send(send_mtx_, std::defer_lock);
std::unique_lock<MUTEX_T> lk_recv(recv_mtx_, std::defer_lock);
std::lock(lk_send, lk_recv);
reset(); // discard any data not yet sent (further
// sends not possible)
} }
void QuicPeerStream::OnShutdownComplete(MsQuicStream* stream) void QuicPeerStream::OnShutdownComplete(MsQuicStream* stream)
...@@ -101,9 +106,9 @@ void QuicPeerStream::OnShutdownComplete(MsQuicStream* stream) ...@@ -101,9 +106,9 @@ void QuicPeerStream::OnShutdownComplete(MsQuicStream* stream)
std::unique_lock<MUTEX_T> lk_recv(recv_mtx_, std::defer_lock); std::unique_lock<MUTEX_T> lk_recv(recv_mtx_, std::defer_lock);
std::lock(lk_send, lk_recv); std::lock(lk_send, lk_recv);
// MsQuic releases stream instanca after this callback. Any use of it later is a bug. // MsQuic releases stream instance after this callback.
stream_ = nullptr; stream_ = nullptr;
// notify... // notify...?
} }
#ifdef ENABLE_PROFILER #ifdef ENABLE_PROFILER
...@@ -386,7 +391,7 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) ...@@ -386,7 +391,7 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed)
{ {
if (header.OpCode == OpCodeType::CLOSE) if (header.OpCode == OpCodeType::CLOSE)
{ {
LOG(WARNING) << "[Quic/WebSocket] Received close control frame. Closing connection."; LOG(WARNING) << "[Quic] Received close control frame (websocket). Closing connection.";
close(); close();
} }
......
...@@ -49,13 +49,13 @@ protected: ...@@ -49,13 +49,13 @@ protected:
void OnWriteComplete(MsQuicStream* stream, void* Context, bool Cancelled) override; void OnWriteComplete(MsQuicStream* stream, void* Context, bool Cancelled) override;
private: private:
// release buffer back to free list // release buffer back to free queue
void release_buffer_(msgpack_buffer_t&&); void release_buffer_(msgpack_buffer_t&&);
// try to flush all pending sends to network // try to flush all pending sends to msquic
void flush_send_queue_(); void flush_send_queue_();
// discard all queued sends (not yet passed to network) // discard all queued sends (not yet passed to msquic)
void discard_queued_sends_(); void discard_queued_sends_();
// TODO: node should tell what type it is (if this code is used) // TODO: node should tell what type it is (if this code is used)
...@@ -73,10 +73,16 @@ private: ...@@ -73,10 +73,16 @@ private:
struct SendEvent { struct SendEvent {
SendEvent(msgpack_buffer_t buffer); SendEvent(msgpack_buffer_t buffer);
// Each event has its own buffer to simplify compatibility with old tcp peer.
// A simple buffer per peer is be possible, but has to allow certain concurrent
// modifications (free at head, reserve at tail) and likely needs a custom
// msgpack_buffer_t type and packer. The user of PeerBase must consider any
// ordering requirements if uses the send methods concurrently (must synchronize
// its own calls: next send can't begin before previous returns).
msgpack_buffer_t buffer; msgpack_buffer_t buffer;
bool pending; bool pending; // set to true after buffers are passed to msquic
bool complete; bool complete; // set to true once msquic reports write complete
int t; int t;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment