SObjectizer 5.8
Loading...
Searching...
No Matches
prio_one_thread/reuse/work_thread.hpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5/*!
6 * \file
7 * \brief A working thread for dispatcher with one common working
8 * thread and support of demands priority.
9 *
10 * \since v.5.5.8
11 */
12
13#pragma once
14
15#include <so_5/current_thread_id.hpp>
16
17#include <so_5/disp/abstract_work_thread.hpp>
18
19#include <so_5/stats/work_thread_activity.hpp>
20#include <so_5/stats/impl/activity_tracking.hpp>
21
22#include <so_5/details/at_scope_exit.hpp>
23
24#include <so_5/impl/thread_join_stuff.hpp>
25
26#include <thread>
27
28namespace so_5 {
29
30namespace disp {
31
32namespace prio_one_thread {
33
34namespace reuse {
35
37
38//
39// common_data_t
40//
41/*!
42 * \brief A common data for all work thread implementations.
43 *
44 * \since v.5.5.18
45 */
46template< typename Demand_Queue >
48 {
49 //! Demands queue to work for.
50 Demand_Queue & m_queue;
51
52 //! Thread object.
54
55 //! ID of the work thread.
56 /*!
57 * \note Receives actual value only after successful start
58 * of the thread.
59 */
60 so_5::current_thread_id_t m_thread_id;
61
63 work_thread_holder_t thread_holder,
64 Demand_Queue & queue )
65 : m_queue( queue )
66 , m_thread_holder{ std::move(thread_holder) }
67 {}
68 };
69
70//
71// no_activity_tracking_impl_t
72//
73/*!
74 * \brief A part of implementation of work thread without activity tracking.
75 *
76 * \since v.5.5.18
77 */
78template< typename Demand_Queue >
79class no_activity_tracking_impl_t : protected common_data_t< Demand_Queue >
80 {
81 using base_type_t = common_data_t< Demand_Queue >;
82
83 public :
85 work_thread_holder_t thread_holder,
86 Demand_Queue & queue )
87 : base_type_t( std::move(thread_holder), queue )
88 {}
89
90 protected :
91 void
92 work_started() { /* Nothing to do. */ }
93
94 void
95 work_finished() { /* Nothing to do. */ }
96
97 void
98 wait_started() { /* Nothing to do. */ }
99
100 void
101 wait_finished() { /* Nothing to do. */ }
102 };
103
104//
105// with_activity_tracking_impl_t
106//
107/*!
108 * \brief A part of implementation of work thread with activity tracking.
109 *
110 * \since v.5.5.18
111 */
112template< typename Demand_Queue >
113class with_activity_tracking_impl_t : protected common_data_t< Demand_Queue >
114 {
115 using base_type_t = common_data_t< Demand_Queue >;
116
117 public :
119 work_thread_holder_t thread_holder,
120 Demand_Queue & queue )
121 : base_type_t( std::move(thread_holder), queue )
122 {}
123
134
135 protected :
136 //! Statictics for work activity.
140
141 //! Statictics for wait activity.
145
146 void
148
149 void
151
152 void
154
155 void
157 };
158
159} /* namespace work_thread_details */
160
161//
162// work_thread_template_t
163//
164/*!
165 * \brief A working thread for dispatcher with one common working
166 * thread and support of demands priority.
167 *
168 * \since v.5.5.8, v.5.5.18
169 */
170template<
171 typename Demand_Queue,
172 template<class> class Work_Thread >
173class work_thread_template_t : public Work_Thread< Demand_Queue >
174 {
175 using base_type_t = Work_Thread< Demand_Queue >;
176
177 public :
178 //! Initializing constructor.
180 work_thread_holder_t thread_holder,
181 Demand_Queue & queue )
182 : base_type_t( std::move(thread_holder), queue )
183 {}
184
185 void
187 {
188 this->m_thread_holder.unchecked_get().start( [this]() { body(); } );
189 }
190
191 void
193 {
195 this->m_thread_holder.unchecked_get().join();
196 }
197
198 [[nodiscard]]
199 so_5::current_thread_id_t
200 thread_id() const
201 {
202 return this->m_thread_id;
203 }
204
205 private :
206 void
208 {
209 this->m_thread_id = so_5::query_current_thread_id();
210
211 try
212 {
213 for(;;)
214 {
215 auto d = this->pop_demand();
216 this->call_handler( *d );
217 }
218 }
219 catch( const typename Demand_Queue::shutdown_ex_t & )
220 {}
221 }
222
223 [[nodiscard]]
224 auto
225 pop_demand() -> decltype(std::declval<Demand_Queue>().pop())
226 {
227 this->wait_started();
228 auto wait_meter_stopper = so_5::details::at_scope_exit(
229 [this] { this->wait_finished(); } );
230
231 return this->m_queue.pop();
232 }
233
234 void
236 {
237 this->work_started();
238 auto work_meter_stopper = so_5::details::at_scope_exit(
239 [this] { this->work_finished(); } );
240
241 demand.call_handler( this->m_thread_id );
242 }
243 };
244
245//
246// work_thread_no_activity_tracking_t
247//
248template< typename Demand_Queue >
249using work_thread_no_activity_tracking_t =
251 Demand_Queue,
253
254//
255// work_thread_with_activity_tracking_t
256//
257template< typename Demand_Queue >
258using work_thread_with_activity_tracking_t =
260 Demand_Queue,
262
263} /* namespace reuse */
264
265} /* namespace prio_one_thread */
266
267} /* namespace disp */
268
269} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
priority_t so_priority() const noexcept
Get the priority of the agent.
Definition agent.hpp:2555
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
An analog of std::lock_guard for MPSC queue lock.
Container for storing parameters for MPSC queue.
const lock_factory_t & lock_factory() const
Getter for lock factory.
queue_params_t & operator=(queue_params_t &&o) noexcept
Move operator.
friend void swap(queue_params_t &a, queue_params_t &b) noexcept
An analog of std::unique_lock for MPSC queue lock.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
A demand queue for dispatcher with one common working thread and round-robin processing of prioritise...
void agent_unbound(priority_t priority)
Notification about detachment of an agent from the queue.
demand_queue_t(queue_traits::lock_unique_ptr_t lock, const quotes_t &quotes)
queue_for_one_priority_t m_priorities[static_cast< std::size_t >(priority_t::p_max)+1]
Subqueues for priorities.
void agent_bound(priority_t priority)
Notification about attachment of yet another agent to the queue.
event_queue_t & event_queue_by_priority(priority_t priority)
Get queue for the priority specified.
void cleanup_queue(queue_for_one_priority_t &queue_info)
Destroy all demands in the queue specified.
void push(queue_for_one_priority_t *subqueue, demand_unique_ptr_t demand)
Push a new demand to the queue.
void add_demand_to_queue(queue_for_one_priority_t &queue, demand_unique_ptr_t demand)
Add a new demand to the tail of the queue specified.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void distribute_value_for_priority(const mbox_t &mbox, priority_t priority, std::size_t quote, std::size_t agents_count, std::size_t demands_count)
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params, const quotes_t &quotes)
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
size_t query(priority_t prio) const
Get the quote for a priority.
Definition quotes.hpp:89
quotes_t & set(priority_t prio, std::size_t quote)
Set a new quote for a priority.
Definition quotes.hpp:76
std::size_t m_quotes[so_5::prio::total_priorities_count]
Quotes for every priority.
Definition quotes.hpp:96
static void ensure_quote_not_zero(std::size_t value)
Definition quotes.hpp:99
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_working_stats
Statictics for work activity.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_waiting_stats
Statictics for wait activity.
A working thread for dispatcher with one common working thread and support of demands priority.
auto pop_demand() -> decltype(std::declval< Demand_Queue >().pop())
work_thread_template_t(work_thread_holder_t thread_holder, Demand_Queue &queue)
Initializing constructor.
friend void swap(work_thread_activity_tracking_flag_mixin_t &a, work_thread_activity_tracking_flag_mixin_t &b) noexcept
Mixin that holds optional work thread factory.
friend void swap(work_thread_factory_mixin_t &a, work_thread_factory_mixin_t &b) noexcept
An analog of unique_ptr for abstract_work_thread.
work_thread_holder_t(work_thread_holder_t &&o) noexcept
Interface for dispatcher binders.
SObjectizer Environment.
stats::repository_t & stats_repository()
Access to repository of data sources for run-time monitoring.
An interface of event queue for agent.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
Base for the case of internal stats lock.
A holder for data-souce that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
constexpr const char * c_str() const noexcept
Access to prefix value.
Definition prefix.hpp:80
prefix_t(const std::string &value) noexcept(noexcept(value.c_str()))
Initializing constructor.
Definition prefix.hpp:73
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
Some reusable and low-level classes/functions which can be used in public header files.
Various stuff related to MPSC event queue implementation and tuning.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, so_5::disp::prio_one_thread::reuse::work_thread_no_activity_tracking_t< demand_queue_t > &)
Implementation details for dispatcher with round-robin policy of handling prioritized events.
Dispatcher which handles events of different priorities in round-robin maner.
dispatcher_handle_t make_dispatcher(environment_t &env, const quotes_t &quotes)
Create an instance of quoted_round_robin dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, const quotes_t &quotes)
Create an instance of quoted_round_robin dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, const quotes_t &quotes, disp_params_t params)
Create an instance of quoted_round_robin dispatcher.
Reusable code for dispatchers with one working thread for events of all priorities.
Dispatcher with one working thread for events of all priorities.
Reusable components for dispatchers.
work_thread_holder_t acquire_work_thread(const work_thread_factory_mixin_t< Params > &params, environment_t &env)
Helper function for acquiring a new worker thread from an appropriate work thread factory.
so_5::stats::prefix_t make_disp_prefix(const std::string_view disp_type, const std::string_view data_sources_name_base, const void *disp_this_pointer)
Create basic prefix for dispatcher data source names.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
Event dispatchers.
Details of SObjectizer run-time implementations.
Definition agent.cpp:780
void ensure_join_from_different_thread(current_thread_id_t thread_to_be_joined)
Ensures that join will be called from different thread.
Helpers for working with priorities.
Definition priority.hpp:73
void for_each_priority(Lambda l)
Does enumeration of all priorities.
Definition priority.hpp:193
const unsigned int total_priorities_count
Total count of priorities.
Definition priority.hpp:105
Declarations of messages used by run-time monitoring and statistics.
Definition messages.hpp:36
Predefined suffixes of data-sources.
Definition std_names.hpp:55
SO_5_FUNC suffix_t agent_count()
Suffix for data source with count of agents bound to some entity.
Definition std_names.cpp:66
SO_5_FUNC suffix_t work_thread_queue_size()
Suffix for data source with count of demands in a working thread event queue.
Definition std_names.cpp:78
SO_5_FUNC suffix_t work_thread_activity()
Suffix for data source with work thread activity statistics.
Definition std_names.cpp:84
SO_5_FUNC suffix_t demand_quote()
Suffix for data source with size of quote for demands processing.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
std::size_t to_size_t(priority_t priority)
Helper function for conversion from priority to size_t.
Definition priority.hpp:48
priority_t
Definition of supported priorities.
Definition priority.hpp:28
const int rc_priority_quote_illegal_value
Illegal value of quote for a priority.
Definition ret_code.hpp:174
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
current_thread_id_t query_current_thread_id()
Get the ID of the current thread.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
A description of event execution demand.
void call_handler(current_thread_id_t thread_id)
Helper method to simplify demand execution.
A message with value of some quantity.
Definition messages.hpp:60
Information about one work thread activity.
Definition messages.hpp:108
activity_stats_t m_working_stats
Stats for processed events.
activity_stats_t m_waiting_stats
Stats for waiting periods.