#include <iostream>
#include <fmt/format.h>
#include <fmt/ostream.h>
using namespace std::chrono_literals;
using router_t = rr::express_router_t<>;
using traits_t =
router_t >;
class actual_handler_t;
using actual_handler_shptr_t = std::shared_ptr< actual_handler_t >;
using ws_registry_t = std::map< std::uint64_t, actual_handler_shptr_t >;
class actual_handler_t final
: public std::enable_shared_from_this< actual_handler_t >
{
private :
asio::steady_timer m_timer;
ws_registry_t & m_registry;
std::chrono::steady_clock::time_point m_last_activity_at;
std::string m_continuation_frame_buffer;
public :
actual_handler_t(
asio::io_context & io_ctx,
ws_registry_t & registry )
: m_timer{ io_ctx }
, m_registry{ registry }
{
std::cout << this << ": actual_handler created!" << std::endl;
}
~actual_handler_t()
{
std::cout << this << ": actual_handler destroyed!" << std::endl;
}
{
m_wsh = std::move(wsh);
activate( *m_wsh );
m_last_activity_at = std::chrono::steady_clock::now();
schedule_next_ping();
}
{
m_last_activity_at = std::chrono::steady_clock::now();
reschedule_timer();
if( m->opcode() == rws::opcode_t::continuation_frame )
{
m_continuation_frame_buffer.append( m->payload() );
if( m->is_final() )
wsh->send_message(
m_continued_frame_opcode,
m_continuation_frame_buffer );
}
else if( m->opcode() == rws::opcode_t::ping_frame )
{
auto pong = *m;
pong.set_opcode( rws::opcode_t::pong_frame );
wsh->send_message( pong );
}
else if( m->opcode() == rws::opcode_t::pong_frame )
{
}
else if( m->opcode() == rws::opcode_t::connection_close_frame )
{
const auto id = wsh->connection_id();
wsh->send_message( *m );
wsh->shutdown();
m_registry.erase( id );
}
else
{
if( m->is_final() )
wsh->send_message( *m );
else
{
m_continued_frame_opcode = m->opcode();
m_continuation_frame_buffer.append( m->payload() );
}
}
}
private :
const std::chrono::steady_clock::duration m_ping_pause{ 5s };
void schedule_next_ping()
{
m_timer.expires_after( m_ping_pause );
m_timer.async_wait( [this]( auto ec ) {
if( ec ) return;
const auto kill_deadline = m_last_activity_at + 2 * m_ping_pause;
const auto now = std::chrono::steady_clock::now();
if( now >= kill_deadline )
{
std::cout << this << ": client seems to be dead" << std::endl;
const auto id = m_wsh->connection_id();
m_wsh->kill();
m_registry.erase( id );
}
else
{
std::cout << this << ": have to send a ping to client" << std::endl;
m_wsh->send_message(
rws::opcode_t::ping_frame,
this->schedule_next_ping();
}
} );
}
void reschedule_timer()
{
m_timer.cancel();
schedule_next_ping();
}
};
struct handler_proxy_t
{
actual_handler_t & m_handler;
handler_proxy_t(
actual_handler_t & handler )
: m_handler{ handler }
{}
{
m_handler.on_message( std::move(wsh), std::move(m) );
}
};
auto server_handler( asio::io_context & io_ctx, ws_registry_t & registry )
{
auto router = std::make_unique< router_t >();
router->http_get(
"/",
[&io_ctx, ®istry]( auto req, auto ){
{
auto handler = std::make_shared< actual_handler_t >(
io_ctx, registry );
auto wsh = rws::upgrade< traits_t >(
*req,
rws::activation_t::delayed,
handler_proxy_t{ *handler } );
registry.emplace( wsh->connection_id(), handler );
handler->start( wsh );
}
} );
return router;
}
int main()
{
using namespace std::chrono;
try
{
asio::io_context io_ctx;
ws_registry_t registry;
io_ctx,
restinio::on_this_thread<traits_t>()
.address( "localhost" )
.port( 9001 )
.request_handler( server_handler( io_ctx, registry ) )
.read_next_http_message_timelimit( 10s )
.write_http_response_timelimit( 1s )
.handle_request_timeout( 1s )
.cleanup_func( [&]{ registry.clear(); } ) );
}
catch( const std::exception & ex )
{
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}
return 0;
}