SObjectizer-5 Extra
time_limited.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of time-limited asynchronous one-time operation.
4  *
5  * \since
6  * v.1.0.4
7  */
8 
9 #pragma once
10 
11 #include <so_5_extra/async_op/details.hpp>
12 #include <so_5_extra/async_op/errors.hpp>
13 
14 #include <so_5/details/invoke_noexcept_code.hpp>
15 
16 #include <so_5/impl/internal_env_iface.hpp>
17 
18 #include <so_5/agent.hpp>
19 #include <so_5/send_functions.hpp>
20 
21 #include <so_5/timers.hpp>
22 
23 #include <so_5/outliving.hpp>
24 
25 #include <chrono>
26 #include <vector>
27 
28 namespace so_5 {
29 
30 namespace extra {
31 
32 namespace async_op {
33 
34 namespace time_limited {
35 
36 //! Enumeration for status of operation.
37 enum class status_t
38  {
39  //! Status of operation is unknown because the
40  //! operation data has been moved to another proxy-object.
42  //! Operation is not activated yet.
44  //! Operation is activated.
45  activated,
46  //! Operation is completed.
47  completed,
48  //! Operation is cancelled.
49  cancelled,
50  //! Operation is timed-out.
51  timedout
52  };
53 
54 namespace details {
55 
56 /*!
57  * \brief Container of all data related to async operation.
58  *
59  * \attention
60  * There can be cyclic references from op_data_t instance to
61  * completion/timeout handlers and back from completion/timeout
62  * handlers to op_data_t. Because of that op_data_t can't
63  * perform cleanup in its destructor because the destructor
64  * will not be called until these cyclic references exist.
65  * It requires special attention: content of op_data_t must
66  * be cleared by owners of op_data_t instances.
67  *
68  * \since
69  * v.1.0.4
70  */
71 class op_data_t : protected ::so_5::atomic_refcounted_t
72  {
73  private :
74  friend class ::so_5::intrusive_ptr_t<op_data_t>;
75 
76  //! Description of one completion handler subscription.
78  {
79  //! Mbox from that a message is expected.
81 
82  //! State for that a subscription should be created.
84 
85  //! Subscription type.
86  /*!
87  * \note
88  * This is a subscription type. Not a type which will
89  * be passed to the event handler.
90  */
92 
93  //! Event handler.
95 
96  //! Initializing constructor.
98  so_5::mbox_t mbox,
99  const so_5::state_t & state,
100  std::type_index subscription_type,
101  so_5::event_handler_method_t handler )
102  : m_mbox( std::move(mbox) )
105  , m_handler( std::move(handler) )
106  {}
107  };
108 
109  //! Description of one timeout handler subscription.
111  {
112  //! State for that a subscription should be created.
114 
115  //! Event handler.
117 
118  //! Initializing constructor.
120  const so_5::state_t & state,
121  so_5::event_handler_method_t handler )
123  , m_handler( std::move(handler) )
124  {}
125  };
126 
127  //! Owner of async operation.
129 
130  //! Type of timeout message.
132 
133  //! The status of the async operation.
135 
136  //! Subscriptions for completion handlers which should be created on
137  //! activation.
139 
140  //! Subscriptions for timeout handlers which should be created on
141  //! activation.
143 
144  //! A default timeout handler which will be used as
145  //! deadletter handler for timeout message/signal.
146  /*!
147  * \note
148  * Can be nullptr. If so the default timeout handler will
149  * be created during activation procedure.
150  */
152 
153  //! A mbox to which timeout message/signal will be sent.
154  /*!
155  * \note
156  * It will be limitless MPSC-mbox.
157  */
159 
160  //! An ID of timeout message/signal.
161  /*!
162  * Will be used for cancellation of async operation.
163  */
165 
166  //! Create subscriptions for all defined completion handlers.
167  /*!
168  * This method will rollback all subscriptions made in case of
169  * an exception. It means that if an exception is throw during
170  * subscription then all already subscribed completion handlers
171  * will be removed.
172  */
173  void
175  {
176  std::size_t i = 0;
177  ::so_5::details::do_with_rollback_on_exception( [&] {
178  for(; i != m_completion_handlers.size(); ++i )
179  {
180  auto & ch = m_completion_handlers[ i ];
181  m_owner.get().so_create_event_subscription(
182  ch.m_mbox,
183  ch.m_subscription_type,
184  ch.m_state.get(),
185  ch.m_handler,
186  ::so_5::thread_safety_t::unsafe,
187  ::so_5::event_handler_kind_t::final_handler );
188  }
189  },
190  [&] {
191  drop_completion_handlers_subscriptions_up_to( i );
192  } );
193  }
194 
195  //! Removes subscription for the first N completion handlers.
196  void
198  //! Upper bound in m_completion_handlers (not included).
199  std::size_t upper_border ) noexcept
200  {
201  for( std::size_t i = 0; i != upper_border; ++i )
202  {
203  const auto & ch = m_completion_handlers[ i ];
204  m_owner.get().so_destroy_event_subscription(
205  ch.m_mbox,
206  ch.m_subscription_type,
207  ch.m_state.get() );
208  }
209  }
210 
211  //! Create subscriptions for all defined timeout handlers
212  //! (including default handler).
213  /*!
214  * This method will rollback all subscriptions made in case of
215  * an exception. It means that if an exception is throw during
216  * subscription then all already subscribed timeout handlers
217  * will be removed.
218  */
219  void
221  {
223  ::so_5::details::do_with_rollback_on_exception( [&] {
224  do_subscribe_default_timeout_handler();
225  },
226  [&] {
227  do_unsubscribe_default_timeout_handler();
228  } );
229  }
230 
231  //! An implementation of subscription of timeout handlers.
232  /*!
233  * Default timeout handler is not subscribed by this method.
234  */
235  void
237  {
238  std::size_t i = 0;
239  ::so_5::details::do_with_rollback_on_exception( [&] {
240  for(; i != m_timeout_handlers.size(); ++i )
241  {
242  auto & th = m_timeout_handlers[ i ];
243  m_owner.get().so_create_event_subscription(
244  m_timeout_mbox,
245  m_timeout_msg_type,
246  th.m_state.get(),
247  th.m_handler,
248  ::so_5::thread_safety_t::unsafe,
249  ::so_5::event_handler_kind_t::final_handler );
250  }
251  },
252  [&] {
253  drop_timeout_handlers_subscriptions_up_to( i );
254  } );
255  }
256 
257  //! An implementation of subscription of default timeout handler.
258  void
260  {
261  m_owner.get().so_create_deadletter_subscription(
262  m_timeout_mbox,
263  m_timeout_msg_type,
264  m_default_timeout_handler,
265  ::so_5::thread_safety_t::unsafe );
266  }
267 
268  //! An implementation of unsubscription of the first N timeout handlers.
269  /*!
270  * The default timeout handler is not unsubscribed by this method.
271  */
272  void
274  //! Upper bound in m_timeout_handlers (not included).
275  std::size_t upper_border ) noexcept
276  {
277  for( std::size_t i = 0; i != upper_border; ++i )
278  {
279  const auto & th = m_timeout_handlers[ i ];
280  m_owner.get().so_destroy_event_subscription(
281  m_timeout_mbox,
282  m_timeout_msg_type,
283  th.m_state.get() );
284  }
285  }
286 
287  //! Remove subscriptions for all completion handlers.
288  void
290  {
291  drop_completion_handlers_subscriptions_up_to(
292  m_completion_handlers.size() );
293  }
294 
295  //! Remove subscriptions for all timeout handlers
296  //! (including the default timeout handler).
297  void
299  {
302  }
303 
304  //! Actual unsubscription for the default timeout handler.
305  void
307  {
308  m_owner.get().so_destroy_deadletter_subscription(
309  m_timeout_mbox,
310  m_timeout_msg_type );
311  }
312 
313  //! Actual unsubscription for timeout handlers.
314  /*!
315  * The default timeout handler is not unsubscribed by this method.
316  */
317  void
319  {
320  drop_timeout_handlers_subscriptions_up_to(
321  m_timeout_handlers.size() );
322  }
323 
324  //! Performs total cleanup of the operation data.
325  /*!
326  * All subscriptions are removed.
327  * The delayed message is released.
328  *
329  * Content of m_completion_handlers, m_timeout_handlers and
330  * m_default_timeout_handler will be erased.
331  */
332  void
334  {
335  // All subscriptions must be removed.
338 
339  // Timer is no more needed.
340  m_timeout_timer_id.release();
341 
342  // Information about completion and timeout handlers
343  // is no more needed.
344  m_completion_handlers.clear();
345  m_timeout_handlers.clear();
346  m_default_timeout_handler = ::so_5::event_handler_method_t{};
347  }
348 
349  //! Creates the default timeout handler if it is not set
350  //! by the user.
351  void
353  {
354  if( !m_default_timeout_handler )
355  {
356  m_default_timeout_handler =
357  [op = ::so_5::intrusive_ptr_t<op_data_t>(this)](
358  ::so_5::message_ref_t & /*msg*/ )
359  {
360  op->timedout();
361  };
362  }
363  }
364 
365  //! Cleans the operation data and sets the status specified.
366  void
368  //! A new status to be set.
369  status_t status )
370  {
372  m_status = status;
373  }
374 
375  public :
376  //! Initializing constructor.
378  //! Owner of the async operation.
379  ::so_5::outliving_reference_t< ::so_5::agent_t > owner,
380  //! Type of the timeout message/signal.
381  const std::type_index & timeout_msg_type )
382  : m_owner( owner )
384  // Timeout mbox must be created right now.
385  // It will be used during timeout_handlers processing.
386  , m_timeout_mbox(
389  {}
390 
391  //! Access to timeout mbox.
392  [[nodiscard]] const ::so_5::mbox_t &
393  timeout_mbox() const noexcept
394  {
395  return m_timeout_mbox;
396  }
397 
398  //! Access to type of timeout message/signal.
399  [[nodiscard]] const std::type_index &
400  timeout_msg_type() const noexcept
401  {
402  return m_timeout_msg_type;
403  }
404 
405  //! Access to owner of the async operation.
406  [[nodiscard]] ::so_5::agent_t &
407  owner() noexcept
408  {
409  return m_owner.get();
410  }
411 
412  //! Access to SObjectizer Environment in that the owner is working.
413  [[nodiscard]] ::so_5::environment_t &
414  environment() noexcept
415  {
416  return m_owner.get().so_environment();
417  }
418 
419  //! Reserve a space for storage of completion handlers.
420  void
422  //! A required capacity.
423  std::size_t capacity )
424  {
425  m_completion_handlers.reserve( capacity );
426  }
427 
428  //! Reserve a space for storage of timeout handlers.
429  void
431  //! A required capacity.
432  std::size_t capacity )
433  {
434  m_timeout_handlers.reserve( capacity );
435  }
436 
437  //! Performs activation actions.
438  /*!
439  * The default timeout handler is created if necessary.
440  *
441  * Subscriptions for completion handlers and timeout handler are created.
442  * If an exception is thrown during subscription then all already
443  * subscribed completion and timeout handler will be unsubscribed.
444  */
445  void
447  {
449 
451 
452  ::so_5::details::do_with_rollback_on_exception(
453  [&] {
454  create_timeout_handlers_subscriptions();
455  },
456  [&] {
457  drop_all_completion_handlers_subscriptions();
458  } );
459  }
460 
461  //! Change the status of async operation.
462  void
464  //! A new status to be set.
465  status_t status ) noexcept
466  {
467  m_status = status;
468  }
469 
470  //! Get the current status of async operation.
471  [[nodiscard]] status_t
472  current_status() const noexcept
473  {
474  return m_status;
475  }
476 
477  //! Add a new completion handler for the async operation.
478  void
480  //! A mbox from which the completion message/signal is expected.
481  const ::so_5::mbox_t & mbox,
482  //! A state for completion handler.
483  const ::so_5::state_t & state,
484  //! A type of the completion message/signal.
485  const std::type_index msg_type,
486  //! Completion handler itself.
487  ::so_5::event_handler_method_t evt_handler )
488  {
489  m_completion_handlers.emplace_back(
490  mbox,
491  state,
492  msg_type,
493  std::move(evt_handler) );
494  }
495 
496  //! Add a new timeout handler for the async operation.
497  void
499  //! A state for timeout handler.
500  const ::so_5::state_t & state,
501  //! Timeout handler itself.
502  ::so_5::event_handler_method_t evt_handler )
503  {
504  m_timeout_handlers.emplace_back(
505  state,
506  std::move(evt_handler) );
507  }
508 
509  //! Set the default timeout handler.
510  /*!
511  * \note
512  * If there already is the default timeout handler then the old
513  * one will be replaced by new handler.
514  */
515  void
517  //! Timeout handler itself.
518  ::so_5::event_handler_method_t evt_handler )
519  {
520  m_default_timeout_handler = std::move(evt_handler);
521  }
522 
523  //! Set the ID of timeout message/signal timer.
524  void
526  //! Timer ID of delayed timeout message/signal.
527  ::so_5::timer_id_t id )
528  {
529  m_timeout_timer_id = std::move(id);
530  }
531 
532  //! Mark the async operation as completed.
533  void
534  completed() noexcept
535  {
537  }
538 
539  //! Mark the async operation as timedout.
540  void
541  timedout() noexcept
542  {
544  }
545 
546  //! Mark the async operation as cancelled.
547  void
548  cancelled() noexcept
549  {
551  }
552  };
553 
554 /*!
555  * \brief An alias for smart pointer to operation data.
556  *
557  * \tparam Operation_Data Type of actual operation data representation.
558  * It is expected to be op_data_t or some of its derivatives.
559  */
560 template< typename Operation_Data >
562 
563 } /* namespace details */
564 
565 /*!
566  * \brief An object that allows to cancel async operation.
567  *
568  * Usage example:
569  * \code
570  * namespace asyncop = so_5::extra::async_op::time_limited;
571  * class demo : public so_5::agent_t {
572  * asyncop::cancellation_point_t<> cp_;
573  * ...
574  * void initiate_async_op() {
575  * auto op = asyncop::make<timeout>(*this);
576  * op.completed_on(...);
577  * op.timeout_handler(...);
578  * cp_ = op.activate(std::chrono::milliseconds(300), ...);
579  * }
580  * void on_operation_aborted(mhood_t<op_aborted_notify> cmd) {
581  * // Operation aborted. There is no need to wait for
582  * // completion of operation.
583  * cp_.cancel();
584  * }
585  * };
586  * \endcode
587  *
588  * \note
589  * This class is DefaultConstructible and Moveable, but not Copyable.
590  *
591  * \attention
592  * Objects of this class are not thread safe. It means that cancellation
593  * point should be used only by agent which created it. And the cancellation
594  * point can't be used inside thread-safe event handlers of that agent.
595  *
596  * \tparam Operation_Data Type of actual operation data representation.
597  * Please note that this template parameter is indended to be used for
598  * debugging and testing purposes only.
599  *
600  * \since
601  * v.1.0.4
602  */
603 template< typename Operation_Data = details::op_data_t >
605  {
606  private :
607  template<typename Msg, typename Op_Data> friend class definition_point_t;
608 
609  //! Actual data for async op.
610  /*!
611  * \note
612  * This can be a nullptr if the default constructor was used,
613  * or if operation is already cancelled, or if the content of
614  * the cancellation_point was moved to another object.
615  */
617 
618  //! Initializing constructor to be used by definition_point.
620  //! Actual data for async operation.
621  //! Can't be null.
622  details::op_shptr_t< Operation_Data > op )
623  : m_op( std::move(op) )
624  {}
625 
626  public :
627  cancellation_point_t() = default;
628 
629  cancellation_point_t( const cancellation_point_t & ) = delete;
631 
633  operator=( const cancellation_point_t & ) = delete;
634 
636  operator=( cancellation_point_t && ) = default;
637 
638  //! Get the status of the operation.
639  /*!
640  * \note
641  * The value status_t::unknown_moved_away can be returned if
642  * the actual data of the async operation was moved to another object
643  * (like another cancellation_point_t). Or after a call to
644  * cleanup() method.
645  */
646  [[nodiscard]] status_t
647  status() const noexcept
648  {
649  if( m_op)
650  return m_op->current_status();
652  }
653 
654  //! Can the async operation be cancelled via this cancellation point?
655  /*!
656  * \return true if the cancellation_point holds actual async operation's
657  * data and this async operation is not completed nor timedout yet.
658  */
659  [[nodiscard]] bool
660  is_cancellable() const noexcept
661  {
662  return m_op && status_t::activated == m_op->current_status();
663  }
664 
665  //! An attempt to cancel the async operation.
666  /*!
667  * \note
668  * Operation will be cancelled only if (true == is_cancellable()).
669  *
670  * It is safe to call cancel() if the operation is already
671  * cancelled or completed, or timed out. In that case the call to
672  * cancel() will have no effect.
673  */
674  void
675  cancel() noexcept
676  {
677  if( is_cancellable() )
678  {
679  m_op->cancelled();
680  }
681  }
682 
683  //! Throw out a reference to the async operation data.
684  /*!
685  * A cancellation_point holds a reference to the async operation
686  * data. It means that the async operation data will be destroyed
687  * only when the cancellation_point will be destroyed. For example,
688  * in that case:
689  * \code
690  * namespace asyncop = so_5::extra::async_op::time_limited;
691  * class demo : public so_5::agent_t {
692  * ...
693  * asyncop::cancellation_point_t<> cp_;
694  * ...
695  * void initiate_async_op() {
696  * cp_ = asyncop::make<timeout_msg>(*this)
697  * ...
698  * .activate(...);
699  * ...
700  * }
701  * ...
702  * void on_some_interruption() {
703  * // Cancel asyncop.
704  * cp_.cancel();
705  * // Async operation is cancelled, but the async operation
706  * // data is still in memory. The data will be deallocated
707  * // only when cp_ receives new value or when cp_ will be
708  * // destroyed (e.g. after destruction of demo agent).
709  * }
710  * \endcode
711  *
712  * A call to cleanup() method removes the reference to the async
713  * operation data. It means that if the operation is already
714  * completed, timed out or cancelled, then the operation data
715  * fill be deallocated.
716  *
717  * \note
718  * If the operation is still in progress then a call to cleanup()
719  * doesn't break the operation. You need to call cancel() manually
720  * before calling cleanup() to cancel the operation.
721  */
722  void
723  cleanup() noexcept
724  {
725  m_op.reset();
726  }
727  };
728 
729 namespace details
730 {
731 
732 /*!
733  * \brief A basic part of implementation of definition_point.
734  *
735  * This part is independent from timeout message/signal type.
736  *
737  * \note
738  * This is Moveable class. But not DefaultConstructible.
739  * And not Copyable.
740  *
741  * \note
742  * Most of methods are declared as protected to be used only in
743  * derived class.
744  *
745  * \since
746  * v.1.0.4
747  */
748 template< typename Operation_Data >
750  {
751  protected :
752  //! Actual async operation data.
753  /*!
754  * \note This pointer can be nullptr after activation or
755  * when the object is move away.
756  */
758 
759  //! Initializing constructor.
761  //! Actual async operation data.
762  //! This pointer can't be nullptr.
763  details::op_shptr_t< Operation_Data > op )
764  : m_op( std::move(op) )
765  {}
766 
771 
776 
777  //! Can the async_op be activated?
778  /*!
779  * The async operation can't be activated if it is already activated.
780  * Or if the content of the definition_point is moved to another
781  * definition_point.
782  */
783  [[nodiscard]] bool
784  is_activable() const noexcept
785  {
786  return static_cast<bool>(m_op);
787  }
788 
789  //! Throws an exception if the async operation can't be activated.
790  void
792  {
793  if( !is_activable() )
796  "An attempt to do something on non-activable async_op" );
797  }
798 
799  //! Reserve a space for storage of completion handlers.
800  void
802  //! A required capacity.
803  std::size_t capacity )
804  {
806 
808  }
809 
810  //! Reserve a space for storage of timeout handlers.
811  void
813  //! A required capacity.
814  std::size_t capacity )
815  {
817 
819  }
820 
821  //! The actual implementation of addition of completion handler.
822  /*!
823  * \tparam Msg_Target It can be a mbox, or a reference to an agent.
824  * In the case if Msg_Target if a reference to an agent the agent's
825  * direct mbox will be used as message source.
826  *
827  * \tparam Event_Handler Type of actual handler for message/signal.
828  * It can be a pointer to agent's method or lambda (or another
829  * type of functional object).
830  */
831  template<
832  typename Msg_Target,
833  typename Event_Handler >
834  void
836  //! A source from which a completion message/signal is expected.
837  //! It can be a mbox or an agent (in that case the agent's direct
838  //! mbox will be used).
839  Msg_Target && msg_target,
840  //! A state for that a completion handler is defined.
841  const ::so_5::state_t & state,
842  //! Completion handler itself.
843  Event_Handler && evt_handler )
844  {
846 
847  const auto mbox = ::so_5::extra::async_op::details::target_to_mbox(
848  msg_target );
849 
850  auto evt_handler_info =
852  mbox,
853  m_op->owner(),
855 
857  [op = m_op,
859  ::so_5::message_ref_t & msg )
860  {
861  op->completed();
862  user_handler( msg );
863  };
864 
866  mbox,
867  state,
869  std::move(actual_handler) );
870  }
871 
872  //! The actual implementation of addition of timeout handler.
873  /*!
874  * \tparam Event_Handler Type of actual handler for message/signal.
875  * It can be a pointer to agent's method or lambda (or another
876  * type of functional object).
877  */
878  template< typename Event_Handler >
879  void
881  //! A state for that a timeout handler is defined.
882  const ::so_5::state_t & state,
883  //! Timeout handler itself.
884  Event_Handler && evt_handler )
885  {
887 
889  state,
892  }
893 
894  //! The actual implementation of addition of the default timeout handler.
895  /*!
896  * \tparam Event_Handler Type of actual handler for message/signal.
897  * It can be a pointer to agent's method or lambda (or another
898  * type of functional object).
899  */
900  template< typename Event_Handler >
901  void
903  //! The default timeout handler itself.
904  Event_Handler && evt_handler )
905  {
907 
911  }
912 
913  private :
914  //! A helper method for creation a wrapper for a timeout handler.
915  template< typename Event_Handler >
918  //! Timeout handler to be wrapped.
920  {
921  auto evt_handler_info =
923  m_op->timeout_mbox(),
924  m_op->owner(),
929  std::string(
930  "An attempt to register timeout handler for "
931  "different message type. Expected type: " )
932  + m_op->timeout_msg_type().name()
933  + ", actual type: " + evt_handler_info.m_msg_type.name() );
934 
935  return
936  [op = m_op,
938  ::so_5::message_ref_t & msg )
939  {
940  op->timedout();
941  user_handler( msg );
942  };
943  }
944  };
945 
946 } /* namespace details */
947 
948 /*!
949  * \brief An interface for definition of async operation.
950  *
951  * Object of this type is usually created by make() function and
952  * is used for definition of async operation. Completion and timeout
953  * handlers are set for async operation by using definition_point object.
954  *
955  * Then an user calls activate() method and the definition_point transfers
956  * the async operation data into cancellation_point object. It means that
957  * after a call to activate() method the definition_point object should
958  * not be used. Because it doesn't hold any async operation anymore.
959  *
960  * A simple usage without storing the cancellation_point:
961  * \code
962  * namespace asyncop = so_5::extra::async_op::time_limited;
963  * class demo : public so_5::agent_t {
964  * struct timeout final : public so_5::signal_t {};
965  * ...
966  * void initiate_async_op() {
967  * // Create a definition_point for a new async operation...
968  * asyncop::make<timeout>(*this)
969  * // ...then set up completion handler(s)...
970  * .completed_on(
971  * *this,
972  * so_default_state(),
973  * &demo::on_first_completion_msg )
974  * .completed_on(
975  * some_external_mbox_,
976  * some_user_defined_state_,
977  * [this](mhood_t<another_completion_msg> cmd) {...})
978  * // ...then set up timeout handler(s)...
979  * .timeout_handler(
980  * so_default_state(),
981  * &demo::on_timeout )
982  * .timeout_handler(
983  * some_user_defined_state_,
984  * [this](mhood_t<timeout> cmd) {...})
985  * // ...and now we can activate the operation.
986  * .activate(std::chrono::milliseconds(300));
987  * ...
988  * }
989  * };
990  * \endcode
991  * \note
992  * There is no need to hold definition_point object after activation
993  * of the async operation. This object can be safely discarded.
994  *
995  * A more complex example using cancellation_point for
996  * cancelling the operation.
997  * \code
998  * namespace asyncop = so_5::extra::async_op::time_limited;
999  * class demo : public so_5::agent_t {
1000  * struct timeout final : public so_5::signal_t {};
1001  * ...
1002  * // Cancellation point for the async operation.
1003  * asyncop::cancellation_point_t<> cp_;
1004  * ...
1005  * void initiate_async_op() {
1006  * // Create a definition_point for a new async operation
1007  * // and store the cancellation point after activation.
1008  * cp_ = asyncop::make<timeout>(*this)
1009  * // ...then set up completion handler(s)...
1010  * .completed_on(
1011  * *this,
1012  * so_default_state(),
1013  * &demo::on_first_completion_msg )
1014  * .completed_on(
1015  * some_external_mbox_,
1016  * some_user_defined_state_,
1017  * [this](mhood_t<another_completion_msg> cmd) {...})
1018  * // ...then set up timeout handler(s)...
1019  * .timeout_handler(
1020  * so_default_state(),
1021  * &demo::on_timeout )
1022  * .timeout_handler(
1023  * some_user_defined_state_,
1024  * [this](mhood_t<timeout> cmd) {...})
1025  * // ...and now we can activate the operation.
1026  * .activate(std::chrono::milliseconds(300));
1027  * ...
1028  * }
1029  * ...
1030  * void on_abortion_signal(mhood_t<abort_signal>) {
1031  * // The current async operation should be cancelled.
1032  * cp_.cancel();
1033  * }
1034  * };
1035  * \endcode
1036  *
1037  * \note
1038  * This class is Moveable, but not DefaultConstructible nor Copyable.
1039  *
1040  * \attention
1041  * Objects of this class are not thread safe. It means that a definition
1042  * point should be used only by agent which created it. And the definition
1043  * point can't be used inside thread-safe event handlers of that agent.
1044  *
1045  * \since
1046  * v.1.0.4
1047  */
1048 template<
1049  typename Message,
1050  typename Operation_Data = details::op_data_t >
1052  : protected details::msg_independent_part_of_definition_point_t< Operation_Data >
1053  {
1054  //! A short alias for base type.
1055  using base_type_t =
1057 
1058  public :
1059  //! Initializing constructor.
1061  //! The owner of the async operation.
1062  ::so_5::outliving_reference_t< ::so_5::agent_t > owner )
1063  : base_type_t(
1065  new Operation_Data( owner, typeid(Message) ) ) )
1066  {}
1067 
1069  {
1070  // If operation data is still here then it means that
1071  // there wasn't call to `activate()` and we should cancel
1072  // all described handlers.
1073  // This will lead to deallocation of operation data.
1074  if( this->m_op )
1075  this->m_op->cancelled();
1076  }
1077 
1078  definition_point_t( const definition_point_t & ) = delete;
1080  operator=( const definition_point_t & ) = delete;
1081 
1082  definition_point_t( definition_point_t && ) = default;
1084  operator=( definition_point_t && ) = default;
1085 
1086  using base_type_t::is_activable;
1087 
1088  /*!
1089  * \brief Reserve a space for storage of completion handlers.
1090  *
1091  * Usage example:
1092  * \code
1093  * namespace asyncop = so_5::extra::async_op::time_limited;
1094  * auto op = asyncop::make<timeout>(some_agent);
1095  * // Reserve space for four completion handlers.
1096  * op.reserve_completion_handlers_capacity(4);
1097  * op.completed_on(...);
1098  * op.completed_on(...);
1099  * op.completed_on(...);
1100  * op.completed_on(...);
1101  * op.activate(...);
1102  * \endcode
1103  */
1106  //! A required capacity.
1107  std::size_t capacity ) &
1108  {
1110  return *this;
1111  }
1112 
1113  //! Just a proxy for actual version of %reserve_completion_handlers_capacity.
1114  template< typename... Args >
1115  auto
1117  {
1118  return std::move(this->reserve_completion_handlers_capacity(
1119  std::forward<Args>(args)... ));
1120  }
1121 
1122  /*!
1123  * \brief Reserve a space for storage of timeout handlers.
1124  *
1125  * Usage example:
1126  * \code
1127  * namespace asyncop = so_5::extra::async_op::time_limited;
1128  * auto op = asyncop::make<timeout>(some_agent);
1129  * // Reserve space for eight timeout handlers.
1130  * op.reserve_timeout_handlers_capacity(8);
1131  * op.timeout_handler(...);
1132  * op.timeout_handler(...);
1133  * ...
1134  * op.activate(...);
1135  * \endcode
1136  */
1139  //! A required capacity.
1140  std::size_t capacity ) &
1141  {
1143  return *this;
1144  }
1145 
1146  //! Just a proxy for actual version of %reserve_timeout_handlers_capacity.
1147  template< typename... Args >
1148  auto
1150  {
1151  return std::move(this->reserve_timeout_handlers_capacity(
1152  std::forward<Args>(args)... ));
1153  }
1154 
1155  /*!
1156  * \brief Add a completion handler for the async operation.
1157  *
1158  * Usage example:
1159  * \code
1160  * namespace asyncop = so_5::extra::async_op::time_limited;
1161  * class demo : public so_5::agent_t {
1162  * ...
1163  * void initiate_async_op() {
1164  * asyncop::make<timeout>(*this)
1165  * .completed_on(
1166  * *this,
1167  * so_default_state(),
1168  * &demo::some_event )
1169  * .completed_on(
1170  * some_mbox_,
1171  * some_agent_state_,
1172  * [this](mhood_t<some_msg> cmd) {...})
1173  * ...
1174  * .activate(...);
1175  * }
1176  * };
1177  * \endcode
1178  *
1179  * \note
1180  * The completion handler will be stored inside async operation
1181  * data. Actual subscription for it will be made during activation
1182  * of the async operation.
1183  *
1184  * \tparam Msg_Target It can be a mbox, or a reference to an agent.
1185  * In the case if Msg_Target if a reference to an agent the agent's
1186  * direct mbox will be used as message source.
1187  *
1188  * \tparam Event_Handler Type of actual handler for message/signal.
1189  * It can be a pointer to agent's method or lambda (or another
1190  * type of functional object).
1191  */
1192  template<
1193  typename Msg_Target,
1194  typename Event_Handler >
1197  //! A source from which a completion message is expected.
1198  //! It \a msg_target is a reference to an agent then
1199  //! the agent's direct mbox will be used.
1200  Msg_Target && msg_target,
1201  //! A state for which that completion handler will be subscribed.
1202  const ::so_5::state_t & state,
1203  //! The completion handler itself.
1204  Event_Handler && evt_handler ) &
1205  {
1206  this->completed_on_impl(
1208  state,
1210 
1211  return *this;
1212  }
1213 
1214  //! Just a proxy for the main version of %completed_on.
1215  template< typename... Args >
1217  completed_on( Args && ...args ) &&
1218  {
1219  return std::move(this->completed_on(std::forward<Args>(args)...));
1220  }
1221 
1222  /*!
1223  * \brief Add a timeout handler for the async operation.
1224  *
1225  * Usage example:
1226  * \code
1227  * namespace asyncop = so_5::extra::async_op::time_limited;
1228  * class demo : public so_5::agent_t {
1229  * ...
1230  * void initiate_async_op() {
1231  * asyncop::make<timeout>(*this)
1232  * .timeout_handler(
1233  * so_default_state(),
1234  * &demo::some_event )
1235  * .timeout_handler(
1236  * some_agent_state_,
1237  * [this](mhood_t<timeout> cmd) {...})
1238  * ...
1239  * .activate(...);
1240  * }
1241  * };
1242  * \endcode
1243  *
1244  * \note
1245  * The timeout handler will be stored inside async operation
1246  * data. Actual subscription for it will be made during activation
1247  * of the async operation.
1248  *
1249  * \tparam Event_Handler Type of actual handler for message/signal.
1250  * It can be a pointer to agent's method or lambda (or another
1251  * type of functional object).
1252  * Please notice that Event_Handler must receive a message/signal
1253  * of type \a Message. It means that Event_Handler can have one of the
1254  * following formats:
1255  * \code
1256  * ret_type handler(Message);
1257  * ret_type handler(const Message &);
1258  * ret_type handler(mhood_t<Message>); // This is the recommended format.
1259  * \endcode
1260  */
1261  template< typename Event_Handler >
1264  const ::so_5::state_t & state,
1265  Event_Handler && evt_handler ) &
1266  {
1267  this->timeout_handler_impl(
1268  state,
1270 
1271  return *this;
1272  }
1273 
1274  //! Just a proxy for the main version of %timeout_handler.
1275  template< typename... Args >
1277  timeout_handler( Args && ...args ) &&
1278  {
1279  return std::move(this->timeout_handler(
1280  std::forward<Args>(args)...));
1281  }
1282 
1283  /*!
1284  * \brief Add the default timeout handler for the async operation.
1285  *
1286  * The default timeout handler will be called if no timeout
1287  * handler was called for timeout message/signal.
1288  * A deadletter handlers from SObjectizer-5.5.21 are used for
1289  * implementation of the default timeout handler for async
1290  * operation.
1291  *
1292  * There can be only one the default timeout handler.
1293  * If the default timeout handler is already specified a new call
1294  * to default_timeout_handler() will replace the old handler with
1295  * new one (the old default timeout handler will be lost).
1296  *
1297  * Usage example:
1298  * \code
1299  * namespace asyncop = so_5::extra::async_op::time_limited;
1300  * class demo : public so_5::agent_t {
1301  * ...
1302  * void initiate_async_op() {
1303  * asyncop::make<timeout>(*this)
1304  * .default_timeout_handler(
1305  * &demo::some_deadletter_handler )
1306  * ...
1307  * .activate(...);
1308  * }
1309  * };
1310  * \endcode
1311  *
1312  * \note
1313  * The default timeout handler will be stored inside async operation
1314  * data. Actual subscription for it will be made during activation
1315  * of the async operation.
1316  *
1317  * \tparam Event_Handler Type of actual handler for message/signal.
1318  * It can be a pointer to agent's method or lambda (or another
1319  * type of functional object).
1320  * Please notice that Event_Handler must receive a message/signal
1321  * of type \a Message. It means that Event_Handler can have one of the
1322  * following formats:
1323  * \code
1324  * ret_type handler(Message);
1325  * ret_type handler(const Message &);
1326  * ret_type handler(mhood_t<Message>); // This is the recommended format.
1327  * \endcode
1328  */
1329  template< typename Event_Handler >
1332  //! The default timeout handler itself.
1333  Event_Handler && evt_handler ) &
1334  {
1337 
1338  return *this;
1339  }
1340 
1341  //! Just a proxy for the main version of %default_timeout_handler.
1342  template< typename... Args >
1344  default_timeout_handler( Args && ...args ) &&
1345  {
1346  return std::move(this->default_timeout_handler(
1347  std::forward<Args>(args)...));
1348  }
1349 
1350  /*!
1351  * \brief Activation of the async operation.
1352  *
1353  * All subscriptions for completion and timeout handlers will be made
1354  * here. Then the timeout message/signal will be sent. And then
1355  * a cancellation_point for that async operation will be returned.
1356  *
1357  * An example of activation of the async operation in the case
1358  * when \a Message is a signal:
1359  * \code
1360  * namespace asyncop = so_5::extra::async_op::time_limited;
1361  * class demo : public so_5::agent_t {
1362  * struct timeout final : public so_5::signal_t {};
1363  * ...
1364  * void initiate_async_op() {
1365  * asyncop::make<timeout>()
1366  * ... // Set up completion and timeout handlers...
1367  * // Activation with just one argument - timeout duration.
1368  * .activate(std::chrono::milliseconds(200));
1369  * ...
1370  * }
1371  * };
1372  * \endcode
1373  *
1374  * An example of activation of the async operation in the case
1375  * when \a Message is a message:
1376  * \code
1377  * namespace asyncop = so_5::extra::async_op::time_limited;
1378  * class demo : public so_5::agent_t {
1379  * struct timeout final : public so_5::message_t {
1380  * int id_;
1381  * const std::string reply_payload_;
1382  * const so_5::mbox_t reply_mbox_;
1383  *
1384  * timeout(int id, std::string playload, so_5::mbox_t mbox)
1385  * : id_(id), reply_payload_(std::move(payload), reply_mbox_(std::move(mbox))
1386  * {}
1387  * };
1388  * ...
1389  * void initiate_async_op() {
1390  * asyncop::make<timeout>()
1391  * ... // Set up completion and timeout handlers...
1392  * // Activation with several arguments.
1393  * .activate(
1394  * // This is timeout duration.
1395  * std::chrono::milliseconds(200),
1396  * // All these arguments will be passed to the constructor of timeout message.
1397  * current_op_id,
1398  * op_reply_payload,
1399  * op_result_consumer_mbox);
1400  * ...
1401  * }
1402  * };
1403  * \endcode
1404  *
1405  * \note
1406  * There is no need to store the cancellation_point returned if you
1407  * don't want to cancel async operation. The return value of
1408  * activate() can be safely discarded in that case.
1409  *
1410  * \attention
1411  * If cancellation_point is stored then it should be used only on
1412  * the working context of the agent for that the async operation is
1413  * created. And the cancellation_point should not be used in the
1414  * thread-safe event handlers of that agent. It is because
1415  * cancellation_point is not a thread safe object.
1416  *
1417  * \note
1418  * If an exception is thrown during the activation procedure all
1419  * completion and timeout handlers which were subscribed before
1420  * the exception will be unsubscribed. And the async operation
1421  * data will be deleted. It means that after an exception in
1422  * activate() method the definition_point can't be used anymore.
1423  */
1424  template< typename... Args >
1425  cancellation_point_t< Operation_Data >
1427  //! The maximum duration for the async operation.
1428  //! Note: this should be non-negative value.
1429  std::chrono::steady_clock::duration timeout,
1430  //! Arguments for the construction of timeout message/signal
1431  //! of the type \a Message.
1432  //! This arguments will be passed to so_5::send_delayed
1433  //! function.
1434  Args && ...args ) &
1435  {
1436  this->ensure_activable();
1437 
1438  // From this point the definition_point will be detached
1439  // from operation data. It means that is_activable() will
1440  // return false.
1441  auto op = std::move(this->m_op);
1442 
1445  op->setup_timer_id(
1447  op->timeout_mbox(),
1448  timeout,
1450  std::forward<Args>(args)... ) );
1452  },
1453  [&] {
1454  op->cancelled();
1455  } );
1456 
1458  }
1459 
1460  //! Just a proxy for the main version of %activate.
1461  template< typename... Args >
1462  auto
1463  activate( Args && ...args ) &&
1464  {
1465  return this->activate( std::forward<Args>(args)... );
1466  }
1467  };
1468 
1469 //
1470 // make
1471 //
1472 /*!
1473  * \brief A factory for creation definition points of async operations.
1474  *
1475  * Instead of creation of definition_point_t object by hand this
1476  * helper function should be used.
1477  *
1478  * Usage examples:
1479  * \code
1480  * namespace asyncop = so_5::extra::async_op::time_limited;
1481  * class demo : public so_5::agent_t {
1482  * struct timeout final : public so_5::signal_t {};
1483  * ...
1484  * void initiate_async_op() {
1485  * // Create a definition_point for a new async operation...
1486  * asyncop::make<timeout>(*this)
1487  * // ...then set up completion handler(s)...
1488  * .completed_on(
1489  * *this,
1490  * so_default_state(),
1491  * &demo::on_first_completion_msg )
1492  * .completed_on(
1493  * some_external_mbox_,
1494  * some_user_defined_state_,
1495  * [this](mhood_t<another_completion_msg> cmd) {...})
1496  * // ...then set up timeout handler(s)...
1497  * .timeout_handler(
1498  * so_default_state(),
1499  * &demo::on_timeout )
1500  * .timeout_handler(
1501  * some_user_defined_state_,
1502  * [this](mhood_t<timeout> cmd) {...})
1503  * // ...and now we can activate the operation.
1504  * .activate(std::chrono::milliseconds(300));
1505  * ...
1506  * }
1507  * };
1508  * \endcode
1509  *
1510  * \since
1511  * v.1.0.4
1512  */
1513 template< typename Message >
1516  //! Agent for that this async operation will be created.
1517  ::so_5::agent_t & owner )
1518  {
1520  }
1521 
1522 } /* namespace time_limited */
1523 
1524 } /* namespace async_op */
1525 
1526 } /* namespace extra */
1527 
1528 } /* namespace so_5 */
void do_subscribe_timeout_handlers()
An implementation of subscription of timeout handlers.
void do_activation_actions()
Performs activation actions.
An object that allows to cancel async operation.
cancellation_point_t< Operation_Data > activate(std::chrono::steady_clock::duration timeout, Args &&...args) &
Activation of the async operation.
definition_point_t & reserve_completion_handlers_capacity(std::size_t capacity) &
Reserve a space for storage of completion handlers.
void drop_all_completion_handlers_subscriptions() noexcept
Remove subscriptions for all completion handlers.
::so_5::timer_id_t m_timeout_timer_id
An ID of timeout message/signal.
void timedout() noexcept
Mark the async operation as timedout.
void create_timeout_handlers_subscriptions()
Create subscriptions for all defined timeout handlers (including default handler).
msg_independent_part_of_definition_point_t & operator=(const msg_independent_part_of_definition_point_t &)=delete
completion_handler_subscription_t(so_5::mbox_t mbox, const so_5::state_t &state, std::type_index subscription_type, so_5::event_handler_method_t handler)
Initializing constructor.
auto activate(Args &&...args) &&
Just a proxy for the main version of activate.
timeout_handler_subscription_t(const so_5::state_t &state, so_5::event_handler_method_t handler)
Initializing constructor.
const ::so_5::mbox_t & timeout_mbox() const noexcept
Access to timeout mbox.
op_data_t(::so_5::outliving_reference_t< ::so_5::agent_t > owner, const std::type_index &timeout_msg_type)
Initializing constructor.
void completed() noexcept
Mark the async operation as completed.
void make_default_timeout_handler_if_necessary()
Creates the default timeout handler if it is not set by the user.
void cleanup() noexcept
Throw out a reference to the async operation data.
definition_point_t< Message > make(::so_5::agent_t &owner)
A factory for creation definition points of async operations.
::so_5::outliving_reference_t< const ::so_5::state_t > m_state
State for that a subscription should be created.
details::op_shptr_t< Operation_Data > m_op
Actual data for async op.
void drop_all_timeout_handlers_subscriptions() noexcept
Remove subscriptions for all timeout handlers (including the default timeout handler).
cancellation_point_t & operator=(const cancellation_point_t &)=delete
definition_point_t & operator=(const definition_point_t &)=delete
void cancel() noexcept
An attempt to cancel the async operation.
void reserve_completion_handlers_capacity(std::size_t capacity)
Reserve a space for storage of completion handlers.
void timeout_handler_impl(const ::so_5::state_t &state, Event_Handler &&evt_handler)
The actual implementation of addition of timeout handler.
void change_status(status_t status) noexcept
Change the status of async operation.
Ranges for error codes of each submodules.
Definition: details.hpp:13
status_t
Enumeration for status of operation.
std::vector< completion_handler_subscription_t > m_completion_handlers
Subscriptions for completion handlers which should be created on activation.
void cancelled() noexcept
Mark the async operation as cancelled.
void reserve_completion_handlers_capacity_impl(std::size_t capacity)
Reserve a space for storage of completion handlers.
definition_point_t & reserve_timeout_handlers_capacity(std::size_t capacity) &
Reserve a space for storage of timeout handlers.
cancellation_point_t(details::op_shptr_t< Operation_Data > op)
Initializing constructor to be used by definition_point.
const ::so_5::mbox_t m_timeout_mbox
A mbox to which timeout message/signal will be sent.
void do_unsubscribe_default_timeout_handler() noexcept
Actual unsubscription for the default timeout handler.
void do_subscribe_default_timeout_handler()
An implementation of subscription of default timeout handler.
definition_point_t && default_timeout_handler(Args &&...args) &&
Just a proxy for the main version of default_timeout_handler.
definition_point_t && completed_on(Args &&...args) &&
Just a proxy for the main version of completed_on.
void ensure_activable() const
Throws an exception if the async operation can&#39;t be activated.
msg_independent_part_of_definition_point_t(const msg_independent_part_of_definition_point_t &)=delete
definition_point_t(const definition_point_t &)=delete
status_t status() const noexcept
Get the status of the operation.
definition_point_t & timeout_handler(const ::so_5::state_t &state, Event_Handler &&evt_handler) &
Add a timeout handler for the async operation.
definition_point_t & operator=(definition_point_t &&)=default
auto reserve_completion_handlers_capacity(Args &&...args) &&
Just a proxy for actual version of reserve_completion_handlers_capacity.
void default_timeout_handler_impl(Event_Handler &&evt_handler)
The actual implementation of addition of the default timeout handler.
cancellation_point_t(cancellation_point_t &&)=default
definition_point_t & default_timeout_handler(Event_Handler &&evt_handler) &
Add the default timeout handler for the async operation.
Status of operation is unknown because the operation data has been moved to another proxy-object...
::so_5::agent_t & owner() noexcept
Access to owner of the async operation.
std::vector< timeout_handler_subscription_t > m_timeout_handlers
Subscriptions for timeout handlers which should be created on activation.
void add_completion_handler(const ::so_5::mbox_t &mbox, const ::so_5::state_t &state, const std::type_index msg_type, ::so_5::event_handler_method_t evt_handler)
Add a new completion handler for the async operation.
auto reserve_timeout_handlers_capacity(Args &&...args) &&
Just a proxy for actual version of reserve_timeout_handlers_capacity.
void completed_on_impl(Msg_Target &&msg_target, const ::so_5::state_t &state, Event_Handler &&evt_handler)
The actual implementation of addition of completion handler.
void add_timeout_handler(const ::so_5::state_t &state, ::so_5::event_handler_method_t evt_handler)
Add a new timeout handler for the async operation.
::so_5::event_handler_method_t m_default_timeout_handler
A default timeout handler which will be used as deadletter handler for timeout message/signal.
cancellation_point_t(const cancellation_point_t &)=delete
void reserve_timeout_handlers_capacity_impl(std::size_t capacity)
Reserve a space for storage of timeout handlers.
definition_point_t && timeout_handler(Args &&...args) &&
Just a proxy for the main version of timeout_handler.
const std::type_index m_timeout_msg_type
Type of timeout message.
const std::type_index & timeout_msg_type() const noexcept
Access to type of timeout message/signal.
msg_independent_part_of_definition_point_t & operator=(msg_independent_part_of_definition_point_t &&)=default
::so_5::event_handler_method_t create_actual_timeout_handler(Event_Handler &&evt_handler)
A helper method for creation a wrapper for a timeout handler.
::so_5::environment_t & environment() noexcept
Access to SObjectizer Environment in that the owner is working.
status_t current_status() const noexcept
Get the current status of async operation.
status_t m_status
The status of the async operation.
void drop_completion_handlers_subscriptions_up_to(std::size_t upper_border) noexcept
Removes subscription for the first N completion handlers.
void create_completion_handlers_subscriptions()
Create subscriptions for all defined completion handlers.
definition_point_t(::so_5::outliving_reference_t< ::so_5::agent_t > owner)
Initializing constructor.
definition_point_t & completed_on(Msg_Target &&msg_target, const ::so_5::state_t &state, Event_Handler &&evt_handler) &
Add a completion handler for the async operation.
msg_independent_part_of_definition_point_t(details::op_shptr_t< Operation_Data > op)
Initializing constructor.
::so_5::outliving_reference_t< const ::so_5::state_t > m_state
State for that a subscription should be created.
msg_independent_part_of_definition_point_t(msg_independent_part_of_definition_point_t &&)=default
Container of all data related to async operation.
bool is_cancellable() const noexcept
Can the async operation be cancelled via this cancellation point?
void setup_timer_id(::so_5::timer_id_t id)
Set the ID of timeout message/signal timer.
cancellation_point_t & operator=(cancellation_point_t &&)=default
void clean_with_status(status_t status)
Cleans the operation data and sets the status specified.
details::op_shptr_t< Operation_Data > m_op
Actual async operation data.
void do_cancellation_actions() noexcept
Performs total cleanup of the operation data.
void drop_timeout_handlers_subscriptions_up_to(std::size_t upper_border) noexcept
An implementation of unsubscription of the first N timeout handlers.
void default_timeout_handler(::so_5::event_handler_method_t evt_handler)
Set the default timeout handler.
::so_5::outliving_reference_t<::so_5::agent_t > m_owner
Owner of async operation.
void do_unsubscribe_timeout_handlers() noexcept
Actual unsubscription for timeout handlers.
void reserve_timeout_handlers_capacity(std::size_t capacity)
Reserve a space for storage of timeout handlers.