2
3
6
7
8
9
10
11
15#include <so_5/atomic_refcounted.hpp>
17#include <so_5/send_functions.hpp>
19#include <so_5/stats/repository.hpp>
20#include <so_5/stats/messages.hpp>
21#include <so_5/stats/std_names.hpp>
23#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
36
37
38
39
40
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
91
92
93
94
95
96using queue_description_holder_ref_t =
100
101
102
103
104
105inline queue_description_holder_ref_t
109 std::size_t agent_count )
113 std::ostringstream ss;
114 ss << prefix
.c_str() <<
"/cq/" << coop_id;
124
125
126
127
128
129
130
131inline queue_description_holder_ref_t
138 std::ostringstream ss;
150
151
152
153
154
155
173
174
175
176
177
178
182 const so_5::current_thread_id_t & thread_id,
188
189
190
191
192
193
194
195
208
209
210
211
212
229 const std::string_view disp_type,
231 const std::string_view name_basic,
234 const void * disp_pointer )
242 const mbox_t & mbox )
override
262 [
this, &mbox](
const so_5::current_thread_id_t & thread_id,
297
298
299
300
307 const so_5::current_thread_id_t & thread_id,
315
316
317
318
319 using wt_activity_info_container_t =
329
330
331
332
333
334
335
336
337
340#if defined(__clang__
)
341#pragma clang diagnostic push
342#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
351 wt_activity_info_container_t & wt_activity_holder )
366 auto current = holder;
376 std::size_t thread_count )
override
401 const so_5::current_thread_id_t & thread_id,
420 template<
typename Lambda >
434 template<
typename Lambda >
453#if defined(__clang__
)
454#pragma clang diagnostic pop
460 std::ostringstream ss;
#define SO_5_CHECK_INVARIANT(what, data)
static execution_hint_t so_create_execution_hint(execution_demand_t &demand)
Create execution hint for the specified demand.
coop_handle_t so_coop() const
Get a handle of agent's coop.
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
The base class for the object with a reference counting.
auto id() const noexcept
Get the ID of the coop.
Parameters for binding agents to adv_thread_pool dispatcher.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
fifo_t query_fifo() const
Get FIFO type.
Alias for namespace with traits of event queue.
std::size_t thread_count() const
Getter for thread count.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
std::size_t m_thread_count
Count of working threads.
queue_traits::queue_params_t m_queue_params
Queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t()
Default constructor.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
A handle for adv_thread_pool dispatcher.
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
std::enable_if_t< std::is_invocable_v< Setter, bind_params_t & >, disp_binder_shptr_t > binder(Setter &¶ms_setter) const
Create a binder for that dispatcher.
disp_binder_shptr_t binder() const
Get a binder for that dispatcher with default binding params.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
bool empty() const noexcept
Is this handle empty?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
dispatcher_handle_t() noexcept=default
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void reset() noexcept
Drop the content of handle.
operator bool() const noexcept
Is this handle empty?
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
const bind_params_t m_params
Binding parameters.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
An actual interface of thread-pool dispatcher.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t ¶ms)=0
Preallocate all necessary resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
disp_binder_shptr_t binder(bind_params_t params) override
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t ¶ms) override
Preallocate all necessary resources for a new agent.
~actual_dispatcher_implementation_t() noexcept override
execution_demand_t peek_front()
Get the information about the front demand.
agent_queue_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
void push(execution_demand_t demand) override
Push next demand to queue.
std::size_t size() const noexcept
Get the current size of the queue.
bool empty() const
Is empty queue?
agent_queue_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
void intrusive_queue_set_next(agent_queue_t *next) noexcept
Set a pointer to the next agent_queue.
static constexpr const unsigned int thread_safe_worker
spinlock_t m_lock
Object's lock.
bool active() const
Is active queue?
agent_queue_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &)
Constructor.
unsigned int m_workers
Count of active workers.
demand_t * m_tail_demand
Tail of the demand's queue.
bool worker_finished(unsigned int type_of_worker)
Signal about finishing of worker of the specified type.
void push_evt_finish(execution_demand_t demand) noexcept override
std::atomic< std::size_t > m_size
Current size of the queue.
~agent_queue_t() override
static constexpr const unsigned int not_thread_safe_worker
bool is_there_not_thread_safe_worker() const
Check the presence of thread unsafe worker.
void delete_head() noexcept
Helper method for deleting queue's head object.
bool worker_started(unsigned int type_of_worker)
Remove the front demand.
bool m_active
Is this queue activated?
demand_t m_head_demand
Head of the demand's queue.
spinlock_t & lock() noexcept
Access to the queue's lock.
void push_evt_start(execution_demand_t demand) override
dispatcher_queue_t & m_disp_queue
bool is_there_any_worker() const
Check the presence of any worker at the moment.
The very basic interface of adv_thread_pool dispatcher.
virtual ~basic_dispatcher_iface_t() noexcept=default
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Part of implementation of work thread without activity tracing.
no_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
void take_activity_stats(L)
Part of implementation of work thread with activity tracing.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_waiting_stats_collector
A collector for waiting stats.
activity_tracking_traits::lock_t m_stats_lock
Lock for activity statistics.
void take_activity_stats(L lambda)
with_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_work_activity_collector
A collector for work activity.
void process_queue(agent_queue_t &queue)
Processing of demands from agent queue.
so_5::current_thread_id_t thread_id() const
Get ID of work thread.
void start()
Launch work thread.
work_thread_template_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
agent_queue_t * pop_agent_queue() noexcept
An attempt of extraction of non-empty agent queue.
void body()
Thread body method.
An interface for somethine like condition variable for waiting on MPMC queue lock.
virtual void notify() noexcept=0
Notification for waiting customer.
virtual void wait() noexcept=0
Waiting on condition.
An interface for lock for MPMC queue.
virtual condition_unique_ptr_t allocate_condition()=0
Create condition object for another MPMC queue's customer.
Container for storing parameters for MPMC queue.
queue_params_t & operator=(queue_params_t &&o) noexcept
Move operator.
friend void swap(queue_params_t &a, queue_params_t &b) noexcept
std::size_t next_thread_wakeup_threshold() const
Getter for thread wakeup threshold value.
const lock_factory_t & lock_factory() const
Getter for lock factory.
Multi-producer/Multi-consumer queue of pointers to event queues.
T * m_tail
The current tail of the intrusive queue.
T * try_switch_to_another(T *current) noexcept
Switch the current non-empty queue to another one if it is possible.
void try_wakeup_someone_if_possible() noexcept
An attempt to wakeup another sleeping thread if it's necessary and possible.
const std::size_t m_max_thread_count
Maximum count of working threads to be used with that mpmc_queue.
so_5::disp::mpmc_queue_traits::lock_unique_ptr_t m_lock
Object's lock.
std::size_t m_queue_size
The current size of the intrusive queue.
void schedule(T *queue) noexcept
Schedule execution of demands from the queue.
T * m_head
The current head of the intrusive queue.
queue_of_queues_t(const so_5::disp::mpmc_queue_traits::queue_params_t &queue_params, std::size_t thread_count)
void shutdown() noexcept
Initiate shutdown for working threads.
void pop_and_notify_one_waiting_customer() noexcept
bool m_shutdown
Shutdown flag.
const std::size_t m_next_thread_wakeup_threshold
Threshold for wake up next working thread if there are non-empty agent queues.
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t allocate_condition()
T * pop(so_5::disp::mpmc_queue_traits::condition_t &condition) noexcept
Get next active queue.
void push_to_queue(T *new_tail) noexcept
Helper method that pushes a new item to the end of the queue.
bool m_wakeup_in_progress
Is some working thread in wakeup process now?
T * pop_head() noexcept
Helper method that extracts the head item from the queue.
std::vector< so_5::disp::mpmc_queue_traits::condition_t * > m_waiting_customers
Waiting threads.
Actual type of statical information collector.
std::size_t m_agent_count
void for_each_thread_activity(Lambda lambda) const
std::size_t m_thread_count
std::size_t thread_count() const
void for_each_queue(Lambda lambda) const
std::size_t agent_count() const
intrusive_ptr_t< queue_description_holder_t > m_queue_desc_head
collector_t(wt_activity_info_container_t &wt_activity_holder)
virtual void set_thread_count(std::size_t thread_count) override
Informs consumer about actual actual thread count.
wt_activity_info_container_t & m_wt_activity
virtual void add_work_thread_activity(const so_5::current_thread_id_t &thread_id, const stats::work_thread_activity_stats_t &stats) override
Informs consumer about yet another working thread activity.
virtual void add_queue(const intrusive_ptr_t< queue_description_holder_t > &info) override
Informs counsumer about yet another event queue.
intrusive_ptr_t< queue_description_holder_t > m_queue_desc_tail
Type of data source for thread-pool-like dispatchers.
wt_activity_info_container_t m_wt_activity
Container for collecting work activity stats from working threads.
data_source_t(stats_supplier_t &supplier)
Initializing constructor.
const stats::prefix_t & prefix() const
Basic prefix for data source names.
stats::prefix_t make_work_thread_prefix(const so_5::current_thread_id_t &tid)
stats::prefix_t m_prefix
Prefix for data-source names.
void distribute(const mbox_t &mbox) override
Distribution of statistical information.
void set_data_sources_name_base(const std::string_view disp_type, const std::string_view name_basic, const void *disp_pointer)
Setting of data-source basic name.
stats_supplier_t & m_supplier
Statistical information supplier.
An interface of collector of information about thread-pool-like dispatcher state.
virtual void add_work_thread_activity(const so_5::current_thread_id_t &thread_id, const so_5::stats::work_thread_activity_stats_t &stats)=0
Informs consumer about yet another working thread activity.
virtual void set_thread_count(std::size_t value)=0
Informs consumer about actual actual thread count.
virtual void add_queue(const intrusive_ptr_t< queue_description_holder_t > &queue_desc)=0
Informs counsumer about yet another event queue.
An interface of supplier of information about thread-pool-like dispatcher state.
virtual void supply(stats_consumer_t &consumer)=0
virtual ~stats_supplier_t()
Mixin with thread activity tracking flag.
friend void swap(work_thread_activity_tracking_flag_mixin_t &a, work_thread_activity_tracking_flag_mixin_t &b) noexcept
Mixin that holds optional work thread factory.
friend void swap(work_thread_factory_mixin_t &a, work_thread_factory_mixin_t &b) noexcept
void undo_preallocation_for_agent(agent_t &agent) noexcept
Undo preallocation of resources for a new agent.
void preallocate_resources_for_agent(agent_t &agent, const Bind_Params ¶ms)
Preallocate all necessary resources for a new agent.
void bind_agent_with_cooperation_fifo(agent_ref_t agent, const Bind_Params ¶ms)
Creation event queue for an agent with individual FIFO.
void bind_agent_with_inidividual_fifo(agent_ref_t agent, const Bind_Params ¶ms)
Creation event queue for an agent with individual FIFO.
const std::size_t m_thread_count
Count of working threads.
virtual void supply(tp_stats::stats_consumer_t &consumer) override
Implementation of stats_supplier-related stuff.
agent_map_t m_agents
Information of agents.
tp_stats::stats_supplier_t & stats_supplier()
Helper method for casting to stats_supplier-object.
void shutdown_then_wait() noexcept
Dispatcher_Queue m_queue
Queue for active agent's queues.
void start(environment_t &env)
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept
Get resources allocated for an agent.
std::vector< std::unique_ptr< Work_Thread > > m_threads
Pool of work threads.
void unbind_agent(agent_t &agent) noexcept
Unbind agent from the dispatcher.
dispatcher_t(const dispatcher_t &)=delete
dispatcher_t(environment_t &env, const so_5::disp::reuse::work_thread_factory_mixin_t< Dispatcher_Params > &disp_params, const std::string_view name_base, std::size_t thread_count, const so_5::disp::mpmc_queue_traits::queue_params_t &queue_params)
Constructor.
agent_queue_ref_t make_new_agent_queue(const Bind_Params ¶ms)
Helper method for creating event queue for agents/cooperations.
std::mutex m_lock
Object's lock.
dispatcher_t & operator=(const dispatcher_t &)=delete
cooperation_map_t m_cooperations
Information about cooperations.
stats::manually_registered_source_holder_t< tp_stats::data_source_t > m_data_source
Data source for the run-time monitoring.
An analog of unique_ptr for abstract_work_thread.
work_thread_holder_t(work_thread_holder_t &&o) noexcept
Interface for dispatcher binders.
stats::repository_t & stats_repository()
Access to repository of data sources for run-time monitoring.
An interface of event queue for agent.
bool is_thread_safe() const
Is thread safe handler?
void exec(current_thread_id_t working_thread_id) const
Call event handler directly.
Template class for smart reference wrapper on the atomic_refcounted_t.
intrusive_ptr_t(T *obj) noexcept
Constructor for a raw pointer.
void reset() noexcept
Drop controlled object.
T * operator->() const noexcept
intrusive_ptr_t & operator=(const intrusive_ptr_t &o) noexcept
Copy operator.
Helper class for indication of long-lived reference via its type.
Base for the case of externals stats lock.
Helper for collecting activity stats.
so_5::stats::activity_stats_t take_stats()
An addition to auto_registered_source_holder for the cases where manual registration of data_source s...
Data_Source & get() noexcept
void start(outliving_reference_t< repository_t > repo)
A type for storing prefix of data_source name.
constexpr const char * c_str() const noexcept
Access to prefix value.
prefix_t(const std::string &value) noexcept(noexcept(value.c_str()))
Initializing constructor.
An interface of data source.
Some reusable and low-level classes/functions which can be used in public header files.
void adjust_thread_count(disp_params_t ¶ms)
Sets the thread count to default value if used do not specify actual thread count.
Internal implementation details of advanced thread pool dispatcher.
Advanced thread pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of adv_thread_pool dispatcher with the default count of work threads.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of adv_thread_pool dispatcher.
fifo_t
Type of FIFO mechanism for agent's demands.
@ cooperation
A FIFO for demands for all agents from the same cooperation.
@ individual
A FIFO for demands only for one agent.
Various stuff related to MPMC event queue implementation and tuning.
Helper tools for implementation of run-time monitoring for thread-pool-like dispatchers.
queue_description_holder_ref_t make_queue_desc_holder(const stats::prefix_t &prefix, coop_id_t coop_id, std::size_t agent_count)
Helper function for creating queue_description_holder object.
queue_description_holder_ref_t make_queue_desc_holder(const stats::prefix_t &prefix, const void *agent)
Helper function for creating queue_description_holder object.
Reusable components for dispatchers.
so_5::stats::prefix_t make_disp_prefix(const std::string_view disp_type, const std::string_view data_sources_name_base, const void *disp_this_pointer)
Create basic prefix for dispatcher data source names.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
Details of SObjectizer run-time implementations.
void ensure_join_from_different_thread(current_thread_id_t thread_to_be_joined)
Ensures that join will be called from different thread.
Declarations of messages used by run-time monitoring and statistics.
Predefined suffixes of data-sources.
SO_5_FUNC suffix_t agent_count()
Suffix for data source with count of agents bound to some entity.
SO_5_FUNC suffix_t work_thread_queue_size()
Suffix for data source with count of demands in a working thread event queue.
SO_5_FUNC suffix_t work_thread_activity()
Suffix for data source with work thread activity statistics.
SO_5_FUNC suffix_t disp_thread_count()
Suffix for data source with count of work threads for dispatcher.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
std::thread::id raw_id_from_current_thread_id(const current_thread_id_t &w)
Get the raw thread id from current_thread_id.
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
current_thread_id_t query_current_thread_id()
Get the ID of the current thread.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
Helper for showing pointer value.
pointer(const void *what)
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static bool is_individual_fifo(const bind_params_t ¶ms) noexcept
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_t &) noexcept
Actual demand in event queue.
demand_t * m_next
Next item in queue.
execution_demand_t m_demand
Actual demand.
demand_t(execution_demand_t &&original)
Main data for work_thread.
common_data_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
work_thread_holder_t m_thread_holder
Actual thread.
so_5::current_thread_id_t m_thread_id
ID of thread.
dispatcher_queue_t * m_disp_queue
Dispatcher's queue.
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t m_condition
Waiting object for long wait.
Activity stats for a particular work thread.
so_5::current_thread_id_t m_thread_id
wt_activity_info_t(const so_5::current_thread_id_t &thread_id, const stats::work_thread_activity_stats_t &stats)
stats::work_thread_activity_stats_t m_stats
A holder of one event queue information block.
intrusive_ptr_t< queue_description_holder_t > m_next
Next item in the chain of queues descriptions.
queue_description_t m_desc
Actual description for the event queue.
Description of one event queue.
stats::prefix_t m_prefix
Prefix for data-sources related to that queue.
std::size_t m_agent_count
Count of agents bound to that queue.
std::size_t m_queue_size
Current queue size.
tp_stats::queue_description_holder_ref_t m_queue_desc
Description of that queue for run-time monitoring.
void update_queue_stats()
Update queue description with current information.
bool cooperation_fifo() const
Does agent use cooperation FIFO?
agent_data_t(agent_queue_ref_t queue, const stats::prefix_t &data_source_name_prefix, const agent_t *agent_ptr)
Constructor for the case when agent uses individual FIFO.
agent_queue_ref_t m_queue
Event queue for the agent.
agent_data_t(agent_queue_ref_t queue)
Constructor for the case when agent uses cooperation FIFO.
Data for one cooperation.
tp_stats::queue_description_holder_ref_t m_queue_desc
Description of that queue for run-time monitoring.
agent_queue_ref_t m_queue
Event queue for the cooperation.
cooperation_data_t(agent_queue_ref_t queue, std::size_t agents, const stats::prefix_t &data_source_name_prefix, coop_id_t coop_id)
void update_queue_stats()
Update queue information for run-time monitoring.
std::size_t m_agents
Count of agents form that cooperation.
A description of event execution demand.
agent_t * m_receiver
Receiver of demand.
Various traits of activity tracking implementation.
A message with value of some quantity.
Information about one work thread activity.
Stats for a work thread activity.
activity_stats_t m_working_stats
Stats for processed events.
activity_stats_t m_waiting_stats
Stats for waiting periods.