SObjectizer-5 Extra
first_last_subscriber_notification.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of mbox that informs about the first and the
4  * last subscriptions.
5  *
6  * \since v.1.5.2
7  */
8 
9 #pragma once
10 
11 #include <so_5/version.hpp>
12 
13 #if SO_5_VERSION < SO_5_VERSION_MAKE(5u, 8u, 0u)
14 #error "SObjectizer-5.8.0 of newest is required"
15 #endif
16 
17 #include <so_5_extra/error_ranges.hpp>
18 
19 #include <so_5/impl/msg_tracing_helpers.hpp>
20 #include <so_5/impl/local_mbox_basic_subscription_info.hpp>
21 
22 #include <so_5/details/sync_helpers.hpp>
23 #include <so_5/details/invoke_noexcept_code.hpp>
24 
25 #include <so_5/mbox.hpp>
26 #include <so_5/send_functions.hpp>
27 
28 #include <string_view>
29 
30 namespace so_5 {
31 
32 namespace extra {
33 
34 namespace mboxes {
35 
37 
38 namespace errors {
39 
40 /*!
41  * \brief An attempt to use a message type that differs from mbox's message
42  * type.
43  *
44  * Type of message to be used with first_last_subscriber_notification_mbox
45  * is fixed as part of first_last_subscriber_notification_mbox type.
46  * An attempt to use different message type (for subscription, delivery or
47  * setting delivery filter) will lead to an exception with this error code.
48  *
49  * \since v.1.5.2
50  */
53 
54 /*!
55  * \brief An attempt to add a new subscriber for MPSC mbox when another
56  * subscriber already exists.
57  *
58  * When an instance of first_last_subscriber_notification_mbox is created as
59  * MPSC mbox then only one subscriber can be added. An attempt to add another
60  * subscriber will lead to this error.
61  *
62  * \since v.1.5.2
63  */
66 
67 } /* namespace errors */
68 
69 /*!
70  * \brief Signal to be sent when the first subscriber arrives.
71  *
72  * Usage example:
73  * \code
74  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
75  *
76  * class my_producer final : public so_5::agent_t
77  * {
78  * public:
79  * // Message with published data.
80  * struct msg_data final : public so_5::message_t {...};
81  *
82  * private:
83  * state_t st_no_consumers{ this };
84  * state_t st_consumers_waiting{ this };
85  * ...
86  * const so_5::mbox_t publishing_mbox_;
87  * ...
88  * public:
89  * my_producer( context_t ctx )
90  * : so_5::agent_t{ std::move(ctx) }
91  * // New mbox for publishing produced data has to be created.
92  * , publishing_mbox_{ mbox_ns::make<msg_data>(
93  * so_environment(),
94  * // agent's direct mbox as the target for notifications.
95  * so_direct_mbox(),
96  * so_5::mbox_type_t::multi_producer_multi_consumer )
97  * }
98  * {...}
99  *
100  * void so_define_agent() override
101  * {
102  * st_consumers_waiting
103  * .on_enter{ turn_data_acquisition_on(); }
104  * .on_exit{ turn_data_acquisition_off(); }
105  * .event( &my_producer::evt_last_subscriber )
106  * ;
107  *
108  * st_no_consumers
109  * .event( &my_producer::evt_first_subscriber )
110  * ;
111  * ...
112  *
113  * st_no_consumers.activate();
114  * }
115  *
116  * ...
117  * private:
118  * void evt_first_subscriber( mhood_t< msg_first_subscriber > )
119  * {
120  * st_consumers_waiting.activate();
121  * }
122  *
123  * void evt_last_subscriber( mhood_t< msg_last_subscriber > )
124  * {
125  * st_no_consumers.activate();
126  * }
127  * ...
128  * };
129  * \endcode
130  *
131  * \since v.1.5.2
132  */
133 struct msg_first_subscriber final : public so_5::signal_t {};
134 
135 /*!
136  * \brief Signal to be sent when the last subscriber gone.
137  *
138  * See msg_first_subscriber for usage example.
139  *
140  * \since v.1.5.2
141  */
142 struct msg_last_subscriber final : public so_5::signal_t {};
143 
144 namespace details {
145 
146 /*!
147  * \brief An information block about one subscriber.
148  *
149  * \since v.1.5.2
150  */
151 using subscriber_info_t =
153 
154 //
155 // template_independent_mbox_data_t
156 //
157 /*!
158  * \brief A mixin with actual data which is necessary for implementation
159  * of actual mbox.
160  *
161  * This data type doesn't depend on any template parameters.
162  *
163  * \since v.1.5.2
164  */
166  {
167  //! A special coparator for sinks with respect to
168  //! sink's priority.
170  {
171  bool operator()(
172  abstract_message_sink_t * a,
173  abstract_message_sink_t * b ) const noexcept
174  {
175  return abstract_message_sink_t::special_sink_ptr_compare( a, b );
176  }
177  };
178 
179  //! Type of subscribers map.
180  using subscribers_map_t = std::map<
184  >;
185 
186  //! SObjectizer Environment to work in.
188 
189  //! ID of the mbox.
191 
192  //! Mbox for notifications about the first/last subscribers.
194 
195  //! Type of this mbox (MPMC or MPSC).
197 
198  //! Subscribers.
199  /*!
200  * Can be empty.
201  */
203 
204  //! Number of actual subscriptions.
205  /*!
206  * \note
207  * There could be cases when a delivery filter is set, but subscription
208  * isn't made yet. Such cases shouldn't be treated as subscriptions.
209  * So we have to store the number of actual subscriptions separately
210  * and don't rely on the size of m_subscribers.
211  */
213 
215  environment_t & env,
216  mbox_id_t id,
217  mbox_t notification_mbox,
218  mbox_type_t mbox_type )
219  : m_env{ env }
220  , m_id{ id }
223  {}
224  };
225 
226 //
227 // actual_mbox_t
228 //
229 
230 /*!
231  * \brief An actual implementation of first/last subscriber message mbox.
232  *
233  * \tparam Msg_Type type of message to be used with this mbox.
234  *
235  * \tparam Lock_Type type of lock object to be used for thread safety.
236  *
237  * \tparam Tracing_Base base class with implementation of message
238  * delivery tracing methods.
239  *
240  * \since v.1.5.2
241  */
242 template<
243  typename Msg_Type,
244  typename Lock_Type,
245  typename Tracing_Base >
247  : public abstract_message_box_t
248  , private Tracing_Base
249  {
250  public:
251  /*!
252  * \brief Initializing constructor.
253  *
254  * \tparam Tracing_Args parameters for Tracing_Base constructor
255  * (can be empty list if Tracing_Base have only the default constructor).
256  */
257  template< typename... Tracing_Args >
259  //! SObjectizer Environment to work in.
260  environment_t & env,
261  //! ID of this mbox.
262  mbox_id_t id,
263  //! Mbox for notifications about the first/last subscriber.
264  mbox_t notification_mbox,
265  //! Type of this mbox (MPSC or MPMC).
266  mbox_type_t mbox_type,
267  //! Optional parameters for Tracing_Base's constructor.
268  Tracing_Args &&... args )
271  {
272  // Use of mutable message type for MPMC mbox should be prohibited.
273  if constexpr( is_mutable_message< Msg_Type >::value )
274  {
275  switch( mbox_type )
276  {
280  "an attempt to make MPMC mbox for mutable message, "
281  "msg_type=" + std::string(typeid(Msg_Type).name()) );
282  break;
283 
285  break;
286  }
287  }
288  }
289 
290  mbox_id_t
291  id() const override
292  {
293  return this->m_data.m_id;
294  }
295 
296  void
298  const std::type_index & msg_type,
299  abstract_message_sink_t & subscriber ) override
300  {
302  msg_type,
303  "an attempt to subscribe with different message type" );
304 
306  subscriber,
307  [] {
308  return subscriber_info_t{
310  };
311  },
312  []( subscriber_info_t & info ) {
314  },
315  [this]() {
316  this->m_data.m_subscriptions_count += 1u;
317  } );
318  }
319 
320  void
322  const std::type_index & msg_type,
323  abstract_message_sink_t & subscriber ) noexcept override
324  {
325  // Since v.1.6.0 we can't throw. So perform the main
326  // action only if types are the same.
327  if( msg_type == typeid(Msg_Type) )
328  {
330  subscriber,
331  []( subscriber_info_t & info ) {
333  },
334  [this]() {
335  this->m_data.m_subscriptions_count -= 1u;
336  } );
337  }
338  }
339 
340  std::string
341  query_name() const override
342  {
344  s << "<mbox:type=FIRST_LAST_SUBSCR_NOTIFY";
345 
346  switch( this->m_data.m_mbox_type )
347  {
349  s << "(MPMC)";
350  break;
351 
353  s << "(MPSC)";
354  break;
355  }
356 
357  s << ":id=" << this->m_data.m_id << ">";
358 
359  return s.str();
360  }
361 
363  type() const override
364  {
365  return this->m_data.m_mbox_type;
366  }
367 
368  void
370  message_delivery_mode_t delivery_mode,
371  const std::type_index & msg_type,
372  const message_ref_t & message,
373  unsigned int redirection_deep ) override
374  {
376  msg_type,
377  "an attempt to deliver with different message type" );
378 
380  *this, // as Tracing_base
381  *this, // as abstract_message_box_t
382  "deliver_message",
385 
386  // NOTE: we don't check for message mutability because
387  // it's impossible to create MPMC mbox for mutable message.
388  // If MPMC mbox was created for immutable message, but a user
389  // tries to send a mutable message then it will be a message
390  // of a different type and the corresponding exception will
391  // be thrown earlier.
393  tracer,
395  msg_type,
396  message,
398  }
399 
400  void
402  const std::type_index & msg_type,
403  const delivery_filter_t & filter,
404  abstract_message_sink_t & subscriber ) override
405  {
407  msg_type,
408  "an attempt to set delivery_filter with "
409  "different message type" );
410 
412  subscriber,
413  [&filter] {
414  return subscriber_info_t{ filter };
415  },
416  [&filter]( subscriber_info_t & info ) {
418  },
419  []() { /* nothing to do */ } );
420  }
421 
422  void
424  const std::type_index & msg_type,
425  abstract_message_sink_t & subscriber ) noexcept override
426  {
428  msg_type,
429  "an attempt to drop delivery_filter with "
430  "different message type" );
431 
433  subscriber,
434  []( subscriber_info_t & info ) {
435  info.drop_filter();
436  },
437  []() { /* nothing to do */ } );
438  }
439 
441  environment() const noexcept override
442  {
443  return this->m_data.m_env;
444  }
445 
446  private :
447  //! Data of this message mbox.
449 
450  //! Object lock.
451  Lock_Type m_lock;
452 
453  /*!
454  * Throws an error if msg_type differs from Config::msg_type.
455  */
456  static void
458  const std::type_index & msg_type,
459  std::string_view error_description )
460  {
461  if( msg_type != typeid(Msg_Type) )
464  //FIXME: we have to create std::string object because
465  //so_5::exception_t::raise expects std::string.
466  //This should be fixed after resolving:
467  //https://github.com/Stiffstream/sobjectizer/issues/46
469  }
470 
471  template<
472  typename Info_Maker,
473  typename Info_Changer,
474  typename Post_Action >
475  void
477  abstract_message_sink_t & subscriber,
478  Info_Maker maker,
479  Info_Changer changer,
480  Post_Action post_action )
481  {
483 
484  auto it_subscriber = this->m_data.m_subscribers.find(
486  if( it_subscriber == this->m_data.m_subscribers.end() )
487  {
488  // There is no subscriber yet. It must be added if
489  // it's possible.
491 
493  std::addressof(subscriber), maker() );
494  }
495  else
496  // Subscriber is known. It must be updated.
498 
499  // All following actions shouldn't throw.
501  {
502  // post_action can increment number of actual subscribers so
503  // we have to store the old value before calling post_action.
504  const auto old_subscribers_count =
506  post_action();
507 
509  1u == this->m_data.m_subscriptions_count )
510  {
511  // We've got the first subscriber.
513  this->m_data.m_notification_mbox );
514  }
515  } );
516  }
517 
518  template<
519  typename Info_Changer,
520  typename Post_Action >
521  void
523  abstract_message_sink_t & subscriber,
524  Info_Changer changer,
525  Post_Action post_action )
526  {
528 
529  auto it_subscriber = this->m_data.m_subscribers.find(
531  if( it_subscriber != this->m_data.m_subscribers.end() )
532  {
533  // Subscriber is found and must be modified.
535 
536  // If info about subscriber becomes empty after
537  // modification then subscriber info must be removed.
538  if( it_subscriber->second.empty() )
540 
541  // All following actions shouldn't throw.
543  {
544  // post_action can increment number of actual
545  // subscribers so we have to store the old value before
546  // calling post_action.
547  const auto old_subscribers_count =
549  post_action();
550 
552  0u == this->m_data.m_subscriptions_count )
553  {
554  // We've lost the last subscriber.
556  this->m_data.m_notification_mbox );
557  }
558  } );
559  }
560  }
561 
562  void
564  typename Tracing_Base::deliver_op_tracer const & tracer,
565  message_delivery_mode_t delivery_mode,
566  const std::type_index & msg_type,
567  const message_ref_t & message,
568  unsigned int redirection_deep )
569  {
571 
572  auto & subscribers = this->m_data.m_subscribers;
573  if( !subscribers.empty() )
574  for( const auto & kv : subscribers )
576  *(kv.first),
577  kv.second,
578  tracer,
580  msg_type,
581  message,
583  else
585  }
586 
587  void
589  abstract_message_sink_t & subscriber,
590  const subscriber_info_t & subscriber_info,
591  typename Tracing_Base::deliver_op_tracer const & tracer,
592  message_delivery_mode_t delivery_mode,
593  const std::type_index & msg_type,
594  const message_ref_t & message,
595  unsigned int redirection_deep ) const
596  {
597  const auto delivery_status =
599  subscriber,
600  message,
601  []( const message_ref_t & msg ) -> message_t & {
602  return *msg;
603  } );
604 
606  {
607  using namespace so_5::message_limit::impl;
608 
610  this->id(),
612  msg_type,
613  message,
616  }
617  else
620  }
621 
622  void
624  {
625  // If this mbox is MPSC mbox then a new item can be
626  // added to subscribers container only if that container
627  // is empty.
628  // This is true even if new item will hold only delivery_filter,
629  // but not a subscription. It's because there is no sense
630  // to have a delivery_filter for MPSC mbox without having
631  // a subscription.
633  this->m_data.m_mbox_type) &&
634  !this->m_data.m_subscribers.empty() )
635  {
638  "subscriber already exists for MPSC mbox, new "
639  "subscriber can't be added" );
640  }
641  }
642  };
643 
644 } /* namespace details */
645 
646 //
647 // make_mbox
648 //
649 /*!
650  * \brief Create an instance of first_last_subscriber_notification mbox.
651  *
652  * Usage examples:
653  *
654  * Create a MPMC mbox with std::mutex as Lock_Type (this mbox can safely be
655  * used in multi-threaded environments):
656  * \code
657  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
658  * so_5::environment_t & env = ...;
659  * auto notification_mbox = env.create_mbox();
660  * auto mbox = mbox_ns::make_mbox<my_message>(
661  * env,
662  * notification_mbox,
663  * so_5::mbox_type_t::multi_producer_multi_consumer);
664  * \endcode
665  *
666  * Create a MPSC mbox with std::mutex as Lock_Type (this mbox can safely be
667  * used in multi-threaded environments):
668  * \code
669  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
670  * so_5::environment_t & env = ...;
671  * auto notification_mbox = env.create_mbox();
672  * auto mbox = mbox_ns::make_mbox<my_message>(
673  * env,
674  * notification_mbox,
675  * so_5::mbox_type_t::multi_producer_single_consumer);
676  * \endcode
677  *
678  * Create a MPMC mbox with so_5::null_mutex_t as Lock_Type (this mbox can only
679  * be used in single-threaded environments):
680  * \code
681  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
682  * so_5::environment_t & env = ...;
683  * auto notification_mbox = env.create_mbox();
684  * auto mbox = mbox_ns::make_mbox<my_message, so_5::null_mutex_t>(
685  * env,
686  * notification_mbox,
687  * so_5::mbox_type_t::multi_producer_multi_consumer);
688  * \endcode
689  *
690  * \attention
691  * This type of mbox terminates the whole application if an attempt
692  * to send a notification (in form of msg_first_subscriber and msg_last_subscriber
693  * signals) throws.
694  *
695  * \tparam Msg_Type type of message to be used with a new mbox.
696  *
697  * \tparam Lock_Type type of lock to be used for thread safety. It can be
698  * std::mutex or so_5::null_mutex_t (or any other type which can be used
699  * with std::lock_quard).
700  *
701  * \since v.1.5.2
702  */
703 template<
704  typename Msg_Type,
705  typename Lock_Type = std::mutex >
706 [[nodiscard]]
707 mbox_t
709  //! SObjectizer Environment to work in.
710  environment_t & env,
711  //! Mbox for notifications about the first/last subscriber.
713  //! Type of this mbox (MPSC or MPMC).
715  {
716  return env.make_custom_mbox(
718  {
719  mbox_t result;
720 
722  {
723  using T = details::actual_mbox_t<
724  Msg_Type,
725  Lock_Type,
727 
728  result = mbox_t{ new T{
729  data.m_env.get(),
730  data.m_id,
732  mbox_type,
733  data.m_tracer
734  } };
735  }
736  else
737  {
738  using T = details::actual_mbox_t<
739  Msg_Type,
740  Lock_Type,
742  result = mbox_t{ new T{
743  data.m_env.get(),
744  data.m_id,
746  mbox_type
747  } };
748  }
749 
750  return result;
751  } );
752  }
753 
754 //
755 // make_multi_consumer_mbox
756 //
757 /*!
758  * \brief Create an instance of first_last_subscriber_notification MPMC mbox.
759  *
760  * Usage examples:
761  *
762  * Create a MPMC mbox with std::mutex as Lock_Type (this mbox can safely be
763  * used in multi-threaded environments):
764  * \code
765  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
766  * so_5::environment_t & env = ...;
767  * auto notification_mbox = env.create_mbox();
768  * auto mbox = mbox_ns::make_multi_consumer_mbox<my_message>(
769  * env,
770  * notification_mbox);
771  * \endcode
772  *
773  * \note
774  * It's just a thin wrapper around make_mbox() template function.
775  *
776  * \sa make_mbox
777  *
778  * \since v.1.5.2
779  */
780 template<
781  typename Msg_Type,
782  typename Lock_Type = std::mutex >
783 [[nodiscard]]
784 mbox_t
786  //! SObjectizer Environment to work in.
787  environment_t & env,
788  //! Mbox for notifications about the first/last subscriber.
790 {
791  return make_mbox< Msg_Type, Lock_Type >(
792  env,
795 }
796 
797 //
798 // make_single_consumer_mbox
799 //
800 /*!
801  * \brief Create an instance of first_last_subscriber_notification MPSC mbox.
802  *
803  * Usage examples:
804  *
805  * Create a MPSC mbox with std::mutex as Lock_Type (this mbox can safely be
806  * used in multi-threaded environments):
807  * \code
808  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
809  * so_5::environment_t & env = ...;
810  * auto notification_mbox = env.create_mbox();
811  * auto mbox = mbox_ns::make_single_consumer_mbox<my_message>(
812  * env,
813  * notification_mbox);
814  * \endcode
815  *
816  * \note
817  * It's just a thin wrapper around make_mbox() template function.
818  *
819  * \sa make_mbox
820  *
821  * \since v.1.5.2
822  */
823 template<
824  typename Msg_Type,
825  typename Lock_Type = std::mutex >
826 [[nodiscard]]
827 mbox_t
829  //! SObjectizer Environment to work in.
830  environment_t & env,
831  //! Mbox for notifications about the first/last subscriber.
833 {
834  return make_mbox< Msg_Type, Lock_Type >(
835  env,
838 }
839 
840 } /* namespace first_last_subscriber_notification */
841 
842 } /* namespace mboxes */
843 
844 } /* namespace extra */
845 
846 } /* namespace so_5 */
mbox_t make_multi_consumer_mbox(environment_t &env, mbox_t notification_mbox)
Create an instance of first_last_subscriber_notification MPMC mbox.
const int rc_subscriber_already_exists_for_mpsc_mbox
An attempt to add a new subscriber for MPSC mbox when another subscriber already exists.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
void do_deliver_message_to_subscriber(abstract_message_sink_t &subscriber, const subscriber_info_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) const
Ranges for error codes of each submodules.
Definition: details.hpp:13
mbox_t make_mbox(environment_t &env, mbox_t notification_mbox, mbox_type_t mbox_type)
Create an instance of first_last_subscriber_notification mbox.
void unsubscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
const int rc_different_message_type
An attempt to use a message type that differs from mbox&#39;s message type.
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep)
mbox_t make_single_consumer_mbox(environment_t &env, mbox_t notification_mbox)
Create an instance of first_last_subscriber_notification MPSC mbox.
actual_mbox_t(environment_t &env, mbox_id_t id, mbox_t notification_mbox, mbox_type_t mbox_type, Tracing_Args &&... args)
Initializing constructor.
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
void modify_and_remove_subscriber_if_needed(abstract_message_sink_t &subscriber, Info_Changer changer, Post_Action post_action)
void subscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) override
static void ensure_expected_msg_type(const std::type_index &msg_type, std::string_view error_description)
void insert_or_modify_subscriber(abstract_message_sink_t &subscriber, Info_Maker maker, Info_Changer changer, Post_Action post_action)
template_independent_mbox_data_t(environment_t &env, mbox_id_t id, mbox_t notification_mbox, mbox_type_t mbox_type)