SObjectizer-5 Extra
retained_msg.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of mbox which holds last sent message.
4  *
5  * \since
6  * v.1.0.3
7  */
8 
9 #pragma once
10 
11 #include <so_5/version.hpp>
12 
13 #if SO_5_VERSION < SO_5_VERSION_MAKE(5u, 8u, 0u)
14 #error "SObjectizer-5.8.0 of newest is required"
15 #endif
16 
17 #include <so_5_extra/error_ranges.hpp>
18 
19 #include <so_5/impl/msg_tracing_helpers.hpp>
20 #include <so_5/impl/local_mbox_basic_subscription_info.hpp>
21 
22 #include <so_5/details/sync_helpers.hpp>
23 
24 #include <so_5/mbox.hpp>
25 
26 #include <memory>
27 
28 namespace so_5 {
29 
30 namespace extra {
31 
32 namespace mboxes {
33 
34 namespace retained_msg {
35 
36 namespace errors {
37 
38 // Note: this namespace is empty for now.
39 
40 } /* namespace errors */
41 
42 namespace details {
43 
44 /*!
45  * \brief A helper type which is a collection of type parameters.
46  *
47  * This type is used to simplify code of retained_msg_mbox internals.
48  * Instead of writting something like:
49  * \code
50  * template< typename Traits >
51  * class ... {...};
52  *
53  * template< typename Traits, typename Lock_Type >
54  * class ... {...};
55  * \endcode
56  * this config_type allows to write like that:
57  * \code
58  * template< typename Config_Type >
59  * class ... {...};
60  *
61  * template< typename Config_Type >
62  * class ... {...};
63  * \endcode
64  *
65  * \tparam Traits traits type to be used.
66  *
67  * \tparam Lock_Type type of object to be used for thread-safety (like
68  * std::mutex or so_5::null_mutex_t).
69  *
70  * \since
71  * v.1.0.3
72  */
73 template<
74  typename Traits,
75  typename Lock_Type >
77  {
78  using traits_type = Traits;
79  using lock_type = Lock_Type;
80  };
81 
82 /*!
83  * \name Type extractors for config_type
84  * \{
85  */
86 template< typename Config_Type >
87 using traits_t = typename Config_Type::traits_type;
88 
89 template< typename Config_Type >
90 using lock_t = typename Config_Type::lock_type;
91 /*!
92  * \}
93  */
94 
95 /*!
96  * \brief An information block about one subscriber.
97  *
98  * \since v.1.0.3, v.1.5.1
99  */
100 using subscriber_info_t =
102 
103 //
104 // messages_table_item_t
105 //
106 /*!
107  * \brief A type of item of message table for retained message mbox.
108  *
109  * For each message type is necessary to store:
110  * - a list of subscriber for that message;
111  * - the last message sent.
112  *
113  * This type is intended to be used as a container for such data.
114  *
115  * \since
116  * v.1.0.3
117  */
119  {
120  //! A special coparator for sinks with respect to
121  //! sinks's priority.
123  {
124  bool operator()(
125  abstract_message_sink_t * a,
126  abstract_message_sink_t * b ) const noexcept
127  {
128  return abstract_message_sink_t::special_sink_ptr_compare( a, b );
129  }
130  };
131 
132  //! Type of subscribers map.
133  using subscribers_map_t = std::map<
137  >;
138 
139  //! Subscribers.
140  /*!
141  * Can be empty. This is for case when the first message was sent
142  * when there is no subscribers yet.
143  */
145 
146  //! Retained message.
147  /*!
148  * Can be nullptr. It means that there is no any attempts to send
149  * a message of this type.
150  */
152  };
153 
154 //
155 // template_independent_mbox_data_t
156 //
157 /*!
158  * \brief A mixin with actual data which is necessary for implementation
159  * of retained mbox.
160  *
161  * This data type doesn't depend on any template parameters.
162  *
163  * \since
164  * v.1.0.3
165  */
167  {
168  //! SObjectizer Environment to work in.
170 
171  //! ID of the mbox.
173 
174  //! Type of messages table.
175  using messages_table_t =
177 
178  //! Table of current subscriptions and messages.
180 
182  environment_t & env,
183  mbox_id_t id )
184  : m_env{ env }
185  , m_id{id}
186  {}
187  };
188 
189 //
190 // actual_mbox_t
191 //
192 
193 /*!
194  * \brief An actual implementation of retained message mbox.
195  *
196  * \tparam Config type with main definitions for this message box type.
197  *
198  * \tparam Tracing_Base base class with implementation of message
199  * delivery tracing methods.
200  *
201  * \since
202  * v.1.0.3
203  */
204 template<
205  typename Config,
206  typename Tracing_Base >
208  : public abstract_message_box_t
209  , private Tracing_Base
210  {
211  public:
212  /*!
213  * \brief Initializing constructor.
214  *
215  * \tparam Tracing_Args parameters for Tracing_Base constructor
216  * (can be empty list if Tracing_Base have only the default constructor).
217  */
218  template< typename... Tracing_Args >
220  //! SObjectizer Environment to work in.
221  environment_t & env,
222  //! ID of this mbox.
223  mbox_id_t id,
224  //! Optional parameters for Tracing_Base's constructor.
225  Tracing_Args &&... args )
227  , m_data{ env, id }
228  {}
229 
230  mbox_id_t
231  id() const override
232  {
233  return this->m_data.m_id;
234  }
235 
236  void
238  const std::type_index & msg_type,
239  abstract_message_sink_t & subscriber ) override
240  {
242  msg_type,
243  subscriber,
244  [&] {
245  return subscriber_info_t{
247  };
248  },
249  [&]( subscriber_info_t & info ) {
251  } );
252  }
253 
254  void
256  const std::type_index & msg_type,
257  abstract_message_sink_t & subscriber ) noexcept override
258  {
260  msg_type,
261  subscriber,
262  []( subscriber_info_t & info ) {
264  } );
265  }
266 
267  std::string
268  query_name() const override
269  {
271  s << "<mbox:type=RETAINED_MPMC:id=" << this->m_data.m_id << ">";
272 
273  return s.str();
274  }
275 
277  type() const override
278  {
280  }
281 
282  void
284  message_delivery_mode_t delivery_mode,
285  const std::type_index & msg_type,
286  const message_ref_t & message,
287  unsigned int redirection_deep ) override
288  {
290  *this, // as Tracing_base
291  *this, // as abstract_message_box_t
292  "deliver_message",
295 
297 
299  tracer,
301  msg_type,
302  message,
304  }
305 
306  void
308  const std::type_index & msg_type,
309  const delivery_filter_t & filter,
310  abstract_message_sink_t & subscriber ) override
311  {
313  msg_type,
314  subscriber,
315  [&] {
316  return subscriber_info_t{ filter };
317  },
318  [&]( subscriber_info_t & info ) {
320  } );
321  }
322 
323  void
325  const std::type_index & msg_type,
326  abstract_message_sink_t & subscriber ) noexcept override
327  {
329  msg_type,
330  subscriber,
331  []( subscriber_info_t & info ) {
332  info.drop_filter();
333  } );
334  }
335 
337  environment() const noexcept override
338  {
339  return this->m_data.m_env;
340  }
341 
342  private :
343  //! Data of this message mbox.
345 
346  //! Object lock.
348 
349  template< typename Info_Maker, typename Info_Changer >
350  void
352  const std::type_index & msg_type,
353  abstract_message_sink_t & subscriber,
354  Info_Maker maker,
355  Info_Changer changer )
356  {
358 
359  // If there is no item for this message type it will be
360  // created automatically.
361  auto & table_item = this->m_data.m_messages_table[ msg_type ];
362 
366  // There is no subscriber yet. It must be added.
369  else
370  // Subscriber is known. It must be updated.
372 
373  // If there is a retained message then delivery attempt
374  // must be performed.
375  // NOTE: an exception at this stage doesn't remove new subscription.
378  msg_type,
380  subscriber,
382  }
383 
384  template< typename Info_Changer >
385  void
387  const std::type_index & msg_type,
388  abstract_message_sink_t & subscriber,
389  Info_Changer changer )
390  {
392 
394  if( it_table_item != this->m_data.m_messages_table.end() )
395  {
396  auto & table_item = it_table_item->second;
397 
401  {
402  // Subscriber is found and must be modified.
404 
405  // If info about subscriber becomes empty after
406  // modification then subscriber info must be removed.
407  if( it_subscriber->second.empty() )
409  }
410  }
411  }
412 
413  void
415  typename Tracing_Base::deliver_op_tracer const & tracer,
416  message_delivery_mode_t delivery_mode,
417  const std::type_index & msg_type,
418  const message_ref_t & message,
419  unsigned int redirection_deep )
420  {
422 
423  // If there is no item for this message type it will be
424  // created automatically.
425  auto & table_item = this->m_data.m_messages_table[ msg_type ];
426 
427  // Message must be stored as retained.
429 
431  if( !subscribers.empty() )
432  for( const auto & kv : subscribers )
434  *(kv.first),
435  kv.second,
436  tracer,
438  msg_type,
439  message,
441  else
443  }
444 
445  void
447  abstract_message_sink_t & subscriber,
448  const subscriber_info_t & subscriber_info,
449  typename Tracing_Base::deliver_op_tracer const & tracer,
450  message_delivery_mode_t delivery_mode,
451  const std::type_index & msg_type,
452  const message_ref_t & message,
453  unsigned int redirection_deep ) const
454  {
455  const auto delivery_status =
457  subscriber,
458  message,
459  []( const message_ref_t & msg ) -> message_t & {
460  return *msg;
461  } );
462 
464  {
465  using namespace so_5::message_limit::impl;
466 
468  this->id(),
470  msg_type,
471  message,
474  }
475  else
478  }
479 
480  /*!
481  * \brief An attempt to deliver retained message to the new subscriber.
482  *
483  * This attempt will be performed only if there is the retained message.
484  */
485  void
487  const std::type_index & msg_type,
488  const message_ref_t & retained_msg,
489  abstract_message_sink_t & subscriber,
490  const subscriber_info_t & subscriber_info )
491  {
492  if( retained_msg )
493  {
494  const unsigned int redirection_deep = 0;
495 
497  *this, // as Tracing_base
498  *this, // as abstract_message_box_t
499  "deliver_message_on_subscription",
501  msg_type,
502  retained_msg,
504 
506  subscriber,
508  tracer,
510  msg_type,
511  retained_msg,
513  }
514  }
515 
516  /*!
517  * \brief Ensures that message is an immutable message.
518  *
519  * Checks mutability flag and throws an exception if message is
520  * a mutable one.
521  */
522  void
524  const std::type_index & msg_type,
525  const message_ref_t & what ) const
526  {
531  "an attempt to deliver mutable message via MPMC mbox"
532  ", msg_type=" + std::string(msg_type.name()) );
533  }
534  };
535 
536 } /* namespace details */
537 
538 //
539 //
540 // default_traits_t
541 //
542 /*!
543  * \brief Default traits for retained message mbox.
544  */
546 
547 //
548 // make_mbox
549 //
550 /*!
551  * \brief Create an instance of retained message mbox.
552  *
553  * Simple usage example:
554  * \code
555  * so_5::environment_t & env = ...;
556  * const so_5::mbox_t retained_mbox = so_5::extra::mboxes::retained_msg::make_mbox<>(env);
557  * so_5::send<Some_Message>(retained_mbox, ...);
558  * \endcode
559  * An instance of default implementation retained message mbox will be created.
560  * This instance will be protected by std::mutex.
561  *
562  * If you want to use retained_mbox in a single-threaded environment
563  * without a multithreaded protection then so_5::null_mutex_t (or any
564  * similar null-mutex implementation) can be used:
565  * \code
566  * so_5::environment_t & env = ...
567  * const so_5::mbox_t retained_mbox =
568  * so_5::extra::mboxes::retained_msg::make_mbox<
569  * so_5::extra::mboxes::retained_msg::default_traits_t,
570  * so_5::null_mutex_t>(env);
571  * so_5::send<Some_Message>(retained_mbox, ...);
572  * \endcode
573  *
574  * If you want to use your own mutex-like object (with interface which
575  * allows to use your mutex-like class with std::lock_guard) then you can
576  * do it similar way:
577  * \code
578  * so_5::environment_t & env = ...
579  * const so_5::mbox_t retained_mbox =
580  * so_5::extra::mboxes::retained_msg::make_mbox<
581  * so_5::extra::mboxes::retained_msg::default_traits_t,
582  * Your_Own_Mutex_Class>(env);
583  * so_5::send<Some_Message>(retained_mbox, ...);
584  * \endcode
585  *
586  * \tparam Traits type with traits of mbox implementation.
587  *
588  * \tparam Lock_Type a type of mutex to be used for protection of
589  * retained message mbox content. This must be a DefaultConstructible
590  * type with interface which allows to use Lock_Type with std::lock_guard.
591  *
592  * \since
593  * v.1.0.3
594  */
595 template<
596  typename Traits = default_traits_t,
597  typename Lock_Type = std::mutex >
598 mbox_t
600  {
602 
603  return env.make_custom_mbox(
604  []( const mbox_creation_data_t & data )
605  {
606  mbox_t result;
607 
609  {
610  using T = details::actual_mbox_t<
611  config_type,
613 
614  result = mbox_t{ new T{
616  }
617  };
618  }
619  else
620  {
621  using T = details::actual_mbox_t<
622  config_type,
624  result = mbox_t{ new T{ data.m_env.get(), data.m_id } };
625  }
626 
627  return result;
628  } );
629  }
630 
631 } /* namespace retained_msg */
632 
633 } /* namespace mboxes */
634 
635 } /* namespace extra */
636 
637 } /* namespace so_5 */
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
A type of item of message table for retained message mbox.
mbox_t make_mbox(environment_t &env)
Create an instance of retained message mbox.
void try_deliver_retained_message_to(const std::type_index &msg_type, const message_ref_t &retained_msg, abstract_message_sink_t &subscriber, const subscriber_info_t &subscriber_info)
An attempt to deliver retained message to the new subscriber.
template_independent_mbox_data_t m_data
Data of this message mbox.
void do_deliver_message_to_subscriber(abstract_message_sink_t &subscriber, const subscriber_info_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) const
bool operator()(abstract_message_sink_t *a, abstract_message_sink_t *b) const noexcept
Ranges for error codes of each submodules.
Definition: details.hpp:13
so_5::environment_t & environment() const noexcept override
A mixin with actual data which is necessary for implementation of retained mbox.
messages_table_t m_messages_table
Table of current subscriptions and messages.
An actual implementation of retained message mbox.
void insert_or_modify_subscriber(const std::type_index &msg_type, abstract_message_sink_t &subscriber, Info_Maker maker, Info_Changer changer)
void modify_and_remove_subscriber_if_needed(const std::type_index &msg_type, abstract_message_sink_t &subscriber, Info_Changer changer)
A helper type which is a collection of type parameters.
Default traits for retained message mbox.
void ensure_immutable_message(const std::type_index &msg_type, const message_ref_t &what) const
Ensures that message is an immutable message.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
void unsubscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
void subscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) override
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep)
A special coparator for sinks with respect to sinks&#39;s priority.
actual_mbox_t(environment_t &env, mbox_id_t id, Tracing_Args &&... args)
Initializing constructor.
environment_t & m_env
SObjectizer Environment to work in.