AAX SDK  2.4.1
Avid Audio Extensions Development Kit
AAX_CAtomicQueue.h
Go to the documentation of this file.
1 /*================================================================================================*/
2 /*
3  * Copyright 2015 by Avid Technology, Inc.
4  * All rights reserved.
5  *
6  * CONFIDENTIAL: This document contains confidential information. Do not
7  * read or examine this document unless you are an Avid Technology employee
8  * or have signed a non-disclosure agreement with Avid Technology which protects
9  * the confidentiality of this document. DO NOT DISCLOSE ANY INFORMATION
10  * CONTAINED IN THIS DOCUMENT TO ANY THIRD-PARTY WITHOUT THE PRIOR WRITTEN CONSENT
11  * OF Avid Technology, INC.
12  *
13  */
14 
21 /*================================================================================================*/
23 #ifndef AAX_CATOMICQUEUE_H
24 #define AAX_CATOMICQUEUE_H
26 
27 
28 // AAX Includes
29 #include "AAX_IPointerQueue.h"
30 #include "AAX_Atomic.h"
31 #include "AAX_CMutex.h"
32 
33 // Standard Includes
34 #include <cstring>
35 
36 
54 template <typename T, size_t S>
56 {
57 public:
58  virtual ~AAX_CAtomicQueue() {}
60 
61 public:
62  static const size_t template_size = S;
63 
66 
67 public: // AAX_IContainer
68  virtual void Clear();
69 
70 public: // AAX_IPointerQueue
72  virtual value_type Pop();
73  virtual value_type Peek() const;
74 
75 private:
76  AAX_CMutex mMutex;
77  uint32_t mReadIdx;
78  uint32_t mWriteIdx;
79  value_type mRingBuffer[S];
80 };
81 
82 
84 
85 template <typename T, size_t S>
87 : AAX_IPointerQueue<T>()
88 , mMutex()
89 , mReadIdx(0)
90 , mWriteIdx(0)
91 {
92  Clear();
93 }
94 
95 template <typename T, size_t S>
97 {
98  std::memset((void*)mRingBuffer, 0x0, sizeof(mRingBuffer));
99 }
100 
101 template <typename T, size_t S>
103 {
104  if (NULL == inElem)
105  {
107  }
108 
110 
111  AAX_StLock_Guard guard(mMutex);
112  //
113  // Without the mutex, failures are possible because several write threads may try to
114  // modify mWriteIdx concurrently
115  //
116  // Example:
117  //
118  // -
119  // Notation:
120  // First number - write thread number
121  // Second number - value number
122  // 1/15 - 1st thread, writing value number 15
123  //
124  // -
125  // Queue may look like this:
126  // mReadIdx
127  // |
128  // |..... | 4/3 | 4/4 | 1/4 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
129  // |
130  // mWriteIdx
131  // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
132  //
133  // -
134  // Possible operation order (w stands for mWriteIdx, r - for mReadIdx):
135  //-------------------------------------------------------
136  // thread#| action | write index value | mWriteIdx |
137  // | | internal variable | |
138  //-------------------------------------------------------
139  // 5 | w++ | 2 | 2 |
140  //-------------------------------------------------------
141  // 6 | w++ | 3 | 3 |
142  //-------------------------------------------------------
143  // 5 | false | - | 2not=3 => 2--=1 |
144  //-------------------------------------------------------
145  // read | r++ | - | - |
146  //-------------------------------------------------------
147  // read | r++ | - | - |
148  //-------------------------------------------------------
149  // -
150  // Queue state:
151  // mReadIdx
152  // |
153  // |..... | 4/3 | 0 | 0 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
154  // |
155  // mWriteIdx
156  // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
157  //
158  // -
159  //-------------------------------------------------------
160  // 6 | false | - | 3not=1 => 3--=2 | // place 3 is still not empty to write
161  //-------------------------------------------------------
162  //
163  // -
164  // Now, some other thread (5, for example) can successfully write
165  // its value to the queue and move mWriteIdx forward:
166  //
167  // -
168  // Queue state:
169  // mReadIdx
170  // |
171  // |..... | 4/3 | 0 | 5/1 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
172  // |
173  // mWriteIdx
174  // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
175  //
176  // -
177  // Thus, we have one place with a NULL value left. In the next round mReadIdx will be
178  // stuck on place #1 (the queue thinks that it is empty) until one of the write threads
179  // writes a value into place #1. It could be thread #5, so we have:
180  //
181  // -
182  // Queue state:
183  // mReadIdx
184  // |
185  // |..... | 9/1 | 5/9 | 5/1 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
186  // |
187  // mWriteIdx
188  // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
189  //
190  // -
191  // And we will read 5/9 before 5/1
192  //
193  //
194  // Note that read/write both begin at index 1
195  const uint32_t idx = AAX_Atomic_IncThenGet_32(mWriteIdx);
196  const uint32_t widx = idx % S;
197 
198  // Do the push. If the value at the current write index is non-NULL then we have filled the buffer.
199  const bool cxResult = AAX_Atomic_CompareAndExchange_Pointer(mRingBuffer[widx], (value_type)0x0, inElem);
200 
201  if (false == cxResult)
202  {
204 
205  const uint32_t ridx = (0 == idx) ? S : idx-1;
206 
207  // Note the write index has already been incremented, so in the event of an overflow we must
208  // return the write index to its previous location.
209  //
210  // Note: if multiple write threads encounter concurrent push overflows then the write pointer
211  // will not be fully decremented back to the overflow location, and the read index will need
212  // to increment multiple positions to clear the overflow state.
213 // const bool resetResult = AAX_Atomic_CompareAndExchange_32(mWriteIdx, idx, ridx);
214  AAX_Atomic_CompareAndExchange_32(mWriteIdx, idx, ridx);
215 
216 // printf("AAX_CAtomicQueue: overflow - reset: %s, idx: %lu, widx: %lu, inElem: %p\n",
217 // resetResult ? "yes" : " no",
218 // (unsigned long)idx,
219 // (unsigned long)widx,
220 // inElem);
221  }
222  else
223  {
225 
226  // Handle wraparound
227  //
228  // There may be multiple write threads pushing elements at the same time, so we use
229  // (wrapped index < raw index) instead of (raw index == boundary)
230  //
231  // This assumes overhead between S and UINT32_MAX of at least as many elements as
232  // there are write threads.
233 
234  bool exchResult = false;
235  if (widx < idx)
236  {
237  exchResult = AAX_Atomic_CompareAndExchange_32(mWriteIdx, idx, widx);
238  }
239 
240 // printf("AAX_CAtomicQueue: pushed - reset: %s, idx: %lu, widx: %lu, inElem: %p\n",
241 // (widx < idx) ? exchResult ? "yes" : " no" : "n/a",
242 // (unsigned long)idx,
243 // (unsigned long)widx,
244 // inElem);
245  }
246 
247  return result;
248 }
249 
250 template <typename T, size_t S>
252 {
253  // Note that read/write both begin at index 1
254  mReadIdx = (mReadIdx+1) % template_size;
255  value_type const val = AAX_Atomic_Exchange_Pointer(mRingBuffer[mReadIdx], (value_type)0x0);
256 
257 // printf("AAX_CAtomicQueue: popped - reset: %s, idx: %lu, val: %p\n",
258 // (0x0 == val) ? "yes" : " no",
259 // (unsigned long)mReadIdx,
260 // val);
261 
262  if (0x0 == val)
263  {
264  // If the value is NULL then no value has yet been written to this location. Decrement the read index
265  --mReadIdx; // No need to handle wraparound since the read index will be incremented before the next read
266  }
267 
268  return val;
269 }
270 
271 template <typename T, size_t S>
273 {
274  // I don't think that we need a memory barrier here because:
275  // a) mReadIdx will only be modified from the read thread, and therefore presumably
276  // using the same CPU (or at least I can't see any way for mReadIdx modification
277  // ordering to be a problem between Peek() and Pop() on a single thread.)
278  // b) We don't care if mRingBuffer modifications are run out of order between the read
279  // and write threads, as long as they are "close".
280  const uint32_t testIdx = (mReadIdx+1) % template_size;
281  return AAX_Atomic_Load_Pointer(&mRingBuffer[testIdx]);
282 }
283 
284 // Attempt to support multiple read threads
285 //
286 // This approach is broken in the following scenario:
287 //
288 // Thread | Operation
289 // A Pop v enter
290 // A Pop - increment/get read index (get 1)
291 // A Pop - exchange pointer (get 0x0)
292 // other Push ptr1
293 // other Push ptr2
294 // B Pop v enter
295 // B Pop - increment/get read index (get 2)
296 // B Pop - exchange pointer (get ptr2)
297 // ERROR: popped ptr2 before ptr1
298 // B Pop ^ exit
299 // A Pop - decrement read index (set 1)
300 // A Pop ^ exit
301 // any Pop v enter
302 // any Pop - increment/get read index (get 2)
303 // any Pop - exchange pointer (get 0x0)
304 // ERROR: should be ptr2
305 // This NULL state continues for further Pop calls until either Push wraps around
306 // or another pair of concurrent calls to Pop just happens to re-align the read
307 // index by incrementing twice before any reads occur
308 // any Pop - decrement read index (set 1)
309 // any Pop ^ exit
310 //
311 // This could be fixed by incrementing the read index until either a non-NULL value is found or
312 // the initial position is reached, but that would have terrible performance.
313 //
314 // In any case, assuming a single read thread is optimal when we want maximum performance for read
315 // operations, since this requires the fewest number of atomic operations in the read methods
316 /*
317 template <typename T, size_t S>
318 inline typename AAX_CAtomicQueue<T, S>::value_type AAX_CAtomicQueue<T, S>::Pop()
319 {
320  const uint32_t idx = AAX_Atomic_IncThenGet_32(mReadIdx);
321  const uint32_t widx = idx % S;
322 
323  value_type const val = AAX_Atomic_Exchange_Pointer(mRingBuffer[widx], (value_type)0x0);
324 
325  if (0x0 == val)
326  {
327  // If the value is NULL then no value has yet been written to this location. Decrement the read index
328  AAX_Atomic_DecThenGet_32(mReadIdx);
329  }
330  else
331  {
332  // Handle wraparound (assumes some overhead between S and UINT_32_MAX)
333  if (widx < idx)
334  {
335  AAX_Atomic_CompareAndExchange_32(mReadIdx, idx, widx);
336  }
337  }
338 
339  return val;
340 }
341  */
342 
344 
346 #endif /* defined(AAX_CATOMICQUEUE_H) */
Atomic operation utilities.
TPointer *AAX_CALLBACK AAX_Atomic_Exchange_Pointer(TPointer *&ioValue, TPointer *inExchangeValue)
Perform an exchange operation on a pointer value.
Definition: AAX_Atomic.h:53
uint32_t AAX_CALLBACK AAX_Atomic_IncThenGet_32(uint32_t &ioData)
Increments a 32-bit value and returns the result.
bool AAX_CALLBACK AAX_Atomic_CompareAndExchange_32(volatile uint32_t &ioValue, uint32_t inCompareValue, uint32_t inExchangeValue)
Perform a compare and exchange operation on a 32-bit value.
bool AAX_CALLBACK AAX_Atomic_CompareAndExchange_Pointer(TPointer *&ioValue, TPointer *inCompareValue, TPointer *inExchangeValue)
Perform a compare and exchange operation on a pointer value.
Definition: AAX_Atomic.h:77
TPointer *AAX_CALLBACK AAX_Atomic_Load_Pointer(TPointer const *const volatile *inValue)
Atomically loads a pointer value.
Abstract interface for a basic FIFO queue of pointers-to-objects.
Definition: AAX_CAtomicQueue.h:56
virtual value_type Peek() const
virtual value_type Pop()
virtual AAX_IContainer::EStatus Push(value_type inElem)
static const size_t template_size
The size used for this template instance.
Definition: AAX_CAtomicQueue.h:62
virtual ~AAX_CAtomicQueue()
Definition: AAX_CAtomicQueue.h:58
virtual void Clear()
AAX_IPointerQueue< T >::template_type template_type
The type used for this template instance.
Definition: AAX_CAtomicQueue.h:64
AAX_IPointerQueue< T >::value_type value_type
The type of values stored in this queue.
Definition: AAX_CAtomicQueue.h:65
Mutex with try lock functionality.
Definition: AAX_CMutex.h:30
Helper class for working with mutex.
Definition: AAX_CMutex.h:50
EStatus
Definition: AAX_IContainer.h:38
@ eStatus_Unsupported
Operation is unsupported.
Definition: AAX_IContainer.h:43
@ eStatus_Unavailable
An internal resource was not available.
Definition: AAX_IContainer.h:42
@ eStatus_Overflow
Internal buffer overflow.
Definition: AAX_IContainer.h:40
@ eStatus_Success
Operation succeeded.
Definition: AAX_IContainer.h:39
Definition: AAX_IPointerQueue.h:35
T * value_type
The type of values stored in this queue.
Definition: AAX_IPointerQueue.h:41
T template_type
The type used for this template instance.
Definition: AAX_IPointerQueue.h:40