SObjectizer 5.8
Loading...
Searching...
No Matches
thread_pool/pub.hpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief Public interface of thread pool dispatcher.
8 *
9 * \since v.5.4.0
10 */
11
12#pragma once
13
14#include <so_5/declspec.hpp>
15
16#include <so_5/disp_binder.hpp>
17
18#include <so_5/disp/mpmc_queue_traits/pub.hpp>
19
20#include <so_5/disp/reuse/work_thread_activity_tracking.hpp>
21#include <so_5/disp/reuse/work_thread_factory_params.hpp>
22#include <so_5/disp/reuse/default_thread_pool_size.hpp>
23
24#include <string_view>
25#include <thread>
26#include <utility>
27
28namespace so_5
29{
30
31namespace disp
32{
33
34namespace thread_pool
35{
36
37/*!
38 * \brief Alias for namespace with traits of event queue.
39 *
40 * \since v.5.5.11
41 */
42namespace queue_traits = so_5::disp::mpmc_queue_traits;
43
44//
45// disp_params_t
46//
47/*!
48 * \brief Parameters for %thread_pool dispatcher.
49 *
50 * \since v.5.5.11
51 */
55 {
56 using activity_tracking_mixin_t = so_5::disp::reuse::
58 using thread_factory_mixin_t = so_5::disp::reuse::
60
61 public :
62 //! Default constructor.
63 disp_params_t() = default;
64
65 friend inline void
67 disp_params_t & a, disp_params_t & b ) noexcept
68 {
69 using std::swap;
70
71 swap(
72 static_cast< activity_tracking_mixin_t & >(a),
73 static_cast< activity_tracking_mixin_t & >(b) );
74
75 swap(
76 static_cast< work_thread_factory_mixin_t & >(a),
77 static_cast< work_thread_factory_mixin_t & >(b) );
78
81 }
82
83 //! Setter for thread count.
85 thread_count( std::size_t count )
86 {
87 m_thread_count = count;
88 return *this;
89 }
90
91 //! Getter for thread count.
92 std::size_t
94 {
95 return m_thread_count;
96 }
97
98 //! Setter for queue parameters.
101 {
102 m_queue_params = std::move(p);
103 return *this;
104 }
105
106 //! Tuner for queue parameters.
107 /*!
108 * Accepts lambda-function or functional object which tunes
109 * queue parameters.
110 \code
111 using namespace so_5::disp::thread_pool;
112 auto disp = make_dispatcher( env,
113 "workers_disp",
114 disp_params_t{}
115 .thread_count( 10 )
116 .tune_queue_params(
117 []( queue_traits::queue_params_t & p ) {
118 p.lock_factory( queue_traits::simple_lock_factory() );
119 } ) );
120 \endcode
121 */
122 template< typename L >
125 {
127 return *this;
128 }
129
130 //! Getter for queue parameters.
131 const queue_traits::queue_params_t &
133 {
134 return m_queue_params;
135 }
136
137 private :
138 //! Count of working threads.
139 /*!
140 * Value 0 means that the actual thread count will be detected automatically.
141 */
142 std::size_t m_thread_count = { 0 };
143 //! Queue parameters.
145 };
146
147//
148// fifo_t
149//
150/*!
151 * \brief Type of FIFO mechanism for agent's demands.
152 *
153 * \since v.5.4.0
154 */
155enum class fifo_t
156 {
157 //! A FIFO for demands for all agents from the same cooperation.
158 /*!
159 * It means that all agents from the same cooperation which use this
160 * FIFO mechanism will work on the same thread.
161 *
162 * If the same disp_binder with fifo_t::cooperation is used for
163 * several cooperations then each coop will have a separate
164 * event queue (thus agents from different coops may work on
165 * different worker threads).
166 */
168 //! A FIFO for demands only for one agent.
169 /*!
170 * It means that FIFO is only supported for the concrete agent.
171 * If several agents from a cooperation have this FIFO type they
172 * will process demands independently and on different threads.
173 */
175 };
176
177//
178// bind_params_t
179//
180/*!
181 * \brief Parameters for binding agents to %thread_pool dispatcher.
182 *
183 * \since v.5.5.11
184 */
186 {
187 public :
188 //! Set FIFO type.
191 {
192 m_fifo = v;
193 return *this;
194 }
195
196 //! Get FIFO type.
197 [[nodiscard]]
198 fifo_t
200 {
201 return m_fifo;
202 }
203
204 //! Set maximum count of demands to be processed at once.
206 max_demands_at_once( std::size_t v )
207 {
209 return *this;
210 }
211
212 //! Get maximum count of demands to be processed at once.
213 [[nodiscard]]
214 std::size_t
216 {
218 }
219
220 private :
221 //! FIFO type.
223
224 //! Maximum count of demands to be processed at once.
225 std::size_t m_max_demands_at_once = { 4 };
226 };
227
228//
229// default_thread_pool_size
230//
231using so_5::disp::reuse::default_thread_pool_size;
232
233namespace impl {
234
236
237//
238// basic_dispatcher_iface_t
239//
240/*!
241 * \brief The very basic interface of %thread_pool dispatcher.
242 *
243 * This class contains a minimum that is necessary for implementation
244 * of dispatcher_handle class.
245 *
246 * \since v.5.6.0
247 */
249 : public std::enable_shared_from_this<actual_dispatcher_iface_t>
250 {
251 public :
252 virtual ~basic_dispatcher_iface_t() noexcept = default;
253
254 [[nodiscard]]
255 virtual disp_binder_shptr_t
256 binder( bind_params_t params ) = 0;
257 };
258
259using basic_dispatcher_iface_shptr_t =
260 std::shared_ptr< basic_dispatcher_iface_t >;
261
263
264} /* namespace impl */
265
266//
267// dispatcher_handle_t
268//
269
270/*!
271 * \brief A handle for %thread_pool dispatcher.
272 *
273 * \since v.5.6.0
274 */
275class [[nodiscard]] dispatcher_handle_t
276 {
278
279 //! A reference to actual implementation of a dispatcher.
280 impl::basic_dispatcher_iface_shptr_t m_dispatcher;
281
283 impl::basic_dispatcher_iface_shptr_t dispatcher ) noexcept
284 : m_dispatcher{ std::move(dispatcher) }
285 {}
286
287 //! Is this handle empty?
288 bool
289 empty() const noexcept { return !m_dispatcher; }
290
291 public :
292 dispatcher_handle_t() noexcept = default;
293
294 //! Get a binder for that dispatcher.
295 /*!
296 * Usage example:
297 * \code
298 * using namespace so_5::disp::thread_pool;
299 *
300 * so_5::environment_t & env = ...;
301 * auto disp = make_dispatcher( env );
302 * bind_params_t params;
303 * params.fifo( fifo_t::individual );
304 *
305 * env.introduce_coop( [&]( so_5::coop_t & coop ) {
306 * coop.make_agent_with_binder< some_agent_type >(
307 * disp.binder( params ),
308 * ... );
309 *
310 * coop.make_agent_with_binder< another_agent_type >(
311 * disp.binder( params ),
312 * ... );
313 *
314 * ...
315 * } );
316 * \endcode
317 *
318 * \attention
319 * An attempt to call this method on empty handle is UB.
320 */
321 [[nodiscard]]
322 disp_binder_shptr_t
324 bind_params_t params ) const
325 {
326 return m_dispatcher->binder( params );
327 }
328
329 //! Create a binder for that dispatcher.
330 /*!
331 * This method allows parameters tuning via lambda-function
332 * or other functional objects.
333 *
334 * Usage example:
335 * \code
336 * using namespace so_5::disp::thread_pool;
337 *
338 * so_5::environment_t & env = ...;
339 * env.introduce_coop( [&]( so_5::coop_t & coop ) {
340 * coop.make_agent_with_binder< some_agent_type >(
341 * // Create dispatcher instance.
342 * make_dispatcher( env )
343 * // Make and tune binder for that dispatcher.
344 * .binder( []( auto & params ) {
345 * params.fifo( fifo_t::individual );
346 * } ),
347 * ... );
348 * \endcode
349 *
350 * \attention
351 * An attempt to call this method on empty handle is UB.
352 */
353 template< typename Setter >
354 [[nodiscard]]
355 std::enable_if_t<
356 std::is_invocable_v< Setter, bind_params_t& >,
357 disp_binder_shptr_t >
359 //! Function for the parameters tuning.
360 Setter && params_setter ) const
361 {
363 params_setter( p );
364
365 return this->binder( p );
366 }
367
368 //! Get a binder for that dispatcher with default binding params.
369 /*!
370 * \attention
371 * An attempt to call this method on empty handle is UB.
372 */
373 [[nodiscard]]
374 disp_binder_shptr_t
375 binder() const
376 {
377 return this->binder( bind_params_t{} );
378 }
379
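380 //! Is this handle empty? Note: the conversion yields true for an EMPTY handle (the opposite of the usual smart-pointer convention).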
381 operator bool() const noexcept { return empty(); }
382
383 //! Does this handle contain a reference to dispatcher?
384 bool
385 operator!() const noexcept { return !empty(); }
386
387 //! Drop the content of handle.
388 void
389 reset() noexcept { m_dispatcher.reset(); }
390 };
391
392//
393// make_dispatcher
394//
395/*!
396 * \brief Create an instance of %thread_pool dispatcher.
397 *
398 * \par Usage sample
399\code
400using namespace so_5::disp::thread_pool;
401auto disp = make_dispatcher(
402 env,
403 "db_workers_pool",
404 disp_params_t{}
405 .thread_count( 16 )
406 .tune_queue_params( []( queue_traits::queue_params_t & params ) {
407 params.lock_factory( queue_traits::simple_lock_factory() );
408 } ) );
409auto coop = env.make_coop(
410 // The main dispatcher for that coop will be
411 // this instance of thread_pool dispatcher.
412 disp.binder() );
413\endcode
414 *
415 * \since v.5.6.0
416 */
417[[nodiscard]]
420 //! SObjectizer Environment to work in.
421 environment_t & env,
422 //! Value for creating names of data sources for
423 //! run-time monitoring.
424 const std::string_view data_sources_name_base,
425 //! Parameters for the dispatcher.
426 disp_params_t disp_params );
427
428//
429// make_dispatcher
430//
431/*!
432 * \brief Create an instance of %thread_pool dispatcher.
433 *
434 * \par Usage sample
435\code
436auto disp = so_5::disp::thread_pool::make_dispatcher(
437 env,
438 "db_workers_pool",
439 16 );
440auto coop = env.make_coop(
441 // The main dispatcher for that coop will be
442 // this instance of thread_pool dispatcher.
443 disp.binder() );
444\endcode
445 *
446 * \since v.5.6.0
447 */
448[[nodiscard]]
451 //! SObjectizer Environment to work in.
452 environment_t & env,
453 //! Value for creating names of data sources for
454 //! run-time monitoring.
455 const std::string_view data_sources_name_base,
456 //! Count of working threads.
457 std::size_t thread_count )
458 {
459 return make_dispatcher(
460 env,
461 data_sources_name_base,
462 disp_params_t{}.thread_count( thread_count ) );
463 }
464
465/*!
466 * \brief Create an instance of %thread_pool dispatcher.
467 *
468 * \par Usage sample
469\code
470auto disp = so_5::disp::thread_pool::make_dispatcher( env, 16 );
471
472auto coop = env.make_coop(
473 // The main dispatcher for that coop will be
474 // this instance of thread_pool dispatcher.
475 disp.binder() );
476\endcode
477 *
478 * \since v.5.6.0
479 */
480[[nodiscard]]
483 //! SObjectizer Environment to work in.
484 environment_t & env,
485 //! Count of working threads.
486 std::size_t thread_count )
487 {
488 return make_dispatcher( env, std::string_view{}, thread_count );
489 }
490
491//
492// make_dispatcher
493//
494/*!
495 * \brief Create an instance of %thread_pool dispatcher with the default
496 * count of working threads.
497 *
498 * Count of work threads will be detected by default_thread_pool_size()
499 * function.
500 *
501 * \par Usage sample
502\code
503auto disp = so_5::disp::thread_pool::make_dispatcher( env );
504
505auto coop = env.make_coop(
506 // The main dispatcher for that coop will be
507 // this instance of thread_pool dispatcher.
508 disp.binder() );
509\endcode
510 *
511 * \since v.5.6.0
512 */
513[[nodiscard]]
516 //! SObjectizer Environment to work in.
517 environment_t & env )
518 {
519 return make_dispatcher(
520 env,
521 std::string_view{},
523 }
524
525} /* namespace thread_pool */
526
527} /* namespace disp */
528
529} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
Container for storing parameters for MPMC queue.
queue_params_t & operator=(queue_params_t &&o) noexcept
Move operator.
friend void swap(queue_params_t &a, queue_params_t &b) noexcept
friend void swap(work_thread_activity_tracking_flag_mixin_t &a, work_thread_activity_tracking_flag_mixin_t &b) noexcept
Mixin that holds optional work thread factory.
friend void swap(work_thread_factory_mixin_t &a, work_thread_factory_mixin_t &b) noexcept
Parameters for binding agents to thread_pool dispatcher.
std::size_t m_max_demands_at_once
Maximum count of demands to be processed at once.
bind_params_t & max_demands_at_once(std::size_t v)
Set maximum count of demands to be processed at once.
fifo_t query_fifo() const
Get FIFO type.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
std::size_t query_max_demands_at_once() const
Get maximum count of demands to be processed at once.
Alias for namespace with traits of event queue.
queue_traits::queue_params_t m_queue_params
Queue parameters.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
disp_params_t()=default
Default constructor.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
std::size_t m_thread_count
Count of working threads.
std::size_t thread_count() const
Getter for thread count.
A handle for thread_pool dispatcher.
void reset() noexcept
Drop the content of handle.
disp_binder_shptr_t binder() const
Get a binder for that dispatcher with default binding params.
bool empty() const noexcept
Is this handle empty?
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
operator bool() const noexcept
Is this handle empty?
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
std::enable_if_t< std::is_invocable_v< Setter, bind_params_t & >, disp_binder_shptr_t > binder(Setter &&params_setter) const
Create a binder for that dispatcher.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
const bind_params_t m_params
Binding parameters.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
An actual interface of thread-pool dispatcher.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params)=0
Preallocate all necessary resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
disp_binder_shptr_t binder(bind_params_t params) override
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params) override
Preallocate all necessary resources for a new agent.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
The very basic interface of thread_pool dispatcher.
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
#define SO_5_FUNC
Definition declspec.hpp:48
Various stuff related to MPMC event queue implementation and tuning.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
void adjust_thread_count(disp_params_t &params)
Sets the thread count to the default value if the user does not specify the actual thread count.
Internal implementation details of thread pool dispatcher.
Thread pool dispatcher.
fifo_t
Type of FIFO mechanism for agent's demands.
@ cooperation
A FIFO for demands for all agents from the same cooperation.
@ individual
A FIFO for demands only for one agent.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of thread_pool dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of thread_pool dispatcher with the default count of working threads.
Event dispatchers.
Private part of message limit implementation.
Definition agent.cpp:33
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.