SObjectizer 5.8
Loading...
Searching...
No Matches
nef_thread_pool/pub.cpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief Public interface of thread pool dispatcher that
8 * provides noexcept guarantee for scheduling evt_finish demand.
9 *
10 * \since v.5.8.0
11 */
12
13#include <so_5/disp/nef_thread_pool/pub.hpp>
14
15#include <so_5/disp/thread_pool/impl/work_thread_template.hpp>
16#include <so_5/disp/thread_pool/impl/basic_event_queue.hpp>
17
18#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
19
20#include <so_5/ret_code.hpp>
21
22#include <so_5/disp_binder.hpp>
23#include <so_5/environment.hpp>
24
25namespace so_5
26{
27
28namespace disp
29{
30
32{
33
34namespace impl
35{
36
37using so_5::disp::thread_pool::impl::work_thread_no_activity_tracking_t;
38using so_5::disp::thread_pool::impl::work_thread_with_activity_tracking_t;
39
40class agent_queue_with_preallocated_finish_demand_t;
41
42//
43// dispatcher_queue_t
44//
45using dispatcher_queue_t = so_5::disp::reuse::queue_of_queues_t<
46 agent_queue_with_preallocated_finish_demand_t >;
47
48//
49// agent_queue_with_preallocated_finish_demand_t
50//
51/*!
52 * \brief Specail implementation of event queue for nef-thread-pool dispatcher.
53 *
54 * An instance of agent_queue_with_preallocated_finish_demand_t creates
55 * a separate demand for evt_finish event in the constructor and uses it
56 * then in push_evt_finish().
57 *
58 * \attention
59 * It's expected that an instance of agent_queue_with_preallocated_finish_demand_t
60 * will be used for just one agent. It guarantees that push_evt_finish()
61 * will be called at most once.
62 */
63class agent_queue_with_preallocated_finish_demand_t final
65 , private so_5::atomic_refcounted_t
66 {
67 friend class so_5::intrusive_ptr_t< agent_queue_with_preallocated_finish_demand_t >;
68
69 //! Short alias for the main base type.
70 using base_type_t = so_5::disp::thread_pool::impl::basic_event_queue_t;
71
72 public:
73 //! Initializing constructor.
75 //! Dispatcher queue to work with.
76 outliving_reference_t< dispatcher_queue_t > disp_queue,
77 //! Parameters for the queue.
78 const bind_params_t & params )
79 : base_type_t{ params.query_max_demands_at_once() }
80 , m_disp_queue{ disp_queue.get() }
81 , m_finish_demand{ std::make_unique< base_type_t::demand_t >() }
82 {}
83
84 /*!
85 * \note
86 * Uses preallocated demand in m_finish_demand. Leaves m_finish_demand
87 * empty after the completion.
88 */
89 void
90 push_evt_finish( execution_demand_t demand ) noexcept override
91 {
92 // It's assumed that m_finish_demand isn't empty.
93 *(m_finish_demand) = std::move(demand);
94
95 // Just delegate the work.
97 }
98
99 /*!
100 * \brief Give away a pointer to the next agent_queue.
101 *
102 * \note
103 * This method is a part of interface required by
104 * so_5::disp::reuse::queue_of_queues_t.
105 *
106 * \since v.5.8.0
107 */
108 [[nodiscard]]
109 agent_queue_with_preallocated_finish_demand_t *
111 {
112 auto * r = m_intrusive_queue_next;
113 m_intrusive_queue_next = nullptr;
114 return r;
115 }
116
117 /*!
118 * \brief Set a pointer to the next agent_queue.
119 *
120 * \note
121 * This method is a part of interface required by
122 * so_5::disp::reuse::queue_of_queues_t.
123 *
124 * \since v.5.8.0
125 */
126 void
128 agent_queue_with_preallocated_finish_demand_t * next ) noexcept
129 {
131 }
132
133 protected:
134 void
135 schedule_on_disp_queue() noexcept override
136 {
138 }
139
140 private :
141 //! Dispatcher queue with that the agent queue has to be used.
142 dispatcher_queue_t & m_disp_queue;
143
144 //! A preallocated demand for evt_finish.
145 /*!
146 * It will be created empty in the agent queue's constructor.
147 * The content will be set for it in push_evt_finish() method.
148 */
149 std::unique_ptr< base_type_t::demand_t > m_finish_demand;
150
151 /*!
152 * \brief The next item in intrusive queue of agent_queues.
153 *
154 * This field is necessary to implement interface required by
155 * so_5::disp::reuse::queue_of_queues_t
156 *
157 * \since v.5.8.0
158 */
159 agent_queue_with_preallocated_finish_demand_t * m_intrusive_queue_next{ nullptr };
160 };
161
162//
163// adaptation_t
164//
165/*!
166 * \brief Adaptation of common implementation of thread-pool-like dispatcher
167 * to the specific of this thread-pool dispatcher.
168 *
169 * \since v.5.5.4
170 */
172 {
173 [[nodiscard]]
174 static constexpr std::string_view
176 {
177 return { "nef_tp" }; // nef_thread_pool.
178 }
179
180 [[nodiscard]]
181 static bool
182 is_individual_fifo( const bind_params_t & /*params*/ ) noexcept
183 {
184 // NOTE: all agents use individual fifo.
185 return true;
186 }
187
188 static void
190 agent_queue_with_preallocated_finish_demand_t & queue ) noexcept
191 {
193 }
194 };
195
196//
197// dispatcher_template_t
198//
199/*!
200 * \brief Template for dispatcher.
201 *
202 * This template depends on work_thread type (with or without activity
203 * tracking).
204 *
205 * \since v.5.5.18
206 */
207template< typename Work_Thread >
208using dispatcher_template_t =
210 Work_Thread,
211 dispatcher_queue_t,
213 adaptation_t >;
214
215//
216// actual_dispatcher_iface_t
217//
218/*!
219 * \brief An actual interface of nef-thread-pool dispatcher.
220 *
221 * This interface defines a set of methods necessary for binder.
222 *
223 * \since v.5.8.0
224 */
226 {
227 public :
228 //! Preallocate all necessary resources for a new agent.
229 virtual void
231 agent_t & agent,
232 const bind_params_t & params ) = 0;
233
234 //! Undo preallocation of resources for a new agent.
235 virtual void
237 agent_t & agent ) noexcept = 0;
238
239 //! Get resources allocated for an agent.
240 [[nodiscard]]
241 virtual event_queue_t *
242 query_resources_for_agent( agent_t & agent ) noexcept = 0;
243
244 //! Unbind agent from the dispatcher.
245 virtual void
246 unbind_agent( agent_t & agent ) noexcept = 0;
247 };
248
249//
250// actual_dispatcher_iface_shptr_t
251//
252using actual_dispatcher_iface_shptr_t =
253 std::shared_ptr< actual_dispatcher_iface_t >;
254
255//
256// actual_binder_t
257//
258/*!
259 * \brief Actual implementation of dispatcher binder for %nef_thread_pool dispatcher.
260 *
261 * \since v.5.8.0
262 */
263class actual_binder_t final : public disp_binder_t
264 {
265 //! Dispatcher to be used.
266 actual_dispatcher_iface_shptr_t m_disp;
267 //! Binding parameters.
269
270 public :
272 actual_dispatcher_iface_shptr_t disp,
273 bind_params_t params ) noexcept
274 : m_disp{ std::move(disp) }
275 , m_params{ params }
276 {}
277
278 void
280 agent_t & agent ) override
281 {
283 }
284
285 void
287 agent_t & agent ) noexcept override
288 {
290 }
291
292 void
294 agent_t & agent ) noexcept override
295 {
296 auto queue = m_disp->query_resources_for_agent( agent );
297 agent.so_bind_to_dispatcher( *queue );
298 }
299
300 void
302 agent_t & agent ) noexcept override
303 {
304 m_disp->unbind_agent( agent );
305 }
306 };
307
308//
309// actual_dispatcher_implementation_t
310//
311/*!
312 * \brief Actual implementation of binder for %nef_thread_pool dispatcher.
313 *
314 * \since v.5.8.0
315 */
316template< typename Work_Thread >
317class actual_dispatcher_implementation_t final
319 {
320 //! Real dispatcher.
321 dispatcher_template_t< Work_Thread > m_impl;
322
323 public :
325 //! SObjectizer Environment to work in.
327 //! Base part of data sources names.
328 const std::string_view name_base,
329 //! Dispatcher's parameters.
330 disp_params_t params )
331 : m_impl{
332 env.get(),
333 params,
334 name_base,
335 params.thread_count(),
336 params.queue_params()
337 }
338 {
339 m_impl.start( env.get() );
340 }
341
343 {
344 m_impl.shutdown_then_wait();
345 }
346
347 [[nodiscard]]
348 disp_binder_shptr_t
349 binder( bind_params_t params ) override
350 {
351 return std::make_shared< actual_binder_t >(
352 this->shared_from_this(),
353 params );
354 }
355
356 void
358 agent_t & agent,
359 const bind_params_t & params ) override
360 {
361 m_impl.preallocate_resources_for_agent( agent, params );
362 }
363
364 void
366 agent_t & agent ) noexcept override
367 {
368 m_impl.undo_preallocation_for_agent( agent );
369 }
370
372 query_resources_for_agent( agent_t & agent ) noexcept override
373 {
374 return m_impl.query_resources_for_agent( agent );
375 }
376
377 void
378 unbind_agent( agent_t & agent ) noexcept override
379 {
380 m_impl.unbind_agent( agent );
381 }
382 };
383
384//
385// dispatcher_handle_maker_t
386//
388 {
389 public :
391 make( actual_dispatcher_iface_shptr_t disp ) noexcept
392 {
393 return { std::move( disp ) };
394 }
395 };
396
397} /* namespace impl */
398
399namespace
400{
401
402using namespace so_5::disp::nef_thread_pool::impl;
403
404/*!
405 * \brief Sets the thread count to default value if used do not
406 * specify actual thread count.
407 *
408 * \since v.5.8.0
409 */
410inline void
416
417} /* namespace anonymous */
418
419//
420// make_dispatcher
421//
424 environment_t & env,
425 const std::string_view data_sources_name_base,
426 disp_params_t params )
427 {
428 using namespace so_5::disp::reuse;
429
430 adjust_thread_count( params );
431
432 using dispatcher_no_activity_tracking_t =
433 impl::actual_dispatcher_implementation_t<
434 impl::work_thread_no_activity_tracking_t<
435 impl::dispatcher_queue_t
436 >
437 >;
438
439 using dispatcher_with_activity_tracking_t =
440 impl::actual_dispatcher_implementation_t<
441 impl::work_thread_with_activity_tracking_t<
442 impl::dispatcher_queue_t
443 >
444 >;
445
448 dispatcher_no_activity_tracking_t,
449 dispatcher_with_activity_tracking_t >(
451 data_sources_name_base,
452 std::move(params) );
453
454 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
455 }
456
457} /* namespace nef_thread_pool */
458
459} /* namespace disp */
460
461} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
The base class for the object with a reference counting.
Parameters for binding agents to nef_thread_pool dispatcher.
std::size_t query_max_demands_at_once() const
Get maximum count of demands to do processed at once.
Alias for namespace with traits of event queue.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
std::size_t thread_count() const
Getter for thread count.
A handle for nef_thread_pool dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
const bind_params_t m_params
Binding parameters.
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
An actual interface of nef-thread-pool dispatcher.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params)=0
Preallocate all necessary resources for a new agent.
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params) override
Preallocate all necessary resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
void schedule_on_disp_queue() noexcept override
Perform scheduling of processing of this event queue.
dispatcher_queue_t & m_disp_queue
Dispatcher queue with that the agent queue has to be used.
agent_queue_with_preallocated_finish_demand_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
std::unique_ptr< base_type_t::demand_t > m_finish_demand
A preallocated demand for evt_finish.
agent_queue_with_preallocated_finish_demand_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
agent_queue_with_preallocated_finish_demand_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &params)
Initializing constructor.
void intrusive_queue_set_next(agent_queue_with_preallocated_finish_demand_t *next) noexcept
Set a pointer to the next agent_queue.
The very basic interface of thread_pool dispatcher.
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Multi-producer/Multi-consumer queue of pointers to event queues.
void schedule(T *queue) noexcept
Schedule execution of demands from the queue.
void push_preallocated(std::unique_ptr< demand_t > tail_demand) noexcept
Helper method that implements pushing of a new preallocated demand to the queue.
void wait_for_emptyness() noexcept
Wait while queue becomes empty.
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Template class for smart reference wrapper on the atomic_refcounted_t.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
#define SO_5_FUNC
Definition declspec.hpp:48
void adjust_thread_count(disp_params_t &params)
Sets the thread count to default value if used do not specify actual thread count.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance nef_thread_pool dispatcher.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
Internal implementation details of thread pool dispatcher.
Thread pool dispatcher.
Event dispatchers.
Private part of message limit implementation.
Definition agent.cpp:33
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static bool is_individual_fifo(const bind_params_t &) noexcept
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_with_preallocated_finish_demand_t &queue) noexcept
A description of event execution demand.