SObjectizer-5 Extra
pub.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of Asio's Thread Pool dispatcher.
4  *
5  * \since
6  * v.1.0.2
7  */
8 
9 #pragma once
10 
11 #include <so_5_extra/error_ranges.hpp>
12 
13 #include <so_5/disp_binder.hpp>
14 #include <so_5/send_functions.hpp>
15 
16 #include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
17 #include <so_5/disp/reuse/work_thread_activity_tracking.hpp>
18 #include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
19 #include <so_5/disp/reuse/work_thread_factory_params.hpp>
20 
21 #include <so_5/stats/repository.hpp>
22 #include <so_5/stats/messages.hpp>
23 #include <so_5/stats/std_names.hpp>
24 #include <so_5/stats/impl/activity_tracking.hpp>
25 
26 #include <so_5/details/invoke_noexcept_code.hpp>
27 #include <so_5/details/rollback_on_exception.hpp>
28 #include <so_5/details/abort_on_fatal_error.hpp>
29 
30 #include <so_5/outliving.hpp>
31 
32 #include <asio/io_context.hpp>
33 #include <asio/io_context_strand.hpp>
34 #include <asio/post.hpp>
35 
36 namespace so_5 {
37 
38 namespace extra {
39 
40 namespace disp {
41 
42 namespace asio_thread_pool {
43 
44 namespace errors {
45 
46 //! Asio IoService is not set for asio_thread_pool dispatcher.
49 
50 } /* namespace errors */
51 
52 //
53 // disp_params_t
54 //
55 /*!
56  * \brief Parameters for %asio_thread_pool dispatcher.
57  *
58  * \since
59  * v.1.0.2
60  */
64  {
69 
70  public :
71  //! Default constructor.
72  disp_params_t() = default;
73 
74  friend inline void
76  disp_params_t & a, disp_params_t & b ) noexcept
77  {
78  using std::swap;
79 
80  swap(
81  static_cast< activity_tracking_mixin_t & >(a),
82  static_cast< activity_tracking_mixin_t & >(b) );
83 
84  swap(
85  static_cast< thread_factory_mixin_t & >(a),
86  static_cast< thread_factory_mixin_t & >(b) );
87 
88  swap( a.m_thread_count, b.m_thread_count );
89  swap( a.m_io_context, b.m_io_context );
90  }
91 
92  //! Setter for thread count.
94  thread_count( std::size_t count )
95  {
96  m_thread_count = count;
97  return *this;
98  }
99 
100  //! Getter for thread count.
101  std::size_t
102  thread_count() const
103  {
104  return m_thread_count;
105  }
106 
107  //! Use external Asio io_context object with dispatcher.
108  /*!
109  * Usage example:
110  * \code
111  * int main() {
112  * asio::io_context svc;
113  * so_5::launch( [&](so_5::environment_t & env) {
114  * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
115  * auto disp = asio_tp::create_private_disp(
116  * env, "asio_tp",
117  * asio_tp::disp_params_t{}.use_external_io_context(
118  * so_5::outliving_mutable(svc) ) );
119  * ...
120  * } );
121  * }
122  * \endcode
123  */
124  disp_params_t &
126  ::asio::io_context & service )
127  {
128  m_io_context = std::shared_ptr< ::asio::io_context >(
129  std::addressof( service ),
130  // Empty deleter.
131  [](::asio::io_context *) {} );
132  return *this;
133  }
134 
135  //! Use external Asio io_context object with dispatcher.
136  /*!
137  * \note
138  * Ownership of this io_context object must be shared with
139  * others.
140  */
141  disp_params_t &
143  std::shared_ptr< ::asio::io_context > service )
144  {
145  m_io_context = std::move(service);
146  return *this;
147  }
148 
149  //! Use own Asio io_context object.
150  /*!
151  * Note this object will be dynamically created at the start
152  * of the dispatcher. And will be destroyed with the dispatcher object.
153  *
154  * A created io_context can be accessed later via io_context() method.
155  */
156  disp_params_t &
158  {
159  m_io_context = std::make_shared< ::asio::io_context >();
160  return *this;
161  }
162 
163  //! Get the io_context.
165  io_context() const noexcept
166  {
167  return m_io_context;
168  }
169 
170  private :
171  //! Count of working threads.
172  /*!
173  * Value 0 means that actual thread count will be detected automatically.
174  */
176 
177  //! Asio's io_context which must be used with this dispatcher.
179  };
180 
181 namespace impl {
182 
184 
185 //
186 // basic_dispatcher_iface_t
187 //
188 /*!
189  * \brief The very basic interface of %asio_thread_pool dispatcher.
190  *
191  * This class contains a minimum that is necessary for implementation
192  * of dispatcher_handle class.
193  *
194  * \since
195  * v.1.3.0
196  */
199  {
200  public :
201  virtual ~basic_dispatcher_iface_t() noexcept = default;
202 
203  //! Create a binder for that dispatcher.
204  /*!
205  * The binder will use an external strand object.
206  */
207  [[nodiscard]]
208  virtual disp_binder_shptr_t
210 
211  //! Create a binder for that dispatcher.
212  /*!
213  * The binder will use an internal (automatically created)
214  * strand object.
215  */
216  [[nodiscard]]
217  virtual disp_binder_shptr_t
219 
220  //! Get reference to io_context from that dispatcher.
221  virtual ::asio::io_context &
222  io_context() const noexcept = 0;
223  };
224 
227 
229 
230 } /* namespace impl */
231 
232 //
233 // dispatcher_handle_t
234 //
235 
236 /*!
237  * \brief A handle for %asio_thread_pool dispatcher.
238  *
239  * \since
240  * v.1.3.0
241  */
242 class [[nodiscard]] dispatcher_handle_t
243  {
245 
246  //! A reference to actual implementation of a dispatcher.
248 
250  impl::basic_dispatcher_iface_shptr_t dispatcher ) noexcept
252  {}
253 
254  //! Is this handle empty?
255  [[nodiscard]]
256  bool
257  empty() const noexcept { return !m_dispatcher; }
258 
259  public :
260  dispatcher_handle_t() noexcept = default;
261 
262  //! Get a binder for that dispatcher.
263  /*!
264  * This method requires a reference to manually created strand
265  * object for protection of agents bound via binder returned.
266  * A user should create this strand object and ensure the right
267  * lifetime of it.
268  *
269  * Usage example:
270  * \code
271  * using namespace so_5::extra::disp::asio_thread_pool;
272  *
273  * asio::io_context io_ctx;
274  * asio::io_context::strand agents_strand{ io_ctx };
275  *
276  * so_5::environment_t & env = ...;
277  * auto disp = make_dispatcher( env, "my_disp", io_ctx );
278  *
279  * env.introduce_coop( [&]( so_5::coop_t & coop ) {
280  * coop.make_agent_with_binder< some_agent_type >(
281  * disp.binder( agents_strand ),
282  * ... );
283  *
284  * coop.make_agent_with_binder< another_agent_type >(
285  * disp.binder( agents_strand ),
286  * ... );
287  *
288  * ...
289  * } );
290  * \endcode
291  *
292  * \attention
293  * An attempt to call this method on empty handle is UB.
294  */
295  [[nodiscard]]
298  ::asio::io_context::strand & strand ) const
299  {
300  return m_dispatcher->binder_with_external_strand( strand );
301  }
302 
303  //! Get a binder for that dispatcher.
304  /*!
305  * This method creates an internal strand object by itself.
306  *
307  * Usage example:
308  * \code
309  * using namespace so_5::extra::disp::asio_thread_pool;
310  *
311  * asio::io_context io_ctx;
312  *
313  * so_5::environment_t & env = ...;
314  * auto disp = make_dispatcher( env, "my_disp", io_ctx );
315  *
316  * env.introduce_coop( [&]( so_5::coop_t & coop ) {
317  * // This agent will use its own strand object.
318  * coop.make_agent_with_binder< some_agent_type >(
319  * disp.binder(),
320  * ... );
321  *
322  * // This agent will use its own strand object.
323  * coop.make_agent_with_binder< another_agent_type >(
324  * disp.binder(),
325  * ... );
326  * ...
327  * } );
328  * \endcode
329  *
330  * \attention
331  * An attempt to call this method on empty handle is UB.
332  *
333  * \since
334  * v.1.3.0
335  */
336  [[nodiscard]]
338  binder() const
339  {
340  return m_dispatcher->binder_with_own_strand();
341  }
342 
343  //! Get reference to io_context from that dispatcher.
344  /*!
345  * \attention
346  * An attempt to call this method on empty handle is UB.
347  */
348  [[nodiscard]]
349  ::asio::io_context &
350  io_context() noexcept
351  {
352  return m_dispatcher->io_context();
353  }
354 
355  //! Is this handle empty? Converts to true when the handle is EMPTY (it returns empty()), which is the opposite of the usual smart-pointer convention. NOTE(review): confirm this inversion is intended before relying on `if(handle)`.
356  [[nodiscard]]
357  operator bool() const noexcept { return empty(); }
358 
359  //! Does this handle contain a reference to dispatcher? Yields true for a NON-empty handle (it returns !empty()), so `!handle` is true when a dispatcher is attached.
360  [[nodiscard]]
361  bool
362  operator!() const noexcept { return !empty(); }
363 
364  //! Drop the content of handle.
365  void
366  reset() noexcept { m_dispatcher.reset(); }
367  };
368 
369 namespace impl {
370 
371 //
372 // demands_counter_t
373 //
374 /*!
375  * \brief Type of atomic counter for counting waiting demands.
376  *
377  * \since
378  * v.1.0.2
379  */
381 
382 //
383 // actual_dispatcher_iface_t
384 //
385 /*!
386  * \brief An actual interface of thread pool dispatcher.
387  *
388  * \since
389  * v.1.3.0
390  */
392  {
393  public :
394  //! Notification about binding of yet another agent.
395  virtual void
396  agent_bound() noexcept = 0;
397 
398  //! Notification about unbinding of an agent.
399  virtual void
400  agent_unbound() noexcept = 0;
401 
402  //! Get a reference for counter of pending demands.
403  virtual demands_counter_t &
404  demands_counter() noexcept = 0;
405  };
406 
407 //
408 // actual_dispatcher_shptr_t
409 //
412 
413 //
414 // thread_local_ptr_holder_t
415 //
416 /*!
417  * \brief A helper for declaration of static and thread_local pointer
418  * in a header file.
419  *
420  * If non-template class will define a static member in a header file
421  * then there is a possibility to get a link-time error about multiple
422  * definition of that member. But if a static member is defined for
423  * template class then there won't be such problem.
424  *
425  * A typical usage intended to be:
426  * \code
427  * class some_useful_class_t : public thread_local_ptr_holder_t<some_useful_class_t> {
428  * ...
429  * };
430  * \endcode
431  *
432  * \since
433  * v.1.0.2
434  */
435 template< class T >
437  {
438  private :
439  //! Value of the pointer which need to be stored.
440  static thread_local T * m_ptr;
441 
442  protected :
443  //! Access to the current value of the pointer.
444  static T *
445  ptr() noexcept { return m_ptr; }
446 
447  //! Setter for the pointer.
448  static void
449  set_ptr( T * p ) noexcept { m_ptr = p; }
450  };
451 
452 template< class T >
453 thread_local T * thread_local_ptr_holder_t<T>::m_ptr = nullptr;
454 
455 //
456 // work_thread_t
457 //
458 /*!
459  * \brief Base type for implementations of work thread wrappers.
460  *
461  * Work thread wrapper creates an instance of some type on the stack
462  * of the new thread. Then the pointer of this instance is stored in
463  * thread_local variable (as a pointer to work_thread_t). This pointer
464  * then can be retrieved later by demand handlers to get access to
465  * some dispatcher-specific data.
466  *
467  * It is assumed that there will be two derived classes:
468  * 1. One for the case when thread activity should not be tracked.
469  * 2. Another for the case when thread activity must be tracked.
470  *
471  * These derived classes will reuse some functionality from
472  * work_thread_t. And should implement on_demand() method for
473  * actual demands processing.
474  *
475  * \since
476  * v.1.0.2
477  */
479  {
480  private :
481  //! ID of the work thread.
482  /*!
483  * Gets its value in the constructor and doesn't change later.
484  */
486 
487  protected :
488  // Constructor and destructor are accessible for derived classes only.
491  {}
492 
493  // Just to make compilers happy.
494  virtual ~work_thread_t() = default;
495 
496  //! Actual processing of the demand.
497  /*!
498  * Must be implemented in derived classes.
499  */
500  virtual void
501  on_demand( execution_demand_t demand ) noexcept = 0;
502 
503  //! ID of the work thread.
505  thread_id() const noexcept
506  {
507  return m_thread_id;
508  }
509 
510  public :
511  //! Launch processing of demand on the context of current thread.
512  /*!
513  * Creates an instance of Derived class, stores pointer to it into
514  * a thread_local static variable, then calls io_svc.run() method.
515  *
516  * \attention
517  * Terminates the whole application if an exception will be thrown.
518  *
519  * \tparam Derived Type of an object to be created on the stack.
520  * \tparam Args Types of arguments for Derived's constructor.
521  */
522  template< typename Derived, typename... Args >
523  static void
525  //! SObjectizer Environment for which work thread was created.
526  environment_t & env,
527  //! Asio IoService to be run on the context of that thread.
528  ::asio::io_context & io_svc,
529  //! Arguments to Derived's constructor.
530  Args &&... args )
531  {
532  // We don't expect any errors here.
533  // But if something happens then there is no way to
534  // recover and the whole application should be aborted.
535  try
536  {
538  // actual_handler must be accessible via thread_local variable.
540 
541  // Prevent return from io_context::run() if there is no
542  // more Asio's events.
543  auto work = ::asio::make_work_guard( io_svc );
544  io_svc.run();
545  }
546  catch( const std::exception & x )
547  {
550  log_stream << "An exception caught in work thread "
551  "of so_5::extra::disp::asio_thread_pool dispatcher."
552  " Exception: "
553  << x.what() << std::endl;
554  }
555  } );
556  }
557  catch( ... )
558  {
561  log_stream << "An unknown exception caught in work thread "
562  "of so_5::extra::disp::asio_thread_pool dispatcher."
563  << std::endl;
564  }
565  } );
566  }
567  }
568 
569  //! An interface method for passing a demand to processing. Hands the demand over to on_demand() of the object returned by ptr().
570  static void
571  handle_demand( execution_demand_t demand )
572  {
573  ptr()->on_demand( std::move(demand) ); // ptr() is dereferenced without a null check. NOTE(review): presumably only called on a thread where run() has already stored the pointer; confirm.
574  }
575  };
576 
577 //
578 // work_thread_without_activity_tracking_t
579 //
580 /*!
581  * \brief An implementation of work thread stuff for the case when
582  * thread activity tracking is not needed.
583  *
584  * \since
585  * v.1.0.2
586  */
587 class work_thread_without_activity_tracking_t final : public work_thread_t
588  {
589  public :
591  ~work_thread_without_activity_tracking_t() override = default;
592 
593  protected :
594  virtual void
595  on_demand( execution_demand_t demand ) noexcept override
596  {
597  demand.call_handler( thread_id() );
598  }
599  };
600 
601 //
602 // work_thread_activity_collector_t
603 //
604 /*!
605  * \brief Type of collector of work thread activity data.
606  *
607  * Objects of this class store also an ID of work thread. This ID is
608  * necessary for so_5::stats::messages::work_thread_activity message.
609  * Because of that a work thread must call setup_thread_id() method
610  * before use of activity collector.
611  *
612  * \since
613  * v.1.0.2
614  */
616  {
617  private :
618  //! ID of thread for which activity stats is collected.
620 
621  //! Collected activity stats.
625 
626  public :
627  /*!
628  * \brief Setup ID of the current work thread.
629  *
630  * \attention
631  * Must be called as soon as possible after the start of the work thread.
632  */
633  void
634  setup_thread_id( current_thread_id_t tid )
635  {
636  m_thread_id = std::move(tid);
637  }
638 
639  /*!
640  * \brief Get the ID of the thread.
641  *
642  * \attention
643  * Returns actual value only after call to setup_thread_id.
644  */
646  thread_id() const noexcept { return m_thread_id; }
647 
648  /*!
649  * \brief Mark start point of new activity.
650  */
651  void
652  activity_started() noexcept
653  {
654  m_work_activity.start();
655  }
656 
657  /*!
658  * \brief Mark completion of the current activity.
659  */
660  void
661  activity_finished() noexcept
662  {
663  m_work_activity.stop();
664  }
665 
666  /*!
667  * \brief Get the current stats.
668  */
671  {
672  ::so_5::stats::work_thread_activity_stats_t result;
673  result.m_working_stats = m_work_activity.take_stats();
674 
675  return result;
676  }
677  };
678 
679 //
680 // work_thread_with_activity_tracking_t
681 //
682 /*!
683  * \brief An implementation of work thread stuff for the case when
684  * thread activity tracking must be used.
685  *
686  * \since
687  * v.1.0.2
688  */
689 class work_thread_with_activity_tracking_t final : public work_thread_t
690  {
691  private :
692  //! Activity statistics.
694 
695  public :
697  outliving_reference_t< work_thread_activity_collector_t > activity_stats )
699  {
700  // Collector must receive ID of this thread.
701  m_activity_stats.get().setup_thread_id( thread_id() );
702  }
703 
704  ~work_thread_with_activity_tracking_t() override = default;
705 
706  protected :
707  virtual void
708  on_demand( execution_demand_t demand ) noexcept override
709  {
710  m_activity_stats.get().activity_started();
711 
712  demand.call_handler( thread_id() );
713 
714  m_activity_stats.get().activity_finished();
715  }
716  };
717 
718 //
719 // class basic_binder_impl_t
720 //
721 /*!
722  * \brief Basic part of implementation of a binder for %asio_thread_pool
723  * dispatcher.
724  *
725  * \since
726  * v.1.3.0
727  */
729  : public disp_binder_t
730  , public event_queue_t
731  {
732  public :
733  //! Initializing constructor.
735  //! The actual dispatcher to be used with that binder.
736  actual_dispatcher_shptr_t dispatcher )
738  {}
739 
740  void
742  agent_t & /*agent*/ ) override
743  {
744  // There is no need to do something.
745  }
746 
747  void
749  agent_t & /*agent*/ ) noexcept override
750  {
751  // There is no need to do something.
752  }
753 
754  void
756  agent_t & agent ) noexcept override
757  {
758  // Dispatcher should know about yet another agent bound.
759  m_dispatcher->agent_bound();
760  // Agent should receive its event_queue.
761  agent.so_bind_to_dispatcher( *this );
762  }
763 
764  void
766  agent_t & /*agent*/ ) noexcept override
767  {
768  // Dispatcher should know about yet another agent unbound: this balances the agent_bound() notification issued when the agent was bound, so the bound-agents counter goes back down.
769  m_dispatcher->agent_unbound();
770  }
771 
772  protected :
773  //! The actual dispatcher.
775  };
776 
777 //
778 // binder_template_t
779 //
780 /*!
781  * \brief An implementation of a binder for %asio_thread_pool dispatcher.
782  *
783  * This binder is also an event_queue for the agents bound via that binder.
784  *
785  * There is no such thing as event_queue for %asio_thread_pool dispatcher.
786  * All execution demands will be stored inside Asio IoService and dispatched
787  * for execution via asio::post mechanism. But SObjectizer requires
788  * an implementation of event_queue which must be used for agents bound
789  * to %asio_thread_pool dispatcher. This class implements this event_queue
790  * concept.
791  *
792  * This template implements CRTP and should be parametrized by
793  * derived type. The derived type should provide method:
794  * \code
795  * ::asio::io_context::strand & strand() noexcept;
796  * \endcode
797  *
798  * \since
799  * v.1.3.0
800  */
801 template< typename Derived >
803  : public basic_binder_impl_t
804  {
805  auto &
806  self_reference() noexcept
807  {
808  return static_cast< Derived & >( *this );
809  }
810 
811  public :
812  using basic_binder_impl_t::basic_binder_impl_t;
813 
814  void
815  push( execution_demand_t demand ) override
816  {
818 
819  // Another demand will wait for processing.
820  ++counter;
821 
823  [d = std::move(demand), &counter]() mutable {
824  // Another demand will be processed.
825  --counter;
826 
827  // Delegate processing of the demand to actual
828  // work thread.
830  } );
831  }
832 
833  void
834  push_evt_start( execution_demand_t demand ) override
835  {
836  // Just delegate to the ordinary push().
837  this->push( std::move(demand) );
838  }
839 
840  void
841  push_evt_finish( execution_demand_t demand ) noexcept override
842  {
843  // Just delegate to the ordinary push() despite
844  // the fact that push() isn't noexcept: an exception escaping push() here would cross this noexcept boundary and terminate the program.
845  this->push( std::move(demand) );
846  }
847  };
848 
849 //
850 // binder_with_external_strand_t
851 //
852 /*!
853  * \brief An implementation of binder that uses an external strand object.
854  *
855  * \since
856  * v.1.3.0
857  */
858 class binder_with_external_strand_t final
859  : public binder_template_t< binder_with_external_strand_t >
860  {
861  using base_type = binder_template_t< binder_with_external_strand_t >;
862 
863  public :
865  actual_dispatcher_shptr_t dispatcher,
866  outliving_reference_t< ::asio::io_context::strand > strand )
868  , m_strand{ strand }
869  {}
870 
871  ::asio::io_context::strand &
872  strand() noexcept { return m_strand.get(); }
873 
874  private :
875  //! Strand to be used with this event_queue.
877  };
878 
879 //
880 // binder_with_own_strand_t
881 //
882 /*!
883  * \brief An implementation of binder that uses an own strand object.
884  *
885  * This own strand object will be a part of the binder object.
886  *
887  * \since
888  * v.1.3.0
889  */
890 class binder_with_own_strand_t final
891  : public binder_template_t< binder_with_own_strand_t >
892  {
893  using base_type = binder_template_t< binder_with_own_strand_t >;
894 
895  public :
897  actual_dispatcher_shptr_t dispatcher )
900  {}
901 
902  ::asio::io_context::strand &
903  strand() noexcept { return m_strand; }
904 
905  private :
906  //! Strand to be used with this event_queue.
908  };
909 
910 //
911 // basic_dispatcher_skeleton_t
912 //
913 /*!
914  * \brief Basic stuff for all implementations of dispatcher.
915  *
916  * Derived classes should implement the following virtual methods:
917  * - data_source();
918  * - launch_work_threads();
919  * - wait_work_threads().
920  *
921  * \since
922  * v.1.0.2
923  */
925  {
926  protected :
928  friend class disp_data_source_t;
929 
930  public:
932  ::so_5::environment_t & env,
933  disp_params_t params )
938  params,
939  env )
940  )
941  {
942  }
943 
944  [[nodiscard]]
947  ::asio::io_context::strand & strand ) override
948  {
949  return { std::make_shared< binder_with_external_strand_t >(
950  shared_from_this(),
951  outliving_mutable(strand) )
952  };
953  }
954 
955  [[nodiscard]]
958  {
959  return { std::make_shared< binder_with_own_strand_t >(
960  shared_from_this() )
961  };
962  }
963 
964  ::asio::io_context &
965  io_context() const noexcept override { return *m_io_context; }
966 
967  void
968  agent_bound() noexcept override
969  {
970  ++m_agents_bound;
971  }
972 
973  void
974  agent_unbound() noexcept override
975  {
976  --m_agents_bound;
977  }
978 
980  demands_counter() noexcept override
981  {
982  return m_demands_counter;
983  }
984 
985  protected :
986  void
988  environment_t & env,
989  std::string_view data_sources_name_base )
990  {
991  data_source().set_data_sources_name_base( data_sources_name_base );
992  data_source().start( env.stats_repository() );
993 
994  ::so_5::details::do_with_rollback_on_exception(
995  [&] { launch_work_threads(env); },
996  [this] { data_source().stop(); } );
997  }
998 
999  void
1001  {
1002  ::so_5::details::invoke_noexcept_code( [this] {
1003  // Stopping Asio IO service.
1004  m_io_context->stop();
1005  } );
1006  }
1007 
1008  void
1010  {
1011  ::so_5::details::invoke_noexcept_code( [this] {
1012  // Waiting for complete stop of all work threads.
1013  wait_work_threads();
1014  // Stopping data source.
1015  data_source().stop();
1016  } );
1017  }
1018 
1019  protected :
1020  /*!
1021  * \brief Get the count of work threads to be created.
1022  */
1023  [[nodiscard]]
1024  std::size_t
1025  thread_count() const noexcept { return m_thread_count; }
1026 
1027  /*!
1028  * \brief Get access to actual data source instance for that
1029  * dispatcher.
1030  */
1031  [[nodiscard]]
1032  virtual disp_data_source_t &
1033  data_source() noexcept = 0;
1034 
1035  /*!
1036  * \brief Get access to thread factory to be used for that dispatcher.
1037  *
1038  * \since v.1.5.0
1039  */
1040  [[nodiscard]]
1042  thread_factory() const noexcept { return m_thread_factory; }
1043 
1044 #if defined(__clang__)
1045 #pragma clang diagnostic push
1046 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1047 #endif
1048 
1049  /*!
1050  * \brief Data source for run-time monitoring of whole dispatcher.
1051  *
1052  * \since
1053  * v.1.0.2
1054  */
1056  : public ::so_5::stats::source_t
1057  {
1058  //! Dispatcher to work with.
1060 
1061  //! Basic prefix for data sources.
1063 
1064  //! Data source repository.
1065  /*!
1066  * Will receive actual value in start() method.
1067  *
1068  * \since
1069  * v.1.3.0
1070  */
1072 
1073  protected :
1074  //! Access to data source prefix for derived classes.
1075  const ::so_5::stats::prefix_t &
1076  base_prefix() const noexcept { return m_base_prefix; }
1077 
1078  public :
1080  : m_dispatcher( disp )
1081  {}
1082 
1083  virtual void
1084  distribute( const mbox_t & mbox ) override
1085  {
1086  const auto agents_count = m_dispatcher.m_agents_bound.load(
1087  std::memory_order_acquire );
1088 
1089  const auto demands_count = m_dispatcher.m_demands_counter.load(
1090  std::memory_order_acquire );
1091 
1092  send< ::so_5::stats::messages::quantity< std::size_t > >(
1093  mbox,
1094  m_base_prefix,
1095  ::so_5::stats::suffixes::agent_count(),
1096  agents_count );
1097 
1098  // Note: because there is no way to detect on which thread a
1099  // demand will be handled, the total number of waiting
1100  // demands is distributed for the whole dispatcher.
1101  send< ::so_5::stats::messages::quantity< std::size_t > >(
1102  mbox,
1103  m_base_prefix,
1104  ::so_5::stats::suffixes::work_thread_queue_size(),
1105  demands_count );
1106  }
1107 
1108  void
1110  std::string_view name_base )
1111  {
1112  using namespace ::so_5::disp::reuse;
1113 
1114  m_base_prefix = make_disp_prefix(
1115  "ext-asio-tp",
1116  name_base,
1117  &m_dispatcher );
1118  }
1119 
1120  void
1121  start( ::so_5::stats::repository_t & repo )
1122  {
1123  repo.add( *this );
1124  m_stats_repo = &repo;
1125  }
1126 
1127  void
1128  stop() noexcept
1129  {
1130  m_stats_repo->remove( *this );
1131  }
1132  };
1133 
1134 #if defined(__clang__)
1135 #pragma clang diagnostic pop
1136 #endif
1137 
1138  private:
1139  //! Count of work threads.
1141 
1142  //! IO Service to work with.
1144 
1145  /*!
1146  * \brief Thread factory to be used with this dispatcher.
1147  *
1148  * \since v.1.5.0
1149  */
1151 
1152  //! Count of agents bound to that dispatcher.
1154 
1155  //! Count of waiting demands.
1157 
1158  //! Start all working threads.
1159  virtual void
1161  //! SObjectizer Environment for which threads will be created.
1162  environment_t & env ) = 0;
1163 
1164  //! Wait for finish of all threads.
1165  /*!
1166  * It is a blocking call. The current thread will be stopped until
1167  * all work threads finish their work.
1168  */
1169  virtual void
1170  wait_work_threads() noexcept = 0;
1171  };
1172 
1173 //
1174 // dispatcher_skeleton_without_thread_activity_tracking_t
1175 //
1176 /*!
1177  * \brief Extension of basic dispatcher skeleton for the case when
1178  * work thread activity is not collected.
1179  *
1180  * This class contains disp_data_source_t instance and implements
1181  * virtual method data_source() for accessing this instance.
1182  *
1183  * It also provides static method run_work_thread() which must be called
1184  * at the beginning of work thread.
1185  *
1186  * \since
1187  * v.1.0.2
1188  */
1191  {
1192  public :
1194  environment_t & env,
1195  disp_params_t params )
1197  {}
1198 
1199  protected :
1200  virtual disp_data_source_t &
1201  data_source() noexcept override { return m_data_source; } // Gives access to the plain data source instance held by this skeleton; marked override so the compiler checks it against the base declaration.
1202 
1203  //! Implementation of main function for a work thread.
1204  static void
1206  environment_t & env,
1207  ::asio::io_context & io_svc,
1209  std::size_t /*index*/ )
1210  {
1211  work_thread_t::run< work_thread_without_activity_tracking_t >(
1212  env, io_svc );
1213  }
1214 
1215  private :
1216  //! Actual data source instance.
1218  };
1219 
1220 //
1221 // dispatcher_skeleton_with_thread_activity_tracking_t
1222 //
1223 /*!
1224  * \brief Extension of basic dispatcher skeleton for the case when
1225  * work thread activity must be collected.
1226  *
1227  * This class defines its own actual_disp_data_source_t type and
1228  * contains an instance of that type. There is also implementation
1229  * of data_source() virtual method for accessing this instance.
1230  *
1231  * It provides static method run_work_thread() which must be called
1232  * at the beginning of work thread.
1233  *
1234  * \since
1235  * v.1.0.2
1236  */
1239  {
1240 #if defined(__clang__)
1241 #pragma clang diagnostic push
1242 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1243 #endif
1244  /*!
1245  * \brief Actual data source type for dispatcher with
1246  * work thread activity tracking.
1247  *
1248  * \since
1249  * v.1.0.2
1250  */
1252  : public disp_data_source_t
1253  {
1254  private :
1255  //! Collectors for run-time stats for every thread.
1256  std::vector<
1259 
1260  public :
1263  std::size_t thread_count )
1264  : disp_data_source_t( disp )
1266  {
1267  for( auto & c : m_collectors )
1268  c = std::make_unique< work_thread_activity_collector_t >();
1269  }
1270 
1271  virtual void
1272  distribute( const mbox_t & mbox ) override
1273  {
1274  disp_data_source_t::distribute( mbox );
1275 
1276  for( std::size_t i = 0; i != m_collectors.size(); ++i )
1277  distribute_stats_for_work_thread_at( mbox, i );
1278  }
1279 
1280  /*!
1281  * \note \a index is not checked for validity!
1282  */
1284  collector_at( std::size_t index ) noexcept
1285  {
1286  return *(m_collectors[index]);
1287  }
1288 
1289  private :
1290  void
1292  const mbox_t & mbox,
1293  std::size_t index )
1294  {
1295  std::ostringstream ss;
1296  ss << base_prefix().c_str() << "/wt-" << index;
1297 
1298  const ::so_5::stats::prefix_t prefix{ ss.str() };
1299  auto & collector = collector_at( index );
1300 
1301  so_5::send< ::so_5::stats::messages::work_thread_activity >(
1302  mbox,
1303  prefix,
1304  ::so_5::stats::suffixes::work_thread_activity(),
1305  collector.thread_id(),
1306  collector.take_activity_stats() );
1307  }
1308  };
1309 
1310 #if defined(__clang__)
1311 #pragma clang diagnostic pop
1312 #endif
1313 
1314  public :
1316  environment_t & env,
1317  disp_params_t params )
1320  {}
1321 
1322  protected :
1323  virtual disp_data_source_t &
1324  data_source() noexcept override { return m_actual_data_source; }
1325 
1326  //! Implementation of main function for a work thread.
1327  static void
1329  //! SObjectizer Environment for which the work thread is created.
1330  environment_t & env,
1331  //! Asio IoService to be used.
1332  ::asio::io_context & io_svc,
1333  //! Dispatcher who owns this thread.
1335  //! Ordinal number of this thread.
1336  std::size_t index )
1337  {
1338  work_thread_t::run< work_thread_with_activity_tracking_t >(
1339  env,
1340  io_svc,
1341  outliving_mutable(
1342  self.m_actual_data_source.collector_at(index) ) );
1343  }
1344 
1345  private :
1346  //! Data source instance.
1348  };
1349 
1350 //
1351 // dispatcher_template_t
1352 //
1353 /*!
1354  * \brief Template-based implementation of dispatcher.
1355  *
1356  * Implements virtual methods launch_work_threads() and wait_work_threads()
1357  * from basic_dispatcher_skeleton_t.
1358  *
1359  * \tparam Traits Traits-type to be used.
1360  * \tparam Basic_Skeleton A specific skeleton to be used as base type.
1361  * It expected to be dispatcher_skeleton_with_thread_activity_tracking_t or
1362  * dispatcher_skeleton_without_thread_activity_tracking_t.
1363  *
1364  * \since
1365  * v.1.0.2
1366  */
1367 template<
1368  typename Traits,
1369  typename Basic_Skeleton >
1370 class dispatcher_template_t final : public Basic_Skeleton
1371  {
1372  public:
1374  //! SObjectizer Environment to work in.
1375  outliving_reference_t< environment_t > env,
1376  //! Value for creating names of data sources for
1377  //! run-time monitoring.
1378  std::string_view data_sources_name_base,
1379  //! Parameters for the dispatcher.
1380  disp_params_t params )
1381  : Basic_Skeleton{ env.get(), std::move(params) }
1382  {
1383  this->start( env.get(), data_sources_name_base );
1384  }
1385 
// Destructor: calls shutdown() and then wait(), in that order.
// Both are reached through this-> because the base class is the
// Basic_Skeleton template parameter.
1386  ~dispatcher_template_t() noexcept override
1387  {
1388  this->shutdown();
1389  this->wait();
1390  }
1391 
1392  private:
1393  //! An alias for thread_holder.
1395 
1396  //! Working threads.
1398 
1399  virtual void
1401  environment_t & env ) override
1402  {
1403  using namespace std;
1404 
1405  m_threads.resize( this->thread_count() );
1406 
1408  for( std::size_t i = 0u; i != this->thread_count(); ++i )
1409  {
1410  m_threads[ i ] = this->make_work_thread( env, i );
1411  }
1412  },
1413  [&] {
1414  ::so_5::details::invoke_noexcept_code( [&] {
1415  this->io_context().stop();
1416 
1417  // Shutdown all started threads.
1418  for( auto & t : m_threads )
1419  if( t )
1420  {
1421  t.unchecked_get().join();
1422  t = thread_holder_t{};
1423  }
1424  else
1425  // No more started threads.
1426  break;
1427  } );
1428  } );
1429  }
1430 
// Waits for all work threads: joins every thread held in m_threads and
// then replaces each holder with a default-constructed thread_holder_t.
1431  virtual void
1432  wait_work_threads() noexcept override
1433  {
1434  for( auto & t : m_threads )
1435  {
// NOTE(review): unchecked_get() is called without testing the holder
// first (the rollback path in launch_work_threads tests `if( t )`);
// presumably every holder is non-empty here -- confirm.
1436  t.unchecked_get().join();
1437  t = thread_holder_t{}; // Reset the holder after the join.
1438  }
1439  }
1440 
1443  environment_t & env,
1444  std::size_t index )
1445  {
1446  Basic_Skeleton * self = this;
1448  this->thread_factory()->acquire( env ),
1449  this->thread_factory()
1450  };
1452  [&env, io_svc = &this->io_context(), self, index]()
1453  {
1455  } );
1456 
1457  return work_thread;
1458  }
1459  };
1460 
1461 //
1462 // dispatcher_handle_maker_t
1463 //
1465  {
1466  public :
// Builds a dispatcher_handle_t from the given dispatcher pointer;
// the pointer is moved into the returned handle.
1467  [[nodiscard]]
1468  static dispatcher_handle_t
1469  make( actual_dispatcher_shptr_t disp ) noexcept
1470  {
1471  return { std::move( disp ) };
1472  }
1473  };
1474 
1475 } /* namespace impl */
1476 
1477 //
1478 // default_thread_pool_size
1479 //
1480 /*!
1481  * \brief A helper function for detecting default thread count for
1482  * thread pool.
1483  *
1484  * \since
1485  * v.1.0.2
1486  */
1487 inline std::size_t
1489  {
1490  auto c = std::thread::hardware_concurrency();
1491  if( !c )
1492  c = 2;
1493 
1494  return c;
1495  }
1496 
1497 //
1498 // default_traits_t
1499 //
1500 /*!
1501  * \brief Default traits of %asio_thread_pool dispatcher.
1502  *
1503  * \note
1504  * This type is empty in v.1.5.0.
1505  * It is left empty intentionally to have a possibility to extend it later, in
1506  * some future version.
1507  *
1508  * \since
1509  * v.1.0.2
1510  */
1512  {
1513  };
1514 
1515 //
1516 // make_dispatcher
1517 //
1518 /*!
1519  * \brief A function for creating an instance of %asio_thread_pool dispatcher.
1520  *
1521  * Usage examples:
1522  * \code
1523  * // Dispatcher which uses own Asio IoContext and default traits.
1524  * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
1525  * asio_tp::disp_params_t params;
1526  * params.use_own_io_context(); // Asio IoContext object will be created here.
1527  * // This object will be accessible later via
1528  * // dispatcher_handle_t::io_context() method.
1529  * auto disp = asio_tp::make_dispatcher(
1530  * env,
1531  * "my_asio_tp",
1532  * std::move(params) );
1533  *
1534  *
1535  * // Dispatcher which uses external Asio IoContext and default traits.
1536  * asio::io_context & io_svc = ...;
1537  * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
1538  * asio_tp::disp_params_t params;
1539  * params.use_external_io_context( io_svc );
1540  * auto disp = asio_tp::make_dispatcher(
1541  * env,
1542  * "my_asio_tp",
1543  * std::move(params) );
1544  * \endcode
1545  *
1546  * \par Requirements for traits type
1547  * The Traits-type is empty in v.1.5.0. There was a possibility to specify
1548  * a custom thread type in previous versions of so_5_extra, but since v.1.5.0
1549  * custom threads are supported via standard SObjectizer's mechanism based
1550  * on `abstract_work_thread_t`/`abstract_work_thread_factory_t` interfaces.
1551  * But the Traits-type might be extended by some content in future versions.
1552  *
1553  * \tparam Traits Type with traits for a dispatcher. For the requirements
1554  * for \a Traits type see the section "Requirements for traits type" above.
1555  *
1556  * \since
1557  * v.1.0.2
1558  */
1559 template< typename Traits = default_traits_t >
1560 [[nodiscard]]
1561 inline dispatcher_handle_t
1563  //! SObjectizer Environment to work in.
1564  environment_t & env,
1565  //! Value for creating names of data sources for
1566  //! run-time monitoring.
1567  const std::string_view data_sources_name_base,
1568  //! Parameters for the dispatcher.
1569  disp_params_t disp_params )
1570  {
1571  const auto io_svc_ptr = disp_params.io_context();
1572  if( !io_svc_ptr )
1575  "io_context is not set in disp_params" );
1576 
1577  if( !disp_params.thread_count() )
1579 
1582  // Type of result pointer.
1584  // Actual type of dispatcher without thread activity tracking.
1586  Traits,
1588  // Actual type of dispatcher with thread activity tracking.
1590  Traits,
1594  std::move(disp_params) );
1595 
1597  }
1598 
1599 } /* namespace asio_thread_pool */
1600 
1601 } /* namespace disp */
1602 
1603 } /* namespace extra */
1604 
1605 } /* namespace so_5 */
An implementation of a binder for asio_thread_pool dispatcher.
Definition: pub.hpp:802
A helper for declaration of static and thread_local pointer in a header file.
Definition: pub.hpp:436
::so_5::stats::prefix_t m_base_prefix
Basic prefix for data sources.
Definition: pub.hpp:1062
Parameters for asio_thread_pool dispatcher.
Definition: pub.hpp:61
::asio::io_context & io_context() noexcept
Get reference to io_context from that dispatcher.
Definition: pub.hpp:350
static void run_work_thread(environment_t &env, ::asio::io_context &io_svc, dispatcher_skeleton_without_thread_activity_tracking_t &, std::size_t)
Implementation of main function for a work thread.
Definition: pub.hpp:1205
virtual void launch_work_threads(environment_t &env) override
Definition: pub.hpp:1400
outliving_reference_t< ::asio::io_context::strand > m_strand
Strand to be used with this event_queue.
Definition: pub.hpp:876
std::shared_ptr< ::asio::io_context > io_context() const noexcept
Get the io_context.
Definition: pub.hpp:165
std::size_t thread_count() const noexcept
Get the count of work threads to be created.
Definition: pub.hpp:1025
A handle for asio_thread_pool dispatcher.
Definition: pub.hpp:242
void bind(agent_t &agent) noexcept override
Definition: pub.hpp:755
::so_5::stats::activity_tracking_stuff::stats_collector_t< ::so_5::stats::activity_tracking_stuff::internal_lock > m_work_activity
Collected activity stats.
Definition: pub.hpp:624
std::vector< std::unique_ptr< work_thread_activity_collector_t > > m_collectors
Collectors for run-time stats for every thread.
Definition: pub.hpp:1258
Basic stuff for all implementations of dispatcher.
Definition: pub.hpp:924
virtual void agent_unbound() noexcept=0
Notification about unbinding of an agent.
void agent_bound() noexcept override
Notification about binding of yet another agent.
Definition: pub.hpp:968
static void set_ptr(T *p) noexcept
Setter for the pointer.
Definition: pub.hpp:449
void activity_finished() noexcept
Mark completion of the current activity.
Definition: pub.hpp:661
void reset() noexcept
Drop the content of handle.
Definition: pub.hpp:366
::asio::io_context & io_context() const noexcept override
Get reference to io_context from that dispatcher.
Definition: pub.hpp:965
Base type for implementations of work thread wrappers.
Definition: pub.hpp:478
void push(execution_demand_t demand) override
Definition: pub.hpp:815
const std::shared_ptr< ::asio::io_context > m_io_context
IO Service to work with.
Definition: pub.hpp:1143
Template-based implementation of dispatcher.
Definition: pub.hpp:1370
disp_binder_shptr_t binder(::asio::io_context::strand &strand) const
Get a binder for that dispatcher.
Definition: pub.hpp:297
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
Definition: pub.hpp:94
Basic part of implementation of a binder for asio_thread_pool dispatcher.
Definition: pub.hpp:728
An actual interface of thread pool dispatcher.
Definition: pub.hpp:391
virtual disp_data_source_t & data_source() noexcept=0
Get access to actual data source instance for that dispatcher.
const std::size_t m_thread_count
Count of work threads.
Definition: pub.hpp:1140
virtual void on_demand(execution_demand_t demand) noexcept override
Actual processing of the demand.
Definition: pub.hpp:595
static thread_local T * m_ptr
Value of the pointer which need to be stored.
Definition: pub.hpp:440
static void run(environment_t &env, ::asio::io_context &io_svc, Args &&... args)
Launch processing of demand on the context of current thread.
Definition: pub.hpp:524
std::size_t thread_count() const
Getter for thread count.
Definition: pub.hpp:102
Ranges for error codes of each submodules.
Definition: details.hpp:13
void setup_thread_id(current_thread_id_t tid)
Setup ID of the current work thread.
Definition: pub.hpp:634
std::shared_ptr< ::asio::io_context > m_io_context
Asio's io_context which must be used with this dispatcher.
Definition: pub.hpp:178
disp_binder_shptr_t binder_with_external_strand(::asio::io_context::strand &strand) override
Create a binder for that dispatcher.
Definition: pub.hpp:946
static T * ptr() noexcept
Access to the current value of the pointer.
Definition: pub.hpp:445
virtual void on_demand(execution_demand_t demand) noexcept=0
Actual processing of the demand.
void start(environment_t &env, std::string_view data_sources_name_base)
Definition: pub.hpp:987
actual_dispatcher_shptr_t m_dispatcher
The actual dispatcher.
Definition: pub.hpp:774
outliving_reference_t< work_thread_activity_collector_t > m_activity_stats
Activity statistics.
Definition: pub.hpp:693
demands_counter_t m_demands_counter
Count of waiting demands.
Definition: pub.hpp:1156
virtual demands_counter_t & demands_counter() noexcept=0
Get a reference for counter of pending demands.
disp_params_t & use_own_io_context()
Use own Asio io_context object.
Definition: pub.hpp:157
disp_params_t & use_external_io_context(::asio::io_context &service)
Use external Asio io_context object with dispatcher.
Definition: pub.hpp:125
static void run_work_thread(environment_t &env, ::asio::io_context &io_svc, dispatcher_skeleton_with_thread_activity_tracking_t &self, std::size_t index)
Implementation of main function for a work thread.
Definition: pub.hpp:1328
std::vector< thread_holder_t > m_threads
Working threads.
Definition: pub.hpp:1397
disp_binder_shptr_t binder_with_own_strand() override
Create a binder for that dispatcher.
Definition: pub.hpp:957
::so_5::stats::repository_t * m_stats_repo
Data source repository.
Definition: pub.hpp:1071
basic_binder_impl_t(actual_dispatcher_shptr_t dispatcher)
Initializing constructor.
Definition: pub.hpp:734
virtual void launch_work_threads(environment_t &env)=0
Start all working threads.
demands_counter_t & demands_counter() noexcept override
Get a reference for counter of pending demands.
Definition: pub.hpp:980
static void handle_demand(execution_demand_t demand)
An interface method for passing a demand to processing.
Definition: pub.hpp:571
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
Definition: pub.hpp:1488
Data source for run-time monitoring of whole dispatcher.
Definition: pub.hpp:1055
basic_dispatcher_skeleton_t(::so_5::environment_t &env, disp_params_t params)
Definition: pub.hpp:931
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
Definition: pub.hpp:75
bool empty() const noexcept
Is this handle empty?
Definition: pub.hpp:257
virtual disp_data_source_t & data_source() noexcept override
Get access to actual data source instance for that dispatcher.
Definition: pub.hpp:1324
work_thread_with_activity_tracking_t(outliving_reference_t< work_thread_activity_collector_t > activity_stats)
Definition: pub.hpp:696
::asio::io_context::strand & strand() noexcept
Definition: pub.hpp:903
operator bool() const noexcept
Is this handle empty?
Definition: pub.hpp:357
disp_params_t & use_external_io_context(std::shared_ptr< ::asio::io_context > service)
Use external Asio io_context object with dispatcher.
Definition: pub.hpp:142
Default traits of asio_thread_pool dispatcher.
Definition: pub.hpp:1511
virtual disp_binder_shptr_t binder_with_own_strand()=0
Create a binder for that dispatcher.
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
Definition: pub.hpp:247
dispatcher_template_t(outliving_reference_t< environment_t > env, std::string_view data_sources_name_base, disp_params_t params)
Definition: pub.hpp:1373
std::atomic< std::size_t > m_agents_bound
Count of agents bound to that dispatcher.
Definition: pub.hpp:1153
current_thread_id_t thread_id() const noexcept
Get the ID of the thread.
Definition: pub.hpp:646
virtual disp_binder_shptr_t binder_with_external_strand(::asio::io_context::strand &)=0
Create a binder for that dispatcher.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
Definition: pub.hpp:362
static dispatcher_handle_t make(actual_dispatcher_shptr_t disp) noexcept
Definition: pub.hpp:1469
const ::so_5::stats::prefix_t & base_prefix() const noexcept
Access to data source prefix for derived classes.
Definition: pub.hpp:1076
The very basic interface of asio_thread_pool dispatcher.
Definition: pub.hpp:197
Extension of basic dispatcher skeleton for the case when work thread activity must be collected...
Definition: pub.hpp:1237
Type of collector of work thread activity data.
Definition: pub.hpp:615
virtual void agent_bound() noexcept=0
Notification about binding of yet another agent.
virtual ::asio::io_context & io_context() const noexcept=0
Get reference to io_context from that dispatcher.
virtual disp_data_source_t & data_source() noexcept
Get access to actual data source instance for that dispatcher.
Definition: pub.hpp:1201
::asio::io_context::strand m_strand
Strand to be used with this event_queue.
Definition: pub.hpp:907
current_thread_id_t m_thread_id
ID of the work thread.
Definition: pub.hpp:485
Extension of basic dispatcher skeleton for the case when work thread activity is not collected...
Definition: pub.hpp:1189
::so_5::stats::work_thread_activity_stats_t take_activity_stats() noexcept
Get the current stats.
Definition: pub.hpp:670
binder_with_own_strand_t(actual_dispatcher_shptr_t dispatcher)
Definition: pub.hpp:896
virtual void wait_work_threads() noexcept=0
Wait for finish of all threads.
disp_params_t()=default
Default constructor.
virtual void on_demand(execution_demand_t demand) noexcept override
Actual processing of the demand.
Definition: pub.hpp:708
const ::so_5::disp::abstract_work_thread_factory_shptr_t m_thread_factory
Thread factory to be used with this dispatcher.
Definition: pub.hpp:1150
const int rc_io_context_is_not_set
Asio IoService is not set for asio_thread_pool dispatcher.
Definition: pub.hpp:47
current_thread_id_t thread_id() const noexcept
ID of the work thread.
Definition: pub.hpp:505
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t disp_params)
A function for creating an instance of asio_thread_pool dispatcher.
Definition: pub.hpp:1562
void activity_started() noexcept
Mark start point of new activity.
Definition: pub.hpp:652
void agent_unbound() noexcept override
Notification about unbinding of an agent.
Definition: pub.hpp:974
current_thread_id_t m_thread_id
ID of thread for which activity stats is collected.
Definition: pub.hpp:619
thread_holder_t make_work_thread(environment_t &env, std::size_t index)
Definition: pub.hpp:1442
void push_evt_finish(execution_demand_t demand) noexcept override
Definition: pub.hpp:841
binder_with_external_strand_t(actual_dispatcher_shptr_t dispatcher, outliving_reference_t< ::asio::io_context::strand > strand)
Definition: pub.hpp:864
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
Definition: pub.hpp:249
std::size_t m_thread_count
Count of working threads.
Definition: pub.hpp:175
disp_binder_shptr_t binder() const
Get a binder for that dispatcher.
Definition: pub.hpp:338
void push_evt_start(execution_demand_t demand) override
Definition: pub.hpp:834
void undo_preallocation(agent_t &) noexcept override
Definition: pub.hpp:748
basic_dispatcher_skeleton_t & m_dispatcher
Dispatcher to work with.
Definition: pub.hpp:1059
::so_5::disp::abstract_work_thread_factory_shptr_t thread_factory() const noexcept
Get access to thread factory to be used for that dispatcher.
Definition: pub.hpp:1042