11 #include <so_5_extra/error_ranges.hpp> 13 #include <so_5/disp_binder.hpp> 14 #include <so_5/send_functions.hpp> 16 #include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp> 17 #include <so_5/disp/reuse/work_thread_activity_tracking.hpp> 18 #include <so_5/disp/reuse/data_source_prefix_helpers.hpp> 19 #include <so_5/disp/reuse/work_thread_factory_params.hpp> 21 #include <so_5/stats/repository.hpp> 22 #include <so_5/stats/messages.hpp> 23 #include <so_5/stats/std_names.hpp> 24 #include <so_5/stats/impl/activity_tracking.hpp> 26 #include <so_5/details/invoke_noexcept_code.hpp> 27 #include <so_5/details/rollback_on_exception.hpp> 28 #include <so_5/details/abort_on_fatal_error.hpp> 30 #include <so_5/outliving.hpp> 32 #include <asio/io_context.hpp> 33 #include <asio/io_context_strand.hpp> 34 #include <asio/post.hpp> 88 swap( a.m_thread_count, b.m_thread_count );
89 swap( a.m_io_context, b.m_io_context );
96 m_thread_count = count;
104 return m_thread_count;
126 ::asio::io_context & service )
128 m_io_context = std::shared_ptr< ::asio::io_context >(
129 std::addressof( service ),
131 [](::asio::io_context *) {} );
143 std::shared_ptr< ::asio::io_context > service )
145 m_io_context = std::move(service);
159 m_io_context = std::make_shared< ::asio::io_context >();
250 impl::basic_dispatcher_iface_shptr_t dispatcher )
noexcept 257 empty()
const noexcept {
return !m_dispatcher; }
300 return m_dispatcher->binder_with_external_strand( strand );
340 return m_dispatcher->binder_with_own_strand();
352 return m_dispatcher->io_context();
366 reset()
noexcept { m_dispatcher.reset(); }
501 on_demand( execution_demand_t demand )
noexcept = 0;
522 template<
typename Derived,
typename... Args >
528 ::asio::io_context & io_svc,
550 log_stream <<
"An exception caught in work thread " 551 "of so_5::extra::disp::asio_thread_pool dispatcher." 561 log_stream <<
"An unknown exception caught in work thread " 562 "of so_5::extra::disp::asio_thread_pool dispatcher." 573 ptr()->on_demand( std::move(demand) );
595 on_demand( execution_demand_t demand )
noexcept override 597 demand.call_handler( thread_id() );
636 m_thread_id = std::move(tid);
654 m_work_activity.start();
663 m_work_activity.stop();
672 ::so_5::stats::work_thread_activity_stats_t result;
673 result.m_working_stats = m_work_activity.take_stats();
697 outliving_reference_t< work_thread_activity_collector_t > activity_stats )
701 m_activity_stats.get().setup_thread_id( thread_id() );
708 on_demand( execution_demand_t demand )
noexcept override 710 m_activity_stats.get().activity_started();
712 demand.call_handler( thread_id() );
714 m_activity_stats.get().activity_finished();
736 actual_dispatcher_shptr_t dispatcher )
756 agent_t & agent )
noexcept override 759 m_dispatcher->agent_bound();
761 agent.so_bind_to_dispatcher( *
this );
769 m_dispatcher->agent_bound();
801 template<
typename Derived >
808 return static_cast< Derived & >( *
this );
815 push( execution_demand_t demand )
override 858 class binder_with_external_strand_t
final 859 :
public binder_template_t< binder_with_external_strand_t >
865 actual_dispatcher_shptr_t dispatcher,
866 outliving_reference_t< ::asio::io_context::strand > strand )
872 strand()
noexcept {
return m_strand.get(); }
890 class binder_with_own_strand_t
final 891 :
public binder_template_t< binder_with_own_strand_t >
897 actual_dispatcher_shptr_t dispatcher )
932 ::so_5::environment_t & env,
949 return { std::make_shared< binder_with_external_strand_t >(
951 outliving_mutable(strand) )
959 return { std::make_shared< binder_with_own_strand_t >(
965 io_context()
const noexcept override {
return *m_io_context; }
982 return m_demands_counter;
989 std::string_view data_sources_name_base )
991 data_source().set_data_sources_name_base( data_sources_name_base );
992 data_source().start( env.stats_repository() );
994 ::so_5::details::do_with_rollback_on_exception(
995 [&] { launch_work_threads(env); },
996 [
this] { data_source().stop(); } );
1002 ::so_5::details::invoke_noexcept_code( [
this] {
1004 m_io_context->stop();
1011 ::so_5::details::invoke_noexcept_code( [
this] {
1013 wait_work_threads();
1015 data_source().stop();
1044 #if defined(__clang__
) 1045 #pragma clang diagnostic push 1046 #pragma clang diagnostic ignored "-Wnon-virtual-dtor" 1086 const auto agents_count = m_dispatcher.m_agents_bound.load(
1087 std::memory_order_acquire );
1089 const auto demands_count = m_dispatcher.m_demands_counter.load(
1090 std::memory_order_acquire );
1092 send< ::so_5::stats::messages::quantity< std::size_t > >(
1095 ::so_5::stats::suffixes::agent_count(),
1101 send< ::so_5::stats::messages::quantity< std::size_t > >(
1104 ::so_5::stats::suffixes::work_thread_queue_size(),
1110 std::string_view name_base )
1112 using namespace ::so_5::disp::reuse;
1114 m_base_prefix = make_disp_prefix(
1121 start( ::so_5::stats::repository_t & repo )
1124 m_stats_repo = &repo;
1130 m_stats_repo->remove( *
this );
1134 #if defined(__clang__
) 1135 #pragma clang diagnostic pop 1162 environment_t & env ) = 0;
1194 environment_t & env,
1206 environment_t & env,
1207 ::asio::io_context & io_svc,
1211 work_thread_t::run< work_thread_without_activity_tracking_t >(
1240 #if defined(__clang__
) 1241 #pragma clang diagnostic push 1242 #pragma clang diagnostic ignored "-Wnon-virtual-dtor" 1263 std::size_t thread_count )
1267 for(
auto & c : m_collectors )
1268 c = std::make_unique< work_thread_activity_collector_t >();
1274 disp_data_source_t::distribute( mbox );
1276 for( std::size_t i = 0; i != m_collectors.size(); ++i )
1277 distribute_stats_for_work_thread_at( mbox, i );
1286 return *(m_collectors[index]);
1292 const mbox_t & mbox,
1295 std::ostringstream ss;
1296 ss << base_prefix().c_str() <<
"/wt-" << index;
1298 const ::so_5::stats::prefix_t prefix{ ss.str() };
1299 auto & collector = collector_at( index );
1301 so_5::send< ::so_5::stats::messages::work_thread_activity >(
1304 ::so_5::stats::suffixes::work_thread_activity(),
1305 collector.thread_id(),
1306 collector.take_activity_stats() );
1310 #if defined(__clang__
) 1311 #pragma clang diagnostic pop 1316 environment_t & env,
1330 environment_t & env,
1332 ::asio::io_context & io_svc,
1338 work_thread_t::run< work_thread_with_activity_tracking_t >(
1342 self.m_actual_data_source.collector_at(index) ) );
1369 typename Basic_Skeleton >
1375 outliving_reference_t< environment_t > env,
1378 std::string_view data_sources_name_base,
1401 environment_t & env )
override 1403 using namespace std;
1469 make( actual_dispatcher_shptr_t disp )
noexcept 1471 return { std::move( disp ) };
1490 auto c = std::thread::hardware_concurrency();
1564 environment_t & env,
1567 const std::string_view data_sources_name_base,
1575 "io_context is not set in disp_params" );
dispatcher_handle_t() noexcept=default
::asio::io_context & io_context() noexcept
Get reference to io_context from that dispatcher.
outliving_reference_t< ::asio::io_context::strand > m_strand
Strand to be used with this event_queue.
A handle for asio_thread_pool dispatcher.
void reset() noexcept
Drop the content of handle.
disp_binder_shptr_t binder(::asio::io_context::strand &strand) const
Get a binder for that dispatcher.
Ranges for error codes of each submodules.
::asio::io_context::strand & strand() noexcept
bool empty() const noexcept
Is this handle empty?
::asio::io_context::strand & strand() noexcept
operator bool() const noexcept
Is this handle empty?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
static dispatcher_handle_t make(actual_dispatcher_shptr_t disp) noexcept
::asio::io_context::strand m_strand
Strand to be used with this event_queue.
binder_with_own_strand_t(actual_dispatcher_shptr_t dispatcher)
binder_with_external_strand_t(actual_dispatcher_shptr_t dispatcher, outliving_reference_t< ::asio::io_context::strand > strand)
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
disp_binder_shptr_t binder() const
Get a binder for that dispatcher.