SObjectizer 5.8
Loading...
Searching...
No Matches
mpsc_queue_traits/pub.cpp
Go to the documentation of this file.
1/*
2 * SObjectizer 5
3 */
4
5/*!
6 * \since
7 * v.5.5.10
8 *
9 * \file
10 * \brief Various traits for MPSC queues.
11 */
12
13#include <so_5/disp/mpsc_queue_traits/pub.hpp>
14
15#include <so_5/spinlocks.hpp>
16
17#include <so_5/details/invoke_noexcept_code.hpp>
18
19#include <mutex>
20#include <condition_variable>
21#include <iostream>
22
23namespace so_5 {
24
25namespace disp {
26
27namespace mpsc_queue_traits {
28
29namespace impl {
30
31//
32// combined_lock_t
33//
34/*!
35 * \since
36 * v.5.5.10
37 *
38 * \brief A special combined lock for queue protection.
39 *
40 * This lock uses a spinlock for efficiency and std::mutex with
41 * std::condition_variable for signaling.
42 *
43 * \attention This lock can be used only for single-consumer queues!
44 * It is because there is no way found to implement notify_all on
45 * just two int variables (m_waiting and m_signaled).
46 */
47class combined_lock_t : public lock_t
48 {
49 public :
 //! Constructor: stores the spin-wait budget and clears both state flags.
 // NOTE(review): the line carrying the constructor name is absent from
 // this listing.
50 inline
52 //! Max waiting time for waiting on spinlock before switching to mutex.
53 std::chrono::high_resolution_clock::duration waiting_time )
54 : m_waiting_time{ waiting_time }
55 , m_waiting( false )
56 , m_signaled( false )
57 {}
58
 //! Lock object in exclusive mode.
 // NOTE(review): the body statement is absent from this listing;
 // presumably it acquires m_spinlock -- confirm against the real source.
59 virtual void
60 lock() noexcept override
61 {
63 }
64
 //! Unlock object locked in exclusive mode.
 // NOTE(review): the body statement is absent from this listing;
 // presumably it releases m_spinlock -- confirm against the real source.
65 virtual void
66 unlock() noexcept override
67 {
69 }
70
71 protected :
 //! Wait for a notification in two phases.
 /*!
 * Phase 1: set m_waiting and poll m_signaled, yielding the thread on
 * every iteration, until m_waiting_time has elapsed.
 * Phase 2: if no signal arrived during phase 1, block on m_condition
 * until m_signaled becomes true.
 *
 * In both cases m_waiting and m_signaled are reset to false before
 * returning.
 */
72 virtual void
73 wait_for_notify() noexcept override
74 {
75 using clock = std::chrono::high_resolution_clock;
76
 // Tell notify_one() that there is a consumer to wake up.
77 m_waiting = true;
 // Deadline for the polling phase.
78 auto stop_point = clock::now() + m_waiting_time;
79
80 do
81 {
 // NOTE(review): one statement before and one after the yield are
 // absent from this listing; presumably they release and re-acquire
 // m_spinlock so producers can get in -- confirm.
83
84 std::this_thread::yield();
85
87
88 if( m_signaled )
89 {
 // Signal received during the polling phase: reset state and leave.
90 m_waiting = false;
91 m_signaled = false;
92 return;
93 }
94 }
95 while( stop_point > clock::now() );
96
97 // m_lock is locked now.
 // NOTE(review): no member named m_lock is visible in this class;
 // presumably m_spinlock is meant.
98
99 // Must use heavy std::mutex and std::condition_variable
100 // to allow OS to efficiently use the resources while
101 // we are waiting for signal.
102 std::unique_lock< std::mutex > mlock( m_mutex );
103
 // NOTE(review): a statement is absent from this listing here
 // (presumably the spinlock is released before blocking -- confirm).
105
 // The predicate form re-checks m_signaled after every wakeup, so a
 // wakeup without the flag set does not end the wait.
106 m_condition.wait( mlock, [this]{ return m_signaled; } );
107
108 // At this point m_signaled must be 'true'.
109
 // NOTE(review): a statement is absent from this listing here
 // (presumably the spinlock is re-acquired -- confirm).
111
112 m_waiting = false;
113 m_signaled = false;
114 }
115
116 //! Notify one waiting thread if it exists.
117 /*!
118 * \attention Must be called only when object is locked.
 *
 * Does nothing when m_waiting is false. Otherwise sets m_signaled and
 * notifies m_condition while holding m_mutex.
119 */
120 virtual void
121 notify_one() noexcept override
122 {
123 if( m_waiting )
124 {
125 // There is a waiting thread.
126 m_mutex.lock();
127 m_signaled = true;
128 m_condition.notify_one();
129 m_mutex.unlock();
130 }
131 }
132
133 private :
 //! Duration of the polling phase in wait_for_notify().
134 const std::chrono::high_resolution_clock::duration m_waiting_time;
135
 // NOTE(review): not referenced by any statement visible in this
 // listing; presumably used by the lines that are missing.
136 default_spinlock_t m_spinlock;
137
 //! Mutex and condition variable for the blocking phase of wait_for_notify().
138 std::mutex m_mutex;
139 std::condition_variable m_condition;
140
 // NOTE(review): the declarations of m_waiting and m_signaled (both
 // initialized with 'false' in the constructor) are absent from this listing.
143 };
144
145//
146// simple_lock_t
147//
148/*!
149 * \since
150 * v.5.5.10
151 *
152 * \brief A very simple lock based on usage of std::mutex and
153 * std::condition_variable.
154 */
155class simple_lock_t : public lock_t
156 {
157 public :
 //! Lock object in exclusive mode by acquiring m_mutex.
158 virtual void
159 lock() noexcept override
160 {
161 m_mutex.lock();
162 }
163
 //! Unlock object by releasing m_mutex.
164 virtual void
165 unlock() noexcept override
166 {
167 m_mutex.unlock();
168 }
169
170 protected :
 //! Block on m_condition until m_signaled is set, then clear the flag.
 /*!
 * m_mutex is expected to be held on entry (see the comment in the
 * body) and is still held on return.
 */
171 virtual void
172 wait_for_notify() noexcept override
173 {
 // NOTE(review): the line that opens the construct closed by '} );'
 // below is absent from this listing; presumably a lambda passed to
 // so_5::details::invoke_noexcept_code -- confirm.
175 // Mutex already locked. We must not try to reacquire it.
 // std::adopt_lock wraps the already-held mutex without locking it again.
176 std::unique_lock< std::mutex > mlock{ m_mutex, std::adopt_lock };
177 m_condition.wait( mlock, [this]{ return m_signaled; } );
 // Detach the mutex from mlock so that mlock's destructor does not
 // unlock it.
178 mlock.release();
179 } );
180
181 // At this point m_signaled must be 'true'.
182 m_signaled = false;
183 }
184
 //! Set m_signaled and wake one thread waiting on m_condition.
 // NOTE(review): m_signaled is a plain bool; presumably the caller holds
 // m_mutex when calling this -- confirm against lock_t's contract.
185 virtual void
186 notify_one() noexcept override
187 {
188 m_signaled = true;
189 m_condition.notify_one();
190 }
191
192 private :
 //! Mutex used by lock()/unlock() and by the wait in wait_for_notify().
193 std::mutex m_mutex;
 //! Condition variable for wait_for_notify()/notify_one().
194 std::condition_variable m_condition;
195
 //! Set by notify_one(), cleared by wait_for_notify().
196 bool m_signaled = { false };
197 };
198
199} /* namespace impl */
200
201//
202// combined_lock_factory
203//
//! Build a lock factory that creates impl::combined_lock_t objects.
/*!
 * \param waiting_time value passed to the constructor of every
 * combined_lock_t created by the returned factory.
 *
 * \return a callable that captures \a waiting_time by value and, on each
 * call, returns a lock_unique_ptr_t holding a new combined_lock_t.
 */
