SObjectizer-5 Extra
round_robin.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of round-robin mbox.
4  */
5 
6 #pragma once
7 
8 #include <so_5_extra/error_ranges.hpp>
9 
10 #include <so_5/impl/msg_tracing_helpers.hpp>
11 
12 namespace so_5 {
13 
14 namespace extra {
15 
16 namespace mboxes {
17 
18 namespace round_robin {
19 
20 namespace errors {
21 
22 /*!
23  * \brief An attempt to set a delivery filter on a round_robin mbox.
24  *
25  * \since
26  * v.1.0.1
27  */
30 
31 } /* namespace errors */
32 
33 namespace details {
34 
35 //
36 // subscriber_info_t
37 //
38 
39 /*!
40  * \brief An information block about one subscriber.
41  */
43 {
44  //! Subscriber.
46 
47  //! Constructor for the case when subscriber info is being
48  //! created during event subscription.
    //!
    //! \param sink message sink of the subscriber; it is kept in m_sink.
50  so_5::abstract_message_sink_t & sink )
51  : m_sink( sink )
52  {}
53 };
54 
55 //
56 // subscriber_container_t
57 //
58 
59 /*!
60  * \brief Type of container for holding subscribers for one message type.
61  */
63  {
64  public :
66 
67  bool
68  empty() const noexcept
69  {
    // True when m_subscribers holds no entries at all.
70  return m_subscribers.empty();
71  }
72 
73  void
75  so_5::abstract_message_sink_t & sink )
76  {
    // A new entry for `sink` is constructed in place at the end of
    // m_subscribers.
77  m_subscribers.emplace_back( sink );
78  }
79 
81  end() noexcept
82  {
    // Past-the-end position of m_subscribers. Callers compare the
    // result of find() against it to learn whether a sink was found.
83  return m_subscribers.end();
84  }
85 
88  {
    // Linear search over m_subscribers. Entries are matched by the
    // address of the sink object they refer to, so only the very same
    // sink instance is found. The result equals
    // std::end(m_subscribers) when there is no such entry.
89  return std::find_if( std::begin(m_subscribers), std::end(m_subscribers),
90  [&]( const auto & info ) {
91  return std::addressof( info.m_sink.get() ) ==
92  std::addressof( sink );
93  } );
94  }
95 
96  void
97  erase( storage_t::iterator it ) noexcept
98  {
    // Removes the entry `it` points to from m_subscribers.
    // NOTE(review): listing line 100 is absent from this view; check
    // the original header for the statement that follows the erase.
99  m_subscribers.erase(it);
101  }
102 
103  const subscriber_info_t &
104  current_subscriber() const noexcept
105  {
     // Entry at position m_current_subscriber. The index is used
     // as is: no range check is performed here.
106  return m_subscribers[ m_current_subscriber ];
107  }
108 
109  void
111  {
     // Moves the index used by current_subscriber() one entry forward.
     // NOTE(review): listing line 113 is absent from this view; check
     // the original header for the statement that follows the increment.
112  ++m_current_subscriber;
114  }
115 
116  private :
119 
120  void
122  {
     // Wraps the index back to the first entry once it has reached
     // or passed the number of entries in m_subscribers.
123  if( m_current_subscriber >= m_subscribers.size() )
124  m_current_subscriber = 0;
125  }
126  };
127 
128 //
129 // data_t
130 //
131 
132 /*!
133  * \brief Common part of round-robin mbox implementation.
134  *
135  * This part depends only on Lock type but not on tracing facilities.
136  *
137  * \tparam Lock type of lock object to be used.
138  */
139 template< typename Lock >
140 struct data_t
141  {
142  //! Initializing constructor.
     //!
     //! \param env environment this mbox belongs to; kept in m_env.
     //! \param id identifier of this mbox; kept in m_id.
144  environment_t & env,
145  mbox_id_t id )
146  : m_env{ env }
147  , m_id{ id }
148  {}
149 
150  //! SObjectizer Environment to work in.
152 
153  //! ID of this mbox.
155 
156  //! Object lock.
     //!
     //! mbox_template_t acquires it (via std::lock_guard) before
     //! reading or modifying m_subscribers.
157  Lock m_lock;
158 
159  /*!
160  * \brief Map from message type to subscribers.
     *
     * An ordered std::map keyed by std::type_index of the message type.
161  */
162  using messages_table_t = std::map<
163  std::type_index,
165 
166  //! Map of subscribers to messages.
168  };
169 
170 //
171 // mbox_template_t
172 //
173 
174 //! A template with implementation of round-robin mbox.
175 /*!
176  * \tparam Lock_Type type of lock to be used for thread safety.
177  * \tparam Tracing_Base base class with implementation of message
178  * delivery tracing methods. Expected to be tracing_enabled_base or
179  * tracing_disabled_base from so_5::impl::msg_tracing_helpers namespace.
180  */
181 template<
182  typename Lock_Type,
183  typename Tracing_Base >
185  : public abstract_message_box_t
186  , private data_t< Lock_Type >
187  , private Tracing_Base
188  {
189  using data_type = data_t< Lock_Type >;
190 
191  public:
192  //! Initializing constructor.
     //!
     //! env and id are passed on to the data_type base.
193  template< typename... Tracing_Args >
195  //! SObjectizer Environment to work in.
196  environment_t & env,
197  //! ID of this mbox.
198  mbox_id_t id,
199  //! Optional parameters for Tracing_Base's constructor.
200  Tracing_Args &&... args )
201  : data_type{ env, id }
203  {}
204 
205  mbox_id_t
206  id() const override
207  {
     // The ID stored in data_t::m_id by the constructor.
208  return this->m_id;
209  }
210 
211  void
213  const std::type_index & msg_type,
214  so_5::abstract_message_sink_t & subscriber ) override
215  {
     // Registers `subscriber` for messages of type `msg_type`.
     // The subscribers table is accessed only while m_lock is held.
216  std::lock_guard< Lock_Type > lock( this->m_lock );
217 
218  auto it = this->m_subscribers.find( msg_type );
219  if( it == this->m_subscribers.end() )
220  {
221  // There is no such message type yet.
224 
226  }
227  else
228  {
229  auto & sinks = it->second;
230 
     // A sink that is already present in the container is not
     // added for the second time.
231  auto pos = sinks.find( subscriber );
232  if( pos == sinks.end() )
233  // There is no subscriber in the container.
234  // It must be added.
236  }
237  }
238 
239  void
241  const std::type_index & msg_type,
242  so_5::abstract_message_sink_t & subscriber ) noexcept override
243  {
     // Removes `subscriber` from the subscribers of `msg_type`.
     // An unknown message type or an unknown subscriber is ignored.
     // The subscribers table is accessed only while m_lock is held.
244  std::lock_guard< Lock_Type > lock( this->m_lock );
245 
246  auto it = this->m_subscribers.find( msg_type );
247  if( it != this->m_subscribers.end() )
248  {
249  auto & sinks = it->second;
250 
251  auto pos = sinks.find( subscriber );
252  if( pos != sinks.end() )
253  {
254  sinks.erase( pos );
255  }
256 
     // The table entry for the message type is dropped together
     // with its last subscriber, so no empty container is left behind.
257  if( sinks.empty() )
258  this->m_subscribers.erase( it );
259  }
260  }
261 
262  std::string
263  query_name() const override
264  {
     // The name has the form "<mbox:type=RRMPSC:id=N>" where N is
     // the ID of this mbox (m_id).
266  s << "<mbox:type=RRMPSC:id=" << this->m_id << ">";
267 
268  return s.str();
269  }
270 
272  type() const override
273  {
     // NOTE(review): the return type (listing line 271) and the return
     // statement (listing line 274) are absent from this view; the
     // returned value cannot be confirmed from here.
275  }
276 
277  void
279  so_5::message_delivery_mode_t delivery_mode,
280  const std::type_index & msg_type,
281  const so_5::message_ref_t & message,
282  unsigned int redirection_deep ) override
283  {
     // Delivery entry point. A tracer object is built for the
     // operation named "deliver_message"; *this is passed twice
     // because the mbox is both the Tracing_Base and the
     // abstract_message_box_t the tracer refers to.
285  *this, // as Tracing_Base
286  *this, // as abstract_message_box_t
287  "deliver_message",
290 
     // NOTE(review): the callee name (listing line 291) is absent from
     // this view; presumably do_deliver_message_impl -- confirm.
292  tracer,
294  msg_type,
295  message,
297  }
298 
299  void
301  const std::type_index & /*msg_type*/,
302  const so_5::delivery_filter_t & /*filter*/,
303  so_5::abstract_message_sink_t & /*subscriber*/ ) override
304  {
     // None of the arguments is used: the call itself is reported
     // as an error with the message below.
     // NOTE(review): the statement that raises the error (listing
     // lines 307-308) is absent from this view; presumably it uses an
     // error code from the errors namespace imported here -- confirm.
305  using namespace so_5::extra::mboxes::round_robin::errors;
306 
309  "set_delivery_filter is called for round_robin-mbox" );
310  }
311 
312  void
314  const std::type_index & /*msg_type*/,
315  so_5::abstract_message_sink_t & /*subscriber*/ ) noexcept override
316  {
     // Both arguments are ignored.
317  // Nothing to do.
318  }
319 
321  environment() const noexcept override
322  {
     // The environment stored in data_t::m_env by the constructor.
323  return this->m_env;
324  }
325 
326  private :
327  void
329  typename Tracing_Base::deliver_op_tracer const & tracer,
330  so_5::message_delivery_mode_t delivery_mode,
331  const std::type_index & msg_type,
332  const so_5::message_ref_t & message,
333  unsigned int redirection_deep )
334  {
     // The lock is held for the rest of the function.
335  std::lock_guard< Lock_Type > lock( this->m_lock );
336 
337  auto it = this->m_subscribers.find( msg_type );
338  if( it != this->m_subscribers.end() )
339  {
     // Only one subscriber is picked for the message: the one the
     // container currently points to.
     // NOTE(review): listing line 341 is absent from this view;
     // presumably the round-robin position is advanced there -- confirm.
340  const auto & subscriber_info = it->second.current_subscriber();
342 
345  tracer,
347  msg_type,
348  message,
350  }
     // NOTE(review): the body of the else branch (listing line 352,
     // the case of no subscribers for msg_type) is absent from this view.
351  else
353  }
354 
355  void
357  const subscriber_info_t & subscriber_info,
358  typename Tracing_Base::deliver_op_tracer const & tracer,
359  so_5::message_delivery_mode_t delivery_mode,
360  const std::type_index & msg_type,
361  const so_5::message_ref_t & message,
362  unsigned int redirection_deep )
363  {
     // NOTE(review): the call made here (listing lines 366, 368,
     // 371-372) is only partly visible; presumably the message is
     // handed to subscriber_info.m_sink together with the ID of this
     // mbox -- confirm against the original header.
364  using namespace so_5::message_limit::impl;
365 
367  this->m_id,
369  msg_type,
370  message,
373  }
374  };
375 
376 } /* namespace details */
377 
378 //
379 // make_mbox
380 //
381 /*!
382  * \brief Create an implementation of round-robin mbox.
383  *
     * The mbox is created via environment_t::make_custom_mbox.
     *
384  * Usage example:
385  * \code
386  so_5::environment_t & env = ...;
387  const so_5::mbox_t rrmbox = so_5::extra::mboxes::round_robin::make_mbox<>( env );
388  ...
389  so_5::send< some_message >( rrmbox, ... );
390  * \endcode
391  *
392  * \tparam Lock_Type type of lock to be used for thread safety.
     *
     * \param env environment in which the mbox is created.
     * \return the mbox produced by env.make_custom_mbox.
393  */
394 template< typename Lock_Type = std::mutex >
395 mbox_t
397  {
398  return env.make_custom_mbox(
399  []( const mbox_creation_data_t & data ) {
400  mbox_t result;
401 
     // Two instantiations of mbox_template_t are prepared. The
     // first one additionally receives data.m_tracer, the second
     // one gets only the environment and the mbox ID.
     // NOTE(review): the condition selecting the branch (listing
     // line 402) and the Tracing_Base template arguments (lines
     // 406, 418) are absent from this view.
403  {
404  using T = details::mbox_template_t<
405  Lock_Type,
407 
409  data.m_env.get(),
410  data.m_id,
411  data.m_tracer )
412  };
413  }
414  else
415  {
416  using T = details::mbox_template_t<
417  Lock_Type,
419 
421  data.m_env.get(),
422  data.m_id )
423  };
424  }
425 
426  return result;
427  } );
428  }
429 
430 } /* namespace round_robin */
431 
432 } /* namespace mboxes */
433 
434 } /* namespace extra */
435 
436 } /* namespace so_5 */
storage_t::iterator find(so_5::abstract_message_sink_t &sink) noexcept
Definition: round_robin.hpp:87
void emplace_back(so_5::abstract_message_sink_t &sink)
Definition: round_robin.hpp:74
environment_t & m_env
SObjectizer Environment to work in.
void do_deliver_message_to_subscriber(const subscriber_info_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, so_5::message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const so_5::message_ref_t &message, unsigned int redirection_deep)
void unsubscribe_event_handler(const std::type_index &msg_type, so_5::abstract_message_sink_t &subscriber) noexcept override
const subscriber_info_t & current_subscriber() const noexcept
Ranges for error codes of each submodules.
Definition: details.hpp:13
void subscribe_event_handler(const std::type_index &msg_type, so_5::abstract_message_sink_t &subscriber) override
void drop_delivery_filter(const std::type_index &, so_5::abstract_message_sink_t &) noexcept override
A template with implementation of round-robin mbox.
subscriber_info_t(so_5::abstract_message_sink_t &sink)
Constructor for the case when subscriber info is being created during event subscription.
Definition: round_robin.hpp:49
so_5::environment_t & environment() const noexcept override
mbox_t make_mbox(environment_t &env)
Create an implementation of round-robin mbox.
Common part of round-robin mbox implementation.
Type of container for holding subscribers for one message type.
Definition: round_robin.hpp:62
const int rc_delivery_filter_cannot_be_used_on_round_robin_mbox
An attempt to set delivery filter to round_robin mbox.
Definition: round_robin.hpp:28
std::reference_wrapper< so_5::abstract_message_sink_t > m_sink
Subscriber.
Definition: round_robin.hpp:45
void do_deliver_message(so_5::message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const so_5::message_ref_t &message, unsigned int redirection_deep) override
messages_table_t m_subscribers
Map of subscribers to messages.
data_t(environment_t &env, mbox_id_t id)
Initializing constructor.
mbox_template_t(environment_t &env, mbox_id_t id, Tracing_Args &&... args)
Initializing constructor.
void set_delivery_filter(const std::type_index &, const so_5::delivery_filter_t &, so_5::abstract_message_sink_t &) override
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, so_5::message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const so_5::message_ref_t &message, unsigned int redirection_deep)
An information block about one subscriber.
Definition: round_robin.hpp:42