SObjectizer-5 Extra
simple_mtsafe.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of Asio-based simple thread safe
4  * environment infrastructure.
5  */
6 
7 #pragma once
8 
9 #include <so_5_extra/env_infrastructures/asio/impl/common.hpp>
10 
11 #include <so_5/version.hpp>
12 #if SO_5_VERSION < SO_5_VERSION_MAKE(5u, 8u, 0u)
13 #error "SObjectizer-5.8.0 is required"
14 #endif
15 
16 #include <so_5/impl/st_env_infrastructure_reuse.hpp>
17 #include <so_5/impl/internal_env_iface.hpp>
18 #include <so_5/details/sync_helpers.hpp>
19 #include <so_5/details/at_scope_exit.hpp>
20 #include <so_5/details/invoke_noexcept_code.hpp>
21 
22 #include <asio.hpp>
23 
24 #include <string_view>
25 
26 namespace so_5 {
27 
28 namespace extra {
29 
30 namespace env_infrastructures {
31 
32 namespace asio {
33 
34 namespace simple_mtsafe {
35 
36 namespace impl {
37 
38 //! A short name for namespace with reusable stuff.
40 
41 //! A short name for namespace with common stuff.
43 
44 //
45 // shutdown_status_t
46 //
48 
49 //
50 // coop_repo_t
51 //
52 /*!
53  * \brief Implementation of coop_repository for
54  * simple thread-safe single-threaded environment infrastructure.
55  */
57 
58 //
59 // stats_controller_t
60 //
61 /*!
62  * \brief Implementation of stats_controller for that type of
63  * single-threaded environment.
64  */
65 using stats_controller_t =
67 
68 //
69 // event_queue_impl_t
70 //
71 /*!
72  * \brief Implementation of event_queue interface for the default dispatcher.
73  *
74  * \tparam Activity_Tracker A type for tracking work thread activity.
75  */
76 template< typename Activity_Tracker >
78  {
80 
81  public :
82  //! Type for representation of statistical data for this event queue.
83  struct stats_t
84  {
85  //! The current size of the demands queue.
87  };
88 
89  //! Initializing constructor.
91  //! Asio's io_context to be used for dispatching.
92  outliving_reference_t<::asio::io_context> io_svc,
93  //! Actual activity tracker.
94  outliving_reference_t<Activity_Tracker> activity_tracker )
95  : m_io_svc(io_svc)
98  {
99  }
100 
101  virtual void
102  push( execution_demand_t demand ) override
103  {
104  // Statistics must be updated.
106 
107  // Now we can schedule processing of the demand.
108  // It ::asio::post fails then statistics must be reverted.
110  [&] {
111  ::asio::post(
112  m_io_svc.get(),
113  [this, d = std::move(demand)]() mutable {
114  // Statistics must be updated.
116 
117  // Update wait statistics.
119  const auto wait_starter = ::so_5::details::at_scope_exit(
120  [this]{ m_activity_tracker.get().wait_started(); } );
121 
122  // The demand can be handled now.
123  // With working time tracking.
125  {
126  // For the case if call_handler will throw.
127  const auto stopper = ::so_5::details::at_scope_exit(
128  [this]{ m_activity_tracker.get().work_stopped(); });
129 
131  }
132  } );
133  },
134  [this] {
136  } );
137  }
138 
139  void
140  push_evt_start( execution_demand_t demand ) override
141  {
142  // Just delegate to the ordinary push().
143  this->push( std::move(demand) );
144  }
145 
146  void
147  push_evt_finish( execution_demand_t demand ) noexcept override
148  {
149  // Just delegate to the ordinary push() despite
150  // the fact that push() isn't noexcept.
151  this->push( std::move(demand) );
152  }
153 
154  //! Notification that event queue work is started.
155  void
157  //! ID of the main working thread.
158  current_thread_id_t thread_id )
159  {
161 
162  // There is no any pending demand now. We can start counting
163  // the waiting time.
165  }
166 
167  //! Get the current statistics.
168  stats_t
169  query_stats() const noexcept
170  {
172  }
173 
174  private :
177 
179 
181  };
182 
183 //
184 // disp_ds_name_parts_t
185 //
186 /*!
187  * \brief A class with major part of dispatcher name.
188  */
189 struct disp_ds_name_parts_t final
190  {
191  static constexpr std::string_view
192  disp_type_part() noexcept { return { "asio_mtsafe" }; }
193  };
194 
195 //
196 // default_dispatcher_t
197 //
198 /*!
199  * \brief An implementation of dispatcher to be used in
200  * places where default dispatcher is needed.
201  *
202  * \tparam Activity_Tracker a type of activity tracker to be used
203  * for run-time statistics.
204  *
205  * \since
206  * v.1.3.0
207  */
208 template< typename Activity_Tracker >
210  : public reusable::default_dispatcher_t<
214  {
219 
220  public :
222  outliving_reference_t< environment_t > env,
223  outliving_reference_t< event_queue_impl_t<Activity_Tracker> > event_queue,
224  outliving_reference_t< Activity_Tracker > activity_tracker )
226  {
227  // Event queue should be started manually.
228  // We known that the default dispatcher is created on a thread
229  // that will be used for events dispatching.
230  event_queue.get().start( this->thread_id() );
231  }
232  };
233 
234 //
235 // env_infrastructure_t
236 //
237 /*!
238  * \brief Default implementation of not-thread safe single-threaded environment
239  * infrastructure.
240  *
241  * \attention
242  * This object doesn't have any mutexes. All synchronization is done via
243  * delegation mutating operations to Asio's context (asio::post and
244  * asio::dispatch are used).
245  *
246  * \tparam Activity_Tracker A type of activity tracker to be used.
247  */
248 template< typename Activity_Tracker >
251  {
252  public :
254  //! Asio's io_context to be used.
255  outliving_reference_t<::asio::io_context> io_svc,
256  //! Environment to work in.
257  environment_t & env,
258  //! Cooperation action listener.
259  coop_listener_unique_ptr_t coop_listener,
260  //! Mbox for distribution of run-time stats.
261  mbox_t stats_distribution_mbox );
262 
263  void
264  launch( env_init_t init_fn ) override;
265 
266  /*!
267  * \attention
268  * Since SO-5.8.0 this method should be a noexcept, but
269  * Asio's post() can throw. In that case the whole application
270  * will be terminated.
271  */
272  void
273  stop() noexcept override;
274 
275  [[nodiscard]]
277  make_coop(
280 
283  coop_unique_holder_t coop ) override;
284 
285  /*!
286  * \attention
287  * This method should be a noexcept, but it uses Asio's post() and the
288  * post() call can throw. In that case the whole application will be
289  * terminated.
290  */
291  void
293  coop_shptr_t coop ) noexcept override;
294 
295  bool
297  coop_shptr_t coop_name ) noexcept override;
298 
301  const std::type_index & type_wrapper,
302  const message_ref_t & msg,
303  const mbox_t & mbox,
305  std::chrono::steady_clock::duration period ) override;
306 
307  void
308  single_timer(
309  const std::type_index & type_wrapper,
310  const message_ref_t & msg,
311  const mbox_t & mbox,
312  std::chrono::steady_clock::duration pause ) override;
313 
315  stats_controller() noexcept override;
316 
318  stats_repository() noexcept override;
319 
321  query_coop_repository_stats() override;
322 
324  query_timer_thread_stats() override;
325 
327  make_default_disp_binder() override;
328 
329  private :
330  //! Asio's io_context to be used.
332 
333  //! Actual SObjectizer Environment.
335 
336  //! Status of shutdown procedure.
338 
339  //! Repository of registered coops.
341 
342  //! Actual activity tracker.
343  Activity_Tracker m_activity_tracker;
344 
345  //! Event queue which is necessary for the default dispatcher.
346  event_queue_impl_t< Activity_Tracker > m_event_queue;
347 
348  //! Dispatcher to be used as default dispatcher.
349  /*!
350  * \note
351  * Has an actual value only inside launch() method.
352  */
354 
355  //! Stats controller for this environment.
357 
358  //! Counter of cooperations which are waiting for final deregistration
359  //! step.
360  /*!
361  * It is necessary for building correct run-time stats.
362  */
364 
365  //! The pointer to an exception that was thrown during init phase.
366  /*!
367  * This exception is stored inside a callback posted to Asio.
368  * An then this exception will be rethrown from launch() method
369  * after the shutdown of SObjectizer.
370  */
372 
373  void
374  run_default_dispatcher_and_go_further( env_init_t init_fn );
375 
376  /*!
377  * \note Calls m_io_svc.stop() and m_default_disp.shutdown() if necessary.
378  *
379  * \attention Must be called only for locked object!
380  */
381  void
383  };
384 
385 template< typename Activity_Tracker >
387  outliving_reference_t<::asio::io_context> io_svc,
388  environment_t & env,
389  coop_listener_unique_ptr_t coop_listener,
390  mbox_t stats_distribution_mbox )
391  : m_io_svc( io_svc )
392  , m_env( env )
400  {}
401 
402 template< typename Activity_Tracker >
403 void
404 env_infrastructure_t<Activity_Tracker>::launch( env_init_t init_fn )
405  {
406  // Post initial operation to Asio event loop.
407  ::asio::post( m_io_svc.get(), [this, init = std::move(init_fn)] {
409  } );
410 
411  // Default dispatcher should be destroyed on exit from this function.
414  } );
415 
416  // Tell that there is a work to do.
417  auto work = ::asio::make_work_guard( m_io_svc.get() );
418 
419  // Launch Asio event loop.
420  m_io_svc.get().run();
421 
423  // Some exception was thrown during initialization.
424  // It should be rethrown.
426  }
427 
428 template< typename Activity_Tracker >
429 void
430 env_infrastructure_t<Activity_Tracker>::stop() noexcept
431  {
436  {
437  // All registered cooperations must be deregistered now.
438  ::asio::dispatch( m_io_svc.get(),
439  [this] {
441 
443 
445  } );
446  }
447  else
448  // Check for shutdown completeness must be performed only
449  // on the main Asio's thread.
450  ::asio::dispatch( m_io_svc.get(), [this] {
452  } );
453  }
454 
455 template< typename Activity_Tracker >
457 env_infrastructure_t< Activity_Tracker >::make_coop(
460  {
461  return m_coop_repo.make_coop(
462  std::move(parent),
463  std::move(default_binder) );
464  }
465 
466 template< typename Activity_Tracker >
468 env_infrastructure_t< Activity_Tracker >::register_coop(
470  {
471  return m_coop_repo.register_coop( std::move(coop) );
472  }
473 
474 template< typename Activity_Tracker >
475 void
477  coop_shptr_t coop_to_dereg ) noexcept
478  {
480 
481  ::asio::post( m_io_svc.get(), [this, coop = std::move(coop_to_dereg)] {
485  } );
486  }
487 
488 template< typename Activity_Tracker >
489 bool
491  coop_shptr_t coop ) noexcept
492  {
495  }
496 
497 template< typename Activity_Tracker >
500  const std::type_index & type_index,
501  const message_ref_t & msg,
502  const mbox_t & mbox,
505  {
506  using namespace asio_common;
507 
508  // We do not control shutdown_status_t here. Because it seems
509  // to be safe to call schedule_timer after call to stop().
510  // New timer will simply ignored during shutdown process.
512  if( period != std::chrono::steady_clock::duration::zero() )
513  {
516  m_io_svc.get(),
517  type_index,
518  msg,
519  mbox,
520  period ) };
521 
522  result = timer_id_t{
524 
526  }
527  else
528  {
531  m_io_svc.get(),
532  type_index,
533  msg,
534  mbox ) };
535 
536  result = timer_id_t{
538 
540  }
541 
542  return result;
543  }
544 
545 template< typename Activity_Tracker >
546 void
548  const std::type_index & type_index,
549  const message_ref_t & msg,
550  const mbox_t & mbox,
551  std::chrono::steady_clock::duration pause )
552  {
553  using namespace asio_common;
554 
555  // We do not control shutdown_status_t here. Because it seems
556  // to be safe to call schedule_timer after call to stop().
557  // New timer will simply ignored during shutdown process.
558 
561  m_io_svc.get(),
562  type_index,
563  msg,
564  mbox ) };
565 
567  }
568 
569 template< typename Activity_Tracker >
571 env_infrastructure_t<Activity_Tracker>::stats_controller() noexcept
572  {
573  return m_stats_controller;
574  }
575 
576 template< typename Activity_Tracker >
578 env_infrastructure_t<Activity_Tracker>::stats_repository() noexcept
579  {
580  return m_stats_controller;
581  }
582 
583 template< typename Activity_Tracker >
586  {
587  const auto stats = m_coop_repo.query_stats();
588 
593  };
594  }
595 
596 template< typename Activity_Tracker >
599  {
600  // NOTE: this type of environment_infrastructure doesn't support
601  // statistics for timers.
602  return { 0, 0 };
603  }
604 
605 template< typename Activity_Tracker >
608  {
609  return { m_default_disp };
610  }
611 
612 template< typename Activity_Tracker >
613 void
615  env_init_t init_fn )
616  {
617  try
618  {
624 
625  // User-supplied init can be called now.
626  init_fn();
627  }
628  catch(...)
629  {
630  // We can't restore if the following fragment throws and exception.
632  // The current exception should be stored to be
633  // rethrown later.
635 
636  // SObjectizer's shutdown should be initiated.
637  stop();
638 
639  // NOTE: pointer to the default dispatcher will be dropped
640  // in launch() method.
641  } );
642  }
643  }
644 
645 template< typename Activity_Tracker >
646 void
648  {
650  {
651  // If there is no more live coops then shutdown must be
652  // completed.
653  if( !m_coop_repo.has_live_coop() )
654  {
656  // Asio's event loop must be broken here!
657  m_io_svc.get().stop();
658  }
659  }
660  }
661 
662 } /* namespace impl */
663 
664 //
665 // factory
666 //
667 /*!
668  * \brief A factory for creation of environment infrastructure based on
669  * Asio's event loop.
670  *
671  * \attention
672  * This environment infrastructure is not a thread safe.
673  *
674  * Usage example:
675  * \code
676 int main()
677 {
678  asio::io_context io_svc;
679 
680  so_5::launch( [](so_5::environment_t & env) {
681  ... // Some initialization stuff.
682  },
683  [&io_svc](so_5::environment_params_t & params) {
684  using asio_env = so_5::extra::env_infrastructures::asio::simple_mtsafe;
685 
686  params.infrastructure_factory( asio_env::factory(io_svc) );
687  } );
688 
689  return 0;
690 }
691  * \endcode
692  */
695  {
696  using namespace impl;
697 
698  return [&io_svc](
699  environment_t & env,
700  environment_params_t & env_params,
701  mbox_t stats_distribution_mbox )
702  {
703  environment_infrastructure_t * obj = nullptr;
704 
705  // Create environment infrastructure object in dependence of
706  // work thread activity tracking flag.
707  const auto tracking = env_params.work_thread_activity_tracking();
708  if( work_thread_activity_tracking_t::on == tracking )
709  obj = new env_infrastructure_t< reusable::real_activity_tracker_t >(
710  outliving_mutable(io_svc),
711  env,
712  env_params.so5_giveout_coop_listener(),
713  std::move(stats_distribution_mbox) );
714  else
715  obj = new env_infrastructure_t< reusable::fake_activity_tracker_t >(
716  outliving_mutable(io_svc),
717  env,
718  env_params.so5_giveout_coop_listener(),
719  std::move(stats_distribution_mbox) );
720 
721  return environment_infrastructure_unique_ptr_t(
722  obj,
723  environment_infrastructure_t::default_deleter() );
724  };
725  }
726 
727 } /* namespace simple_mtsafe */
728 
729 } /* namespace asio */
730 
731 } /* namespace env_infrastructures */
732 
733 } /* namespace extra */
734 
735 } /* namespace so_5 */
Type for representation of statistical data for this event queue.
env_infrastructure_t(outliving_reference_t<::asio::io_context > io_svc, environment_t &env, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)
std::shared_ptr< default_dispatcher_t< Activity_Tracker > > m_default_disp
Dispatcher to be used as default dispatcher.
environment_infrastructure_factory_t factory(::asio::io_context &io_svc)
A factory for creation of environment infrastructure based on Asio&#39;s event loop.
Implementation of event_queue interface for the default dispatcher.
std::exception_ptr m_exception_from_init
The pointer to an exception that was thrown during init phase.
so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
An implementation of dispatcher to be used in places where default dispatcher is needed.
default_dispatcher_t(outliving_reference_t< environment_t > env, outliving_reference_t< event_queue_impl_t< Activity_Tracker > > event_queue, outliving_reference_t< Activity_Tracker > activity_tracker)
Ranges for error codes of each submodules.
Definition: details.hpp:13
stats_t query_stats() const noexcept
Get the current statistics.
void start(current_thread_id_t thread_id)
Notification that event queue work is started.
std::atomic< std::size_t > m_final_dereg_coop_count
Counter of cooperations which are waiting for final deregistration step.
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder) override
event_queue_impl_t(outliving_reference_t<::asio::io_context > io_svc, outliving_reference_t< Activity_Tracker > activity_tracker)
Initializing constructor.
so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
event_queue_impl_t< Activity_Tracker > m_event_queue
Event queue which is necessary for the default dispatcher.
stats_controller_t m_stats_controller
Stats controller for this environment.
Default implementation of not-thread safe single-threaded environment infrastructure.
std::atomic< shutdown_status_t > m_shutdown_status
Status of shutdown procedure.
outliving_reference_t< ::asio::io_context > m_io_svc
Asio&#39;s io_context to be used.
void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override