SObjectizer-5 Extra
simple_mtsafe.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of Asio-based simple thread safe
4  * environment infrastructure.
5  */
6 
7 #pragma once
8 
9 #include <so_5_extra/env_infrastructures/asio/impl/common.hpp>
10 
11 #include <so_5/rt/impl/h/st_env_infrastructure_reuse.hpp>
12 #include <so_5/details/h/sync_helpers.hpp>
13 #include <so_5/details/h/at_scope_exit.hpp>
14 #include <so_5/details/h/invoke_noexcept_code.hpp>
15 
16 #include <so_5/h/stdcpp.hpp>
17 
18 #include <asio.hpp>
19 
20 namespace so_5 {
21 
22 namespace extra {
23 
24 namespace env_infrastructures {
25 
26 namespace asio {
27 
28 namespace simple_mtsafe {
29 
30 namespace impl {
31 
32 //! A short name for namespace with reusable stuff.
34 
35 //! A short name for namespace with common stuff.
37 
38 //
39 // shutdown_status_t
40 //
42 
43 //
44 // coop_repo_t
45 //
46 /*!
47  * \brief Implementation of coop_repository for
48  * simple thread-safe single-threaded environment infrastructure.
49  */
51 
52 //
53 // stats_controller_t
54 //
55 /*!
56  * \brief Implementation of stats_controller for that type of
57  * single-threaded environment.
58  */
59 using stats_controller_t =
61 
62 //
63 // event_queue_impl_t
64 //
65 /*!
66  * \brief Implementation of event_queue interface for the default dispatcher.
67  *
68  * \tparam Activity_Tracker A type for tracking work thread activity.
69  */
70 template< typename Activity_Tracker >
72  {
74 
75  public :
76  //! Type for representation of statistical data for this event queue.
77  struct stats_t
78  {
79  //! The current size of the demands queue.
81  };
82 
83  //! Initializing constructor.
85  //! Asio's io_context to be used for dispatching.
86  outliving_reference_t<::asio::io_context> io_svc,
87  //! Actual activity tracker.
88  outliving_reference_t<Activity_Tracker> activity_tracker )
89  : m_io_svc(io_svc)
92  {
93  }
94 
95  virtual void
96  push( execution_demand_t demand ) override
97  {
98  // Statistics must be updated.
100 
101  // Now we can schedule processing of the demand.
102  // It ::asio::post fails then statistics must be reverted.
104  [&] {
105  ::asio::post(
106  m_io_svc.get(),
107  [this, d = std::move(demand)]() mutable {
108  // Statistics must be updated.
110 
111  // Update wait statistics.
113  const auto wait_starter = ::so_5::details::at_scope_exit(
114  [this]{ m_activity_tracker.get().wait_started(); } );
115 
116  // The demand can be handled now.
117  // With working time tracking.
119  {
120  // For the case if call_handler will throw.
121  const auto stopper = ::so_5::details::at_scope_exit(
122  [this]{ m_activity_tracker.get().work_stopped(); });
123 
125  }
126  } );
127  },
128  [this] {
130  } );
131  }
132 
133  //! Notification that event queue work is started.
134  void
136  //! ID of the main working thread.
137  current_thread_id_t thread_id )
138  {
140 
141  // There is no any pending demand now. We can start counting
142  // the waiting time.
144  }
145 
146  //! Get the current statistics.
147  stats_t
148  query_stats() const noexcept
149  {
151  }
152 
153  private :
156 
158 
160  };
161 
162 //
163 // disp_ds_name_parts_t
164 //
165 /*!
166  * \brief A class with major part of dispatcher name.
167  */
168 struct disp_ds_name_parts_t final
169  {
170  static const char * disp_type_part() noexcept { return "asio_mtsafe"; }
171  };
172 
173 //
174 // default_disp_impl_basis_t
175 //
176 /*!
177  * \brief A basic part of implementation of dispatcher interface to be used in
178  * places where default dispatcher is needed.
179  */
180 template< typename Activity_Tracker >
183 
184 //
185 // default_disp_binder_t
186 //
187 /*!
188  * \brief An implementation of disp_binder interface for default dispatcher
189  * for this environment infrastructure.
190  */
191 template< typename Activity_Tracker >
192 using default_disp_binder_t =
195 
196 //
197 // default_disp_impl_t
198 //
199 /*!
200  * \brief An implementation of dispatcher interface to be used in
201  * places where default dispatcher is needed.
202  *
203  * \tparam Activity_Tracker a type of activity tracker to be used
204  * for run-time statistics.
205  *
206  * \since
207  * v.5.5.19
208  */
209 template< typename Activity_Tracker >
210 using default_disp_impl_t =
215 
216 //
217 // env_infrastructure_t
218 //
219 /*!
220  * \brief Default implementation of not-thread safe single-threaded environment
221  * infrastructure.
222  *
223  * \attention
224  * This object doesn't have any mutexes. All synchronization is done via delegation
225  * mutating operations to Asio's context (asio::post and asio::dispatch are used).
226  *
227  * \tparam Activity_Tracker A type of activity tracker to be used.
228  */
229 template< typename Activity_Tracker >
232  {
233  public :
235  //! Asio's io_context to be used.
236  outliving_reference_t<::asio::io_context> io_svc,
237  //! Environment to work in.
238  environment_t & env,
239  //! Cooperation action listener.
240  coop_listener_unique_ptr_t coop_listener,
241  //! Mbox for distribution of run-time stats.
242  mbox_t stats_distribution_mbox );
243 
244  virtual void
245  launch( env_init_t init_fn ) override;
246 
247  virtual void
248  stop() override;
249 
250  virtual void
252  coop_unique_ptr_t coop ) override;
253 
254  virtual void
256  nonempty_name_t name,
257  coop_dereg_reason_t dereg_reason ) override;
258 
259  virtual void
261  coop_t * coop ) override;
262 
263  virtual bool
265  std::string coop_name ) override;
266 
267  virtual so_5::timer_id_t
269  const std::type_index & type_wrapper,
270  const message_ref_t & msg,
271  const mbox_t & mbox,
273  std::chrono::steady_clock::duration period ) override;
274 
275  virtual void
276  single_timer(
277  const std::type_index & type_wrapper,
278  const message_ref_t & msg,
279  const mbox_t & mbox,
280  std::chrono::steady_clock::duration pause ) override;
281 
282  virtual stats::controller_t &
283  stats_controller() noexcept override;
284 
285  virtual stats::repository_t &
286  stats_repository() noexcept override;
287 
288  virtual dispatcher_t &
289  query_default_dispatcher() override;
290 
292  query_coop_repository_stats() override;
293 
294  virtual timer_thread_stats_t
295  query_timer_thread_stats() override;
296 
298  make_default_disp_binder() override;
299 
300  private :
301  //! Asio's io_context to be used.
303 
304  //! Actual SObjectizer Environment.
306 
307  //! Status of shutdown procedure.
309 
310  //! Repository of registered coops.
312 
313  //! Actual activity tracker.
314  Activity_Tracker m_activity_tracker;
315 
316  //! Event queue which is necessary for the default dispatcher.
317  event_queue_impl_t< Activity_Tracker > m_event_queue;
318 
319  //! Dispatcher to be used as default dispatcher.
321 
322  //! Stats controller for this environment.
324 
325  //! Counter of cooperations which are waiting for final deregistration
326  //! step.
327  /*!
328  * It is necessary for building correct run-time stats.
329  */
331 
332  void
333  run_default_dispatcher_and_go_further( env_init_t init_fn );
334 
335  /*!
336  * \note Calls m_io_svc.stop() and m_default_disp.shutdown() if necessary.
337  *
338  * \attention Must be called only for locked object!
339  */
340  void
342  };
343 
344 template< typename Activity_Tracker >
346  outliving_reference_t<::asio::io_context> io_svc,
347  environment_t & env,
348  coop_listener_unique_ptr_t coop_listener,
349  mbox_t stats_distribution_mbox )
350  : m_io_svc( io_svc )
351  , m_env( env )
355  , m_default_disp(
359  m_env,
363  {}
364 
365 template< typename Activity_Tracker >
366 void
367 env_infrastructure_t<Activity_Tracker>::launch( env_init_t init_fn )
368  {
369  // Post initial operation to Asio event loop.
370  ::asio::post( m_io_svc.get(), [this, init = std::move(init_fn)] {
372  } );
373 
374  // Tell that there is a work to do.
375  auto work = ::asio::make_work_guard( m_io_svc.get() );
376 
377  // Launch Asio event loop.
378  m_io_svc.get().run();
379  }
380 
381 template< typename Activity_Tracker >
382 void
383 env_infrastructure_t<Activity_Tracker>::stop()
384  {
385  // NOTE: if the code below throws then we don't know the actual
386  // state of env_infrastructure. Because of that we just terminate
387  // the whole application the the case of an exception.
393  {
394  // All registered cooperations must be deregistered now.
395  ::asio::dispatch( m_io_svc.get(),
396  [this] {
398 
400 
402  } );
403  }
404  else
405  // Check for shutdown completeness must be performed only
406  // on the main Asio's thread.
407  ::asio::dispatch( m_io_svc.get(), [this] {
409  } );
410  } );
411  }
412 
413 template< typename Activity_Tracker >
414 void
416  coop_unique_ptr_t coop )
417  {
419  }
420 
421 template< typename Activity_Tracker >
422 void
424  nonempty_name_t name,
425  coop_dereg_reason_t dereg_reason )
426  {
428  }
429 
430 template< typename Activity_Tracker >
431 void
433  coop_t * coop )
434  {
436 
437  ::asio::post( m_io_svc.get(), [this, coop] {
440  } );
441  }
442 
443 template< typename Activity_Tracker >
444 bool
446  std::string coop_name )
447  {
450  }
451 
452 template< typename Activity_Tracker >
455  const std::type_index & type_index,
456  const message_ref_t & msg,
457  const mbox_t & mbox,
460  {
461  using namespace asio_common;
462 
463  // We do not control shutdown_status_t here. Because it seems
464  // to be safe to call schedule_timer after call to stop().
465  // New timer will simply ignored during shutdown process.
467  if( period != std::chrono::steady_clock::duration::zero() )
468  {
471  m_io_svc.get(),
472  type_index,
473  msg,
474  mbox,
475  period ) };
476 
477  result = timer_id_t{
479 
481  }
482  else
483  {
486  m_io_svc.get(),
487  type_index,
488  msg,
489  mbox ) };
490 
491  result = timer_id_t{
493 
495  }
496 
497  return result;
498  }
499 
500 template< typename Activity_Tracker >
501 void
503  const std::type_index & type_index,
504  const message_ref_t & msg,
505  const mbox_t & mbox,
506  std::chrono::steady_clock::duration pause )
507  {
508  using namespace asio_common;
509 
510  // We do not control shutdown_status_t here. Because it seems
511  // to be safe to call schedule_timer after call to stop().
512  // New timer will simply ignored during shutdown process.
513 
516  m_io_svc.get(),
517  type_index,
518  msg,
519  mbox ) };
520 
522  }
523 
524 template< typename Activity_Tracker >
526 env_infrastructure_t<Activity_Tracker>::stats_controller() noexcept
527  {
528  return m_stats_controller;
529  }
530 
531 template< typename Activity_Tracker >
533 env_infrastructure_t<Activity_Tracker>::stats_repository() noexcept
534  {
535  return m_stats_controller;
536  }
537 
538 template< typename Activity_Tracker >
539 dispatcher_t &
541  {
542  return m_default_disp;
543  }
544 
545 template< typename Activity_Tracker >
548  {
549  const auto stats = m_coop_repo.query_stats();
550 
556  };
557  }
558 
559 template< typename Activity_Tracker >
562  {
563  // NOTE: this type of environment_infrastructure doesn't support
564  // statistics for timers.
565  return { 0, 0 };
566  }
567 
568 template< typename Activity_Tracker >
571  {
574  }
575 
576 template< typename Activity_Tracker >
577 void
579  env_init_t init_fn )
580  {
581  bool default_disp_started = false;
582 
583  try
584  {
585  // Event queue must know ID of the current thread.
586  // It also must start counting the waiting time.
588 
589  // Now we can start the default dispatcher.
592 
593  // Now, if init_fn will throw we must call shutdown() for
594  // the default dispatcher.
595  default_disp_started = true;
596 
597  // User-supplied init can be called now.
598  init_fn();
599  }
600  catch(...)
601  {
604 
605  throw;
606  }
607  }
608 
609 template< typename Activity_Tracker >
610 void
612  {
614  {
615  // If there is no more live coops then shutdown must be
616  // completed.
617  if( !m_coop_repo.has_live_coop() )
618  {
620  // Asio's event loop must be broken here!
621  m_io_svc.get().stop();
622 
623  // Default dispatcher can be shut down now.
625  }
626  }
627  }
628 
629 } /* namespace impl */
630 
631 //
632 // factory
633 //
634 /*!
635  * \brief A factory for creation of environment infrastructure based on
636  * Asio's event loop.
637  *
638  * \attention
639  * This environment infrastructure is not a thread safe.
640  *
641  * Usage example:
642  * \code
643 int main()
644 {
645  asio::io_context io_svc;
646 
647  so_5::launch( [](so_5::environment_t & env) {
648  ... // Some initialization stuff.
649  },
650  [&io_svc](so_5::environment_params_t & params) {
651  using asio_env = so_5::extra::env_infrastructures::asio::simple_mtsafe;
652 
653  params.infrastructure_factory( asio_env::factory(io_svc) );
654  } );
655 
656  return 0;
657 }
658  * \endcode
659  */
662  {
663  using namespace impl;
664 
665  return [&io_svc](
666  environment_t & env,
667  environment_params_t & env_params,
668  mbox_t stats_distribution_mbox )
669  {
670  environment_infrastructure_t * obj = nullptr;
671 
672  // Create environment infrastructure object in dependence of
673  // work thread activity tracking flag.
674  const auto tracking = env_params.work_thread_activity_tracking();
675  if( work_thread_activity_tracking_t::on == tracking )
676  obj = new env_infrastructure_t< reusable::real_activity_tracker_t >(
677  outliving_mutable(io_svc),
678  env,
679  env_params.so5__giveout_coop_listener(),
680  std::move(stats_distribution_mbox) );
681  else
682  obj = new env_infrastructure_t< reusable::fake_activity_tracker_t >(
683  outliving_mutable(io_svc),
684  env,
685  env_params.so5__giveout_coop_listener(),
686  std::move(stats_distribution_mbox) );
687 
688  return environment_infrastructure_unique_ptr_t(
689  obj,
690  environment_infrastructure_t::default_deleter() );
691  };
692  }
693 
694 } /* namespace simple_mtsafe */
695 
696 } /* namespace asio */
697 
698 } /* namespace env_infrastructures */
699 
700 } /* namespace extra */
701 
702 } /* namespace so_5 */
Type for representation of statistical data for this event queue.
env_infrastructure_t(outliving_reference_t<::asio::io_context > io_svc, environment_t &env, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)
environment_infrastructure_factory_t factory(::asio::io_context &io_svc)
A factory for creation of environment infrastructure based on Asio&#39;s event loop.
Implementation of event_queue interface for the default dispatcher.
virtual so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
Ranges for error codes of each submodules.
Definition: details.hpp:14
stats_t query_stats() const noexcept
Get the current statistics.
void start(current_thread_id_t thread_id)
Notification that event queue work is started.
std::atomic< std::size_t > m_final_dereg_coop_count
Counter of cooperations which are waiting for final deregistration step.
event_queue_impl_t(outliving_reference_t<::asio::io_context > io_svc, outliving_reference_t< Activity_Tracker > activity_tracker)
Initializing constructor.
virtual so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
event_queue_impl_t< Activity_Tracker > m_event_queue
Event queue which is necessary for the default dispatcher.
stats_controller_t m_stats_controller
Stats controller for this environment.
virtual void deregister_coop(nonempty_name_t name, coop_dereg_reason_t dereg_reason) override
Default implementation of not-thread safe single-threaded environment infrastructure.
std::atomic< shutdown_status_t > m_shutdown_status
Status of shutdown procedure.
default_disp_impl_t< Activity_Tracker > m_default_disp
Dispatcher to be used as default dispatcher.
outliving_reference_t< ::asio::io_context > m_io_svc
Asio&#39;s io_context to be used.
virtual void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override