// NOTE(review): the line carrying the function name is absent from this
// listing.
204SO_5_FUNC lock_factory_t
206 std::chrono::high_resolution_clock::duration waiting_time )
207 {
208 return [waiting_time] {
209 return lock_unique_ptr_t{ new impl::combined_lock_t{ waiting_time } };
210 };
211 }
212
213//
214// simple_lock_factory
215//
//! Build a lock factory that creates impl::simple_lock_t objects.
/*!
 * \return a capture-less callable that, on each call, returns a
 * lock_unique_ptr_t holding a new simple_lock_t.
 */
// NOTE(review): the line carrying the function name is absent from this
// listing.
216SO_5_FUNC lock_factory_t
218 {
219 return [] { return lock_unique_ptr_t{ new impl::simple_lock_t{} }; };
220 }
221
222} /* namespace mpsc_queue_traits */
223
224} /* namespace disp */
225
226} /* namespace so_5 */
A special combined lock for queue protection.
virtual void unlock() noexcept override
Unlock object locked in exclusive mode.
virtual void wait_for_notify() noexcept override
Waiting for notification.
combined_lock_t(std::chrono::high_resolution_clock::duration waiting_time)
virtual void notify_one() noexcept override
Notify one waiting thread if it exists.
const std::chrono::high_resolution_clock::duration m_waiting_time
virtual void lock() noexcept override
Lock object in exclusive mode.
A very simple lock based on usage of std::mutex and std::condition_variable.
virtual void lock() noexcept override
Lock object in exclusive mode.
virtual void wait_for_notify() noexcept override
Waiting for notification.
virtual void unlock() noexcept override
Unlock object locked in exclusive mode.
virtual void notify_one() noexcept override
Notify one waiting thread if it exists.
An interface for lock for MPSC queue.
void lock()
Lock object.
void unlock()
Unlock object.
#define SO_5_FUNC
Definition declspec.hpp:48
Some reusable and low-level classes/functions which can be used in public header files.
auto invoke_noexcept_code(L lambda) noexcept -> decltype(lambda())
Implementation details for MPSC event queue stuff.
Various stuff related to MPSC event queue implementation and tuning.
SO_5_FUNC lock_factory_t simple_lock_factory()
Factory for creation of very simple implementation based on usage of mutex and condition_variable onl...
SO_5_FUNC lock_factory_t combined_lock_factory(std::chrono::high_resolution_clock::duration waiting_time)
Factory for creation of combined queue lock with the specified waiting time.
Event dispatchers.
Private part of message limit implementation.
Definition agent.cpp:33