SObjectizer-5 Extra
inflight_limit.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of proxy mbox with inflight limit.
4  *
5  * \since v.1.5.2
6  */
7 
8 #pragma once
9 
10 #include <so_5_extra/mboxes/proxy.hpp>
11 
12 #include <so_5_extra/error_ranges.hpp>
13 
14 #include <so_5_extra/enveloped_msg/just_envelope.hpp>
15 
16 #include <so_5/impl/msg_tracing_helpers.hpp>
17 
18 #include <so_5/environment.hpp>
19 
20 #include <atomic>
21 
22 namespace so_5 {
23 
25 
26 /*!
27  * \brief Type to be used for limit and counter of inflight messages.
28  *
29  * \since v.1.5.2
30  */
31 using underlying_counter_t = unsigned int;
32 
33 } /* namespace extra::mboxes::inflight_limit */
34 
35 namespace impl::msg_tracing_helpers::details {
36 
37 // Special extension for inflight_limit specific data.
39 
40 struct limit_info
41  {
42  so_5::extra::mboxes::inflight_limit::underlying_counter_t m_limit;
43  so_5::extra::mboxes::inflight_limit::underlying_counter_t m_current_number;
44  };
45 
46 inline void
48  {
49  s << "[inflight_limit=" << info.m_limit << ",inflight_current="
50  << info.m_current_number << "]";
51  }
52 
53 inline void
55  actual_trace_data_t & /*d*/,
57  {
58  // Nothing to do.
59  }
60 
61 } /* namespace extra_inflight_limit_specifics */
62 
63 } /* namespace impl::msg_tracing_helpers::details */
64 
65 namespace extra {
66 
67 namespace mboxes {
68 
69 namespace inflight_limit {
70 
71 namespace errors {
72 
73 /*!
74  * \brief An attempt to use a message type that differs from mbox's message
75  * type.
76  *
77  * Type of message to be used with inflight_limit_mbox
78  * is fixed as part of inflight_limit_mbox type.
79  * An attempt to use different message type (for subscription, delivery or
80  * setting delivery filter) will lead to an exception with this error code.
81  *
82  * \since v.1.5.2
83  */
86 
87 /*!
88  * \brief Null pointer to underlying mbox.
89  *
90  * A inflight_limit_mbox uses underlying mbox and delegates all actions to that mbox.
91  * Because of that underlying mbox can't be nullptr.
92  *
93  * \since v.1.5.2
94  */
97 
98 } /* namespace impl */
99 
100 namespace impl {
101 
102 /*!
103  * \brief Separate type for holding inflight message counter as a separate object.
104  *
105  * It's expected that an instance of instances_counter_t will be created in
106  * dynamic memory and shared between entities via shared_ptr.
107  *
108  * \since v.1.5.2
109  */
111  {
112  //! Counter of inflight instances.
114  };
115 
116 /*!
117  * \brief An alias for shared_ptr to instances_counter.
118  *
119  * \since v.1.5.2
120  */
122 
123 /*!
124  * \brief Helper class for incrementing/decrementing number of messages in
125  * RAII style.
126  *
127  * An instance always increments the counter in the constructor. The result
128  * value is stored inside counter_incrementer_t instance. That value is available
129  * via value() method.
130  *
131  * The destructor decrements the counter if there weren't a call to
132  * do_not_decrement_in_destructor().
133  *
134  * The intended usage scenario is:
135  *
136  * - create an instance of counter_incrementer_t;
137  * - check the counter via value() method;
138  * - if limit wasn't exceeded then create an appropriate envelope for a message and
139  * call do_not_decrement_in_destructor(). In such a case the envelope will
140  * decrement the number of inflight messages;
141  * - if limit was exceeded then just stop the operation and the destructor of
142  * counter_incrementer_t will decrement number of messages automatically.
143  *
144  * \since v.1.5.2
145  */
147  {
149  const underlying_counter_t m_value;
150 
152 
153  public:
155  outliving_reference_t< instances_counter_t > counter ) noexcept
156  : m_counter{ counter.get() }
157  , m_value{ ++(m_counter.m_instances) }
158  {}
159 
161  {
162  if( m_should_decrement_in_destructor )
163  (m_counter.m_instances)--;
164  }
165 
166  void
168  {
170  }
171 
172  [[nodiscard]]
173  underlying_counter_t
174  value() const noexcept
175  {
176  return m_value;
177  }
178  };
179 
180 /*!
181  * \brief Type of envelope to be used by inflight_limit_mbox.
182  *
183  * \attention
184  * The envelope expects that the number of messages is already incremented before
185  * the creation of the envelope. That number is always decremented in the
186  * destructor.
187  *
188  * \since v.1.5.2
189  */
190 class special_envelope_t final : public so_5::extra::enveloped_msg::just_envelope_t
191  {
192  using base_type_t = so_5::extra::enveloped_msg::just_envelope_t;
193 
195 
196  public:
197  //! Initializing constructor.
199  message_ref_t payload,
200  instances_counter_shptr_t counter )
201  : base_type_t{ std::move(payload) }
202  , m_counter{ std::move(counter) }
203  {}
204 
206  {
207  // Counter should always be decremented because it was
208  // incremented before the creation of envelope instance.
209  (m_counter->m_instances)--;
210  }
211  };
212 
213 /*!
214  * \brief Helper type that tells that underlying mbox isn't nullptr.
215  *
216  * \since v.1.5.2
217  */
219  {
222 
224 
225  not_null_underlying_mbox_t( so_5::mbox_t value )
226  : m_value{ std::move(value) }
227  {}
228 
229  public:
230  [[nodiscard]]
231  const so_5::mbox_t &
232  value() const noexcept { return m_value; }
233  };
234 
235 //! Ensure that underlying mbox is not nullptr.
236 /*!
237  * \throw so_5::exception_t if \a mbox is nullptr.
238  */
239 [[nodiscard]]
242  so_5::mbox_t mbox )
243  {
244  if( !mbox )
245  SO_5_THROW_EXCEPTION(
246  errors::rc_nullptr_as_underlying_mbox,
247  "nullptr is used as underlying mbox" );
248 
249  return { std::move(mbox) };
250  }
251 
252 /*!
253  * \brief Actual implementation of inflight_limit_mbox.
254  *
255  * \tparam Tracing_Base base class with implementation of message
256  * delivery tracing methods.
257  *
258  * \since v.1.5.2
259  */
260 template< typename Tracing_Base >
262  : public so_5::abstract_message_box_t
263  , private Tracing_Base
264  {
265  //! Actual underlying mbox to be used for all calls.
266  /*!
267  * \attention Should not be nullptr.
268  */
270 
271  //! Type of a message for that mbox is created.
273 
274  //! The limit of inflight messages.
275  const underlying_counter_t m_limit;
276 
277  //! Counter for inflight instances.
279 
280  public:
281  /*!
282  * \brief Initializing constructor.
283  *
284  * \tparam Tracing_Args parameters for Tracing_Base constructor
285  * (can be empty list if Tracing_Base have only the default constructor).
286  */
287  template< typename... Tracing_Args >
289  //! Destination mbox.
290  const not_null_underlying_mbox_t & dest_mbox,
291  //! Type of a message for that mbox.
292  std::type_index msg_type,
293  //! The limit of inflight messages.
294  underlying_counter_t limit,
295  //! Optional parameters for Tracing_Base's constructor.
296  Tracing_Args &&... args )
299  , m_msg_type{ std::move(msg_type) }
300  , m_limit{ limit }
302  {}
303 
304  mbox_id_t
305  id() const override
306  {
307  return m_underlying_mbox->id();
308  }
309 
310  void
312  const std::type_index & msg_type,
313  abstract_message_sink_t & subscriber ) override
314  {
316  msg_type,
317  "an attempt to subscribe with different message type" );
318 
320  msg_type, subscriber );
321  }
322 
323  void
325  const std::type_index & msg_type,
326  abstract_message_sink_t & subscriber ) noexcept override
327  {
328  // Since v.1.6.0 we can't throw. So perform the main
329  // action only if types are the same.
330  if( msg_type != m_msg_type )
331  {
333  msg_type, subscriber );
334  }
335  }
336 
337  std::string
338  query_name() const override
339  {
340  return m_underlying_mbox->query_name();
341  }
342 
344  type() const override
345  {
346  return m_underlying_mbox->type();
347  }
348 
349  void
351  message_delivery_mode_t delivery_mode,
352  const std::type_index & msg_type,
353  const message_ref_t & message,
354  unsigned int redirection_deep ) override
355  {
357  msg_type,
358  "an attempt to deliver message of a different message type" );
359 
361  *this, // as Tracing_base
362  *this, // as abstract_message_box_t
363  "deliver_message",
366 
367  // Step 1: increment the counter and check that the limit
368  // isn't exceeded yet.
371  };
372  if( incrementer.value() <= m_limit )
373  {
374  // NOTE: if there will be an exception then the number
375  // of instance will be decremented by incrementer.
378  std::move(message),
380  };
381 
382  // incrementer shouldn't control the number of instances
383  // anymore.
385 
386  // Our envelope object has to be sent.
389  msg_type,
390  our_envelope,
392  }
393  else
394  {
395  using namespace so_5::impl::msg_tracing_helpers::details::
397 
399  "too_many_inflight_messages",
401  }
402  }
403 
404  void
406  const std::type_index & msg_type,
407  const delivery_filter_t & filter,
408  abstract_message_sink_t & subscriber ) override
409  {
411  msg_type,
412  "an attempt to set delivery_filter for different "
413  "message type" );
414 
416  msg_type,
417  filter,
418  subscriber );
419  }
420 
421  void
423  const std::type_index & msg_type,
424  abstract_message_sink_t & subscriber ) noexcept override
425  {
426  // Because drop_delivery_filter is noexcept we just ignore
427  // an errornous call with a different message type.
428  if( msg_type == m_msg_type )
429  {
431  msg_type,
432  subscriber );
433  }
434  }
435 
437  environment() const noexcept override
438  {
439  return m_underlying_mbox->environment();
440  }
441 
442  private:
443  /*!
444  * Throws an error if msg_type differs from expected message type.
445  */
446  void
448  const std::type_index & msg_type,
449  std::string_view error_description ) const
450  {
451  if( msg_type != m_msg_type )
454  //FIXME: we have to create std::string object because
455  //so_5::exception_t::raise expects std::string.
456  //This should be fixed after resolving:
457  //https://github.com/Stiffstream/sobjectizer/issues/46
459  }
460  };
461 
462 /*!
463  * \brief Check for compatibility between mbox type and message type.
464  *
465  * Throws if mutable message is used with MPMC mbox.
466  *
467  * \since v.1.5.2
468  */
469 template< typename Msg_Type >
470 void
472  //! NOTE: it's expected to be not-null.
473  const so_5::mbox_t & underlying_mbox )
474  {
475  // Use of mutable message type for MPMC mbox should be prohibited.
476  if constexpr( is_mutable_message< Msg_Type >::value )
477  {
478  switch( underlying_mbox->type() )
479  {
483  "an attempt to make MPMC mbox for mutable message, "
484  "msg_type=" + std::string(typeid(Msg_Type).name()) );
485  break;
486 
488  break;
489  }
490  }
491  }
492 
493 } /* namespace impl */
494 
495 /*!
496  * \brief Create an instance of inflight_limit_mbox.
497  *
498  * Usage example:
499  *
500  * \code
501  * namespace mbox_ns = so_5::extra::mboxes::inflight_limit;
502  *
503  * so_5::environment_t & env = ...;
504  *
505  * // Create an inflight_limit_mbox with underlying MPMC mbox for immutable message.
506  * auto my_mbox = mbox_ns::make_mbox<my_msg>(env.create_mbox(), 15u);
507  *
508  * // Create an inflight_limit_mbox with underlying MPSC mbox for mutable message.
509  * class demo_agent : public so_5::agent_t
510  * {
511  * const so_5::mbox_t my_mbox_;
512  * public:
513  * demo_agent(context_t ctx)
514  * : so_5::agent_t{std::move(ctx)}
515  * , my_mbox{ mbox_ns::make_mbox< so_5::mutable_msg<my_msg> >(so_direct_mbox(), 4u) }
516  * {...}
517  * ...
518  * };
519  * \endcode
520  *
521  * \tparam Msg_Type type of message to be used with a new mbox.
522  *
523  * \since v.1.5.2
524  */
525 template< typename Msg_Type >
526 [[nodiscard]]
527 mbox_t
529  //! Actual destination mbox.
531  //! The limit of inflight messages.
533  {
535  std::move(dest_mbox) );
536 
537  // Use of mutable message type for MPMC mbox should be prohibited.
540 
541  auto & env = underlying_mbox.value()->environment();
542 
543  return env.make_custom_mbox(
545  {
546  mbox_t result;
547 
549  {
550  using T = impl::actual_mbox_t<
552 
553  result = mbox_t{ new T{
557  data.m_tracer
558  } };
559  }
560  else
561  {
562  using T = impl::actual_mbox_t<
564 
565  result = mbox_t{ new T{
569  } };
570  }
571 
572  return result;
573  } );
574  }
575 
576 } /* namespace inflight_limit */
577 
578 } /* namespace mboxes */
579 
580 } /* namespace extra */
581 
582 } /* namespace so_5 */
special_envelope_t(message_ref_t payload, instances_counter_shptr_t counter)
Initializing constructor.
void fill_trace_data_1(actual_trace_data_t &, extra_inflight_limit_specifics::limit_info)
so_5::extra::mboxes::inflight_limit::underlying_counter_t m_limit
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
void ensure_expected_msg_type(const std::type_index &msg_type, std::string_view error_description) const
const int rc_nullptr_as_underlying_mbox
Null pointer to underlying mbox.
mbox_t make_mbox(mbox_t dest_mbox, underlying_counter_t inflight_limit)
Create an instance of inflight_limit_mbox.
const int rc_different_message_type
An attempt to use a message type that differs from mbox&#39;s message type.
counter_incrementer_t(outliving_reference_t< instances_counter_t > counter) noexcept
std::atomic< underlying_counter_t > m_instances
Counter of inflight instances.
Separate type for holding inflight message counter as a separate object.
Ranges for error codes of each submodules.
Definition: details.hpp:13
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
so_5::environment_t & environment() const noexcept override
const std::type_index m_msg_type
Type of a message for that mbox is created.
actual_mbox_t(const not_null_underlying_mbox_t &dest_mbox, std::type_index msg_type, underlying_counter_t limit, Tracing_Args &&... args)
Initializing constructor.
Helper class for incrementing/decrementing number of messages in RAII style.
not_null_underlying_mbox_t ensure_underlying_mbox_not_null(so_5::mbox_t mbox)
Ensure that underlying mbox is not nullptr.
so_5::extra::mboxes::inflight_limit::underlying_counter_t m_current_number
instances_counter_shptr_t m_instances_counter
Counter for inflight instances.
void ensure_valid_message_type_for_underlying_mbox(const so_5::mbox_t &underlying_mbox)
Check for compatibility between mbox type and message type.
void unsubscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
void subscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) override
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Helper type that tells that underlying mbox isn&#39;t nullptr.
const underlying_counter_t m_limit
The limit of inflight messages.
void make_trace_to_1(std::ostream &s, extra_inflight_limit_specifics::limit_info info)
const so_5::mbox_t m_underlying_mbox
Actual underlying mbox to be used for all calls.
Actual implementation of inflight_limit_mbox.