SObjectizer 5.8
Loading...
Searching...
No Matches
prio_dedicated_threads/one_per_prio/pub.cpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5/*!
6 * \file
7 * \brief Functions for creating and binding of the dispatcher with
8 * dedicated threads per priority.
9 *
10 * \since
11 * v.5.5.8
12 */
13
14#include <so_5/disp/prio_dedicated_threads/one_per_prio/pub.hpp>
15
16#include <so_5/disp/reuse/work_thread/work_thread.hpp>
17
18#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
19#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
20#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
21
22#include <so_5/stats/repository.hpp>
23#include <so_5/stats/messages.hpp>
24#include <so_5/stats/std_names.hpp>
25
26#include <so_5/send_functions.hpp>
27
28#include <so_5/details/invoke_noexcept_code.hpp>
29
30#include <algorithm>
31
32namespace so_5 {
33
34namespace disp {
35
37
38namespace one_per_prio {
39
40namespace impl {
41
42namespace stats = so_5::stats;
43
44namespace {
45
// Overload selected for a work thread without activity tracking
// (work_thread_no_activity_tracking_t): all parameters are unnamed and the
// body is intentionally empty, so nothing is sent.
// NOTE(review): listing line 47, which carries the function name, is missing
// from this extract; the signature index at the end of the page lists this
// overload as send_thread_activity_stats.
46void
48 const so_5::mbox_t &,
49 const stats::prefix_t &,
50 so_5::disp::reuse::work_thread::work_thread_no_activity_tracking_t & )
51 {
52 /* Nothing to do */
53 }
54
// Overload selected for a work thread with activity tracking
// (work_thread_with_activity_tracking_t).
// \param mbox   first argument forwarded to the call in the body.
// \param prefix second argument forwarded to the call in the body.
// \param wt     work thread; its use is not visible in this extract.
// NOTE(review): listing lines 56, 61 and 64-66 are missing here, so the
// callee and its remaining arguments cannot be seen. Presumably this sends
// the thread's activity statistics (the index at the end of the page lists
// thread_id() and take_activity_stats()) -- confirm against the original file.
55void
57 const so_5::mbox_t & mbox,
58 const stats::prefix_t & prefix,
59 so_5::disp::reuse::work_thread::work_thread_with_activity_tracking_t & wt )
60 {
62 mbox,
63 prefix,
67 }
68
69} /* namespace anonymous */
70
71//
72// dispatcher_template_t
73//
74/*!
75 * \brief An actual implementation of dispatcher with dedicated thread
76 * for every priority in form of a template class.
77 *
78 * \since
79 * v.5.5.8, v.5.5.18, v.5.6.0
80 */
81template< typename Work_Thread >
82class dispatcher_template_t final : public disp_binder_t
83 {
84 public:
// Constructor parameters and member-initializer fragment.
// NOTE(review): listing lines 85-86, 89-90 and 95-96 are missing from this
// extract. The visible initializer passes name_base together with
// outliving_mutable(*this), i.e. a reference to this dispatcher; presumably
// that initializes the m_data_source member, and the missing body lines
// allocate and start the work threads -- confirm against the original file.
87 const std::string_view name_base,
88 disp_params_t params )
91 name_base,
92 outliving_mutable(*this)
93 }
94 {
97 }
98
// Destructor: stops every work thread held in m_threads.
// Done in two separate passes: shutdown() is called on all threads before
// wait() is called on any of them.
// NOTE(review): presumably shutdown() only requests termination and wait()
// joins, so the threads can finish concurrently -- confirm in Work_Thread.
99 ~dispatcher_template_t() noexcept override
100 {
101 for( auto & t : m_threads )
102 t->shutdown();
103
104 for( auto & t : m_threads )
105 t->wait();
106 }
107
// Resource preallocation hook for a new agent (listed in the page index as
// preallocate_resources(agent_t &) override; the name line, listing line 109,
// is missing from this extract).
// The agent argument is unused and the body is empty: this dispatcher
// allocates nothing per agent at this step.
108 void
110 agent_t & /*agent*/ ) override
111 {
112 // Nothing to do.
113 }
114
// No-op noexcept override taking an unused agent reference.
// NOTE(review): the name line (listing line 116) is missing from this
// extract; presumably this is the rollback counterpart of the preallocation
// hook above it -- confirm against the original file.
115 void
117 agent_t & /*agent*/ ) noexcept override
118 {
119 // Nothing to do.
120 }
121
// Attaches an agent to the work thread that serves the agent's priority and
// increments the per-priority agent counter.
// \param agent agent whose so_priority() selects the index used for both
//              m_threads and m_agents_per_priority.
// NOTE(review): the name line (123) and the head of the call (128) are
// missing from this extract. The visible argument is the dereferenced result
// of get_agent_binding() on the selected thread; presumably it is passed to
// agent.so_bind_to_dispatcher() -- confirm against the original file.
122 void
124 agent_t & agent ) noexcept override
125 {
126 const auto priority = agent.so_priority();
127
129 *(m_threads[ to_size_t(priority) ]->get_agent_binding()) );
130
131 m_agents_per_priority[ to_size_t(priority) ] += 1;
132 }
133
// Decrements the agent counter of the priority reported by
// agent.so_priority(); nothing else is done for the agent here.
// \param agent agent whose priority selects the counter to decrement.
// NOTE(review): the name line (listing line 135) is missing from this
// extract; presumably this is the unbinding counterpart of the method that
// increments the same counter -- confirm against the original file.
134 void
136 agent_t & agent ) noexcept override
137 {
138 const auto priority = agent.so_priority();
139
140 m_agents_per_priority[ to_size_t(priority) ] -= 1;
141 }
142
143 private:
144 friend class disp_data_source_t;
145
146 /*!
147 * \brief Data source for run-time monitoring of whole dispatcher.
148 *
149 * \since
150 * v.5.5.8
151 */
152 class disp_data_source_t final : public stats::source_t
153 {
154 //! Dispatcher to work with.
155 outliving_reference_t< dispatcher_template_t > m_dispatcher;
156
157 //! Basic prefix for data sources.
159
160 public :
// Constructor parameters and member initializers of the data source.
// \param name_base user-supplied part of the data source name.
// \param disp      dispatcher to report on; stored in m_dispatcher.
// NOTE(review): listing lines 161 and 165 are missing from this extract.
// The visible arguments "pdt-opp", name_base and the dispatcher's address
// match the three-parameter make_disp_prefix() listed in the page index;
// presumably they initialize m_base_prefix -- confirm against the original.
162 const std::string_view name_base,
163 outliving_reference_t< dispatcher_template_t > disp )
164 : m_dispatcher{ disp }
166 "pdt-opp",
167 name_base,
168 &(disp.get()) )
169 }
170 {}
171
// Publishes the current monitoring values of the whole dispatcher.
// \param mbox destination passed on to every call made here.
// For each priority the per-priority agent counter is read and added to a
// running total; the per-priority value and the matching work thread are then
// handed to a per-thread helper. After the loop the total is passed to a
// final call.
// NOTE(review): listing lines 185 and 192-195 (the heads of both calls) are
// missing from this extract. The first argument list matches
// distribute_value_for_work_thread(mbox, priority, agents_count, wt); the
// second presumably sends the total agent count -- confirm against the
// original file.
172 void
173 distribute( const mbox_t & mbox ) override
174 {
175 std::size_t agents_count = 0;
176
177 auto & disp = m_dispatcher.get();
178
179 so_5::prio::for_each_priority( [&]( priority_t p ) {
// Counter is read atomically with acquire ordering.
180 auto agents = disp.m_agents_per_priority[
181 to_size_t(p) ].load( std::memory_order_acquire );
182
183 agents_count += agents;
184
186 mbox,
187 p,
188 agents,
189 *(disp.m_threads[ to_size_t(p) ]) );
190 } );
191
193 mbox,
196 agents_count );
197 }
198
199 private:
// Publishes the monitoring values of one work thread.
// \param mbox         destination of the messages.
// \param priority     priority served by the thread; used in the prefix.
// \param agents_count value passed as the last argument of the second call.
// \param wt           work thread whose demands_count() is reported.
// NOTE(review): listing lines 201, 215, 218 and 221 are missing from this
// extract, so the suffix arguments and the head of the second call are not
// visible; presumably it is another so_5::send of a quantity message --
// confirm against the original file.
200 void
202 const mbox_t & mbox,
203 priority_t priority,
204 std::size_t agents_count,
205 Work_Thread & wt )
206 {
// Per-thread prefix: "<base prefix>/wt-p<priority as size_t>".
207 std::ostringstream ss;
208 ss << m_base_prefix.c_str() << "/wt-p" << to_size_t(priority);
209
210 const stats::prefix_t prefix{ ss.str() };
211
212 so_5::send< stats::messages::quantity< std::size_t > >(
213 mbox,
214 prefix,
216 wt.demands_count() );
217
219 mbox,
220 prefix,
222 agents_count );
223
// Resolved by overload on the type of wt; the overload for threads without
// activity tracking is an empty function.
224 send_thread_activity_stats( mbox, prefix, wt );
225 }
226 };
227
228 //! Data source for run-time monitoring.
229 stats::auto_registered_source_holder_t< disp_data_source_t >
231
232 //! Working threads for every priority.
233 std::vector< std::unique_ptr< Work_Thread > > m_threads;
234
235 //! Counters for agent count for every priority.
236 std::array< std::atomic< std::size_t >, so_5::prio::total_priorities_count >
238
239 //! Allocate work threads for dispatcher.
// Appends one Work_Thread to m_threads per invocation of the
// for_each_priority callback; the priority value itself is not used.
// \param env    environment passed to acquire_work_thread().
// \param params dispatcher parameters; source of the lock factory and of the
//               argument for acquire_work_thread().
// NOTE(review): listing lines 241 (name) and 245 are missing from this
// extract; the page index names this method allocate_work_threads.
240 void
242 environment_t & env,
243 const disp_params_t & params )
244 {
246 so_5::prio::for_each_priority( [&]( so_5::priority_t ) {
// A fresh copy of the lock factory is taken for every thread and moved
// into that thread's constructor.
247 auto lock_factory = params.queue_params().lock_factory();
248
249 auto t = std::make_unique< Work_Thread >(
250 acquire_work_thread( params, env ),
251 std::move(lock_factory) );
252
253 m_threads.push_back( std::move(t) );
254 } );
255 }
256
257 //! Start all working threads.
// Resets every per-priority agent counter to 0 and starts the threads in
// index order. If anything throws, the rollback action shuts down and waits
// for the threads already started; what happens to the exception afterwards
// is decided by do_with_rollback_on_exception, which is not shown here.
// NOTE(review): the name line (listing line 259) is missing from this extract.
258 void
260 {
261 using namespace std;
262 using namespace so_5::details;
263 using namespace so_5::prio;
264
265 // This helper array will be used for shutdown of
266 // started threads in the case of an exception.
267 std::array< Work_Thread *, total_priorities_count > started_threads;
268 // Initially all items must be null.
269 fill( begin(started_threads), end(started_threads), nullptr );
270
271 do_with_rollback_on_exception( [&] {
272 for( std::size_t i = 0; i != total_priorities_count; ++i )
273 {
274 m_agents_per_priority[ i ].store( 0,
275 std::memory_order_release );
276
277 m_threads[ i ]->start();
278
279 // Thread successfully started. Pointer to it
280 // must be used on rollback.
281 started_threads[ i ] = m_threads[ i ].get();
282 }
283 },
284 [&] {
285 invoke_noexcept_code( [&] {
286 // Shutdown all started threads, one at a time.
287 for( auto t : started_threads )
288 if( t )
289 {
290 t->shutdown();
291 t->wait();
292 }
293 else
294 // Threads are started in index order, so the first
// null entry means no later thread was started.
295 break;
296 } );
297 } );
298 }
299 };
300
301//
302// dispatcher_handle_maker_t
303//
// Body of dispatcher_handle_maker_t: a single public make() that moves the
// given binder into the brace-initialized return value.
// NOTE(review): listing lines 304 (class header) and 307 (the declaration
// specifiers of make, including its return type) are missing from this
// extract. make_dispatcher calls make() without an object and returns its
// result, so make is presumably static and returns dispatcher_handle_t --
// confirm against the original file.
305 {
306 public :
308 make( disp_binder_shptr_t binder ) noexcept
309 {
310 return { std::move( binder ) };
311 }
312 };
313
314} /* namespace impl */
315
316//
317// make_dispatcher
318//
// Creates a one_per_prio dispatcher and returns a handle for it.
// \param env                    SObjectizer Environment to work in.
// \param data_sources_name_base name base forwarded to the dispatcher.
// \param params                 dispatcher parameters; moved into the
//                               dispatcher.
// NOTE(review): listing lines 319-320 (return type and function name), 336
// and 339 are missing from this extract; the page index gives the signature
// as SO_5_FUNC dispatcher_handle_t make_dispatcher(...).
321 environment_t & env,
322 const std::string_view data_sources_name_base,
323 disp_params_t params )
324 {
325 using namespace so_5::disp::reuse;
326
// The two candidate dispatcher types differ only in the kind of work thread.
327 using dispatcher_no_activity_tracking_t =
328 impl::dispatcher_template_t<
329 work_thread::work_thread_no_activity_tracking_t >;
330
331 using dispatcher_with_activity_tracking_t =
332 impl::dispatcher_template_t<
333 work_thread::work_thread_with_activity_tracking_t >;
334
// Both types are handed to make_actual_dispatcher, which the page index
// describes as creating the instance with respect to the work thread
// activity tracking flag.
335 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
337 dispatcher_no_activity_tracking_t,
338 dispatcher_with_activity_tracking_t >(
340 data_sources_name_base,
341 std::move(params) );
342
343 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
344 }
345
346} /* namespace one_per_prio */
347
348} /* namespace prio_dedicated_threads */
349
350} /* namespace disp */
351
352} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
priority_t so_priority() const noexcept
Get the priority of the agent.
Definition agent.hpp:2555
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
const lock_factory_t & lock_factory() const
Getter for lock factory.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void distribute_value_for_work_thread(const mbox_t &mbox, priority_t priority, std::size_t agents_count, Work_Thread &wt)
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
std::vector< std::unique_ptr< Work_Thread > > m_threads
Working threads for every priority.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
std::array< std::atomic< std::size_t >, so_5::prio::total_priorities_count > m_agents_per_priority
Counters for agent count for every priority.
void allocate_work_threads(environment_t &env, const disp_params_t &params)
Allocate work threads for dispatcher.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
so_5::stats::work_thread_activity_stats_t take_activity_stats()
Get the activity stats.
so_5::current_thread_id_t thread_id() const
Get ID of work thread.
Interface for dispatcher binders.
SObjectizer Environment.
stats::repository_t & stats_repository()
Access to repository of data sources for run-time monitoring.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
A holder for a data source that should be automatically registered and deregistered in the registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
constexpr const char * c_str() const noexcept
Access to prefix value.
Definition prefix.hpp:80
prefix_t(const std::string &value) noexcept(noexcept(value.c_str()))
Initializing constructor.
Definition prefix.hpp:73
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
Some reusable and low-level classes/functions which can be used in public header files.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread::work_thread_no_activity_tracking_t &)
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, work_thread::work_thread_with_activity_tracking_t &wt)
Implementation details for dispatcher with one thread per priority.
Dispatcher which creates exactly one thread per priority.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_per_prio dispatcher.
Dispatchers with dedicated threads for every priority.
Implementation details of the dispatcher's working thread.
Reusable components for dispatchers.
work_thread_holder_t acquire_work_thread(const work_thread_factory_mixin_t< Params > &params, environment_t &env)
Helper function for acquiring a new worker thread from an appropriate work thread factory.
so_5::stats::prefix_t make_disp_prefix(const std::string_view disp_type, const std::string_view data_sources_name_base, const void *disp_this_pointer)
Create basic prefix for dispatcher data source names.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of a dispatcher instance with respect to the work thread activity tracking flag.
Event dispatchers.
Helpers for working with priorities.
Definition priority.hpp:73
const unsigned int total_priorities_count
Total count of priorities.
Definition priority.hpp:105
Declarations of messages used by run-time monitoring and statistics.
Definition messages.hpp:36
Predefined suffixes of data-sources.
Definition std_names.hpp:55
SO_5_FUNC suffix_t agent_count()
Suffix for data source with count of agents bound to some entity.
Definition std_names.cpp:66
SO_5_FUNC suffix_t work_thread_queue_size()
Suffix for data source with count of demands in a working thread event queue.
Definition std_names.cpp:78
SO_5_FUNC suffix_t work_thread_activity()
Suffix for data source with work thread activity statistics.
Definition std_names.cpp:84
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
std::size_t to_size_t(priority_t priority)
Helper function for conversion from priority to size_t.
Definition priority.hpp:48
priority_t
Definition of supported priorities.
Definition priority.hpp:28
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
A message with value of some quantity.
Definition messages.hpp:60
Information about one work thread activity.
Definition messages.hpp:108