inlineypipe_t() { // Insert terminator element into the queue. queue.push(); //yqueue_t的尾指针加1,开始back_chunk为空,现在back_chunk指向第一个chunk_t块的第一个位置
// Let all the pointers to point to the terminator. // (unless pipe is dead, in which case c is set to NULL). r = w = f = &queue.back(); //就是让r、w、f、c四个指针都指向这个end迭代器 c.set(&queue.back()); }
write状态变化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。 inlinevoidwrite(const T &value_, bool incomplete_) { // Place the value to the queue, add new terminator element. queue.back() = value_; queue.push();
// Move the "flush up to here" poiter. if (!incomplete_) { f = &queue.back(); // 记录要刷新的位置 false 更新f // printf("1 f:%p, w:%p\n", f, w); } else { // printf("0 f:%p, w:%p\n", f, w); } }
// 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。 // 批量刷新的机制, 写入批量后唤醒读线程; // 反悔机制 unwrite inlineboolflush() { // If there are no un-flushed items, do nothing. if (w == f) // 不需要刷新,即是还没有新元素加入 returntrue;
// Try to set 'c' to 'f'. // c何时为NULL?read时如果没有数据可以读取则c的值会被置为NULL,此时必然不会==w /* cas函数,原子操作,线程安全的,将ptr(调用者)与cmp_(第一个参数)进行比较: 相等:把ptr设置尾val_(第二个参数)的值,返回ptr设置之前的值 不相等:直接返回ptr的值 */ if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置 {
// Compare-and-swap was unseccessful because 'c' is NULL. // This means that the reader is asleep. Therefore we don't // care about thread-safeness and update c in non-atomic // manner. We'll return false to let the caller know // that reader is sleeping. c.set(f); // 更新w的位置 w = f; // 注意这里就是flush返回false的逻辑,所以返不返回false仅和c的状态有关系,所以c就是用来控制唤醒逻辑的变量 returnfalse; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理 } else// 读端还有数据可读取 { // Reader is alive. Nothing special to do now. Just move // the 'first un-flushed item' pointer to 'f'. w = f; // 只需要更新w的位置 returntrue; } }
// Check whether item is available for reading. // 这里面有两个点,一个是检查是否有数据可读,一个是预取 inlineboolcheck_read() { // Was the value prefetched already? If so, return. if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true; returntrue;
// There's no prefetched value, so let us prefetch more values. // Prefetching is to simply retrieve the // pointer from c in atomic fashion. If there are no // items to prefetch, set c to NULL (using compare-and-swap). // 两种情况 // 1. 如果c值和queue.front()相等,此时还没有写入数据, 返回c值并将c值置为NULL,此时没有数据可读 // 2. 如果c值和queue.front()不相等, 返回c值,此时可能有数据度的去 r = c.cas(&queue.front(), NULL); //尝试预取数据
// If there are no elements prefetched, exit. // During pipe's lifetime r should never be NULL, however, // it can happen during pipe shutdown when items are being deallocated. if (&queue.front() == r || !r) //判断是否成功预取数据 returnfalse;
// There was at least one value prefetched. returntrue; }
// Reads an item from the pipe. Returns false if there is no value. // available. inlineboolread(T *value_) { // Try to prefetch a value. if (!check_read()) returnfalse;
// There was at least one value prefetched. // Return it to the caller. *value_ = queue.front(); queue.pop(); returntrue; }
template <typename ELEM_T, QUEUE_INT Q_SIZE> bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) { QUEUE_INT currentWriteIndex; // 获取写指针的位置 QUEUE_INT currentReadIndex; // 1. 获取可写入的位置 do { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; if(countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { returnfalse; // 队列已经满了 } // 目的是为了获取一个能写入的位置,因为存在多个线程请求写入位置,所以同一个位置不能被多个线程同时获得,通过cas去判断 /* 比如线程1: currentWriteIndex = 0, currentReadIndex =0 线程2: currentWriteIndex = 0, currentReadIndex =0 此时就只有一个线程能CAS更新成功,比如此时线程1正常更新后,m_writeIndex=1(被设置,实际是(currentWriteIndex+1)=0+1=1),更新成功返回true,线程1退出循环 此时线程2 去做CAS比如的时候,变成了m_writeIndex=1和currentWriteIndex=0比如,需要返回50行更新新的currentWriteIndex = m_writeIndex = 1 */ // CAS(a_ptr, a_oldVal, a_newVal) // 如果 a_ptr == a_oldVal 则 更新为 a_ptr = a_newVal,并返回true // 如果 a_ptr != a_oldVal 则 直接返回false } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1))); // 获取写入位置后 currentWriteIndex 是一个临时变量,保存我们写入的位置 // We know now that this index is reserved for us. Use it to save the data m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把数据更新到对应的位置
// 2. 更新可读的位置,按着m_maximumReadIndex+1的操作, 这里要更新可读的位置,同样可能存在多线程来操作,这里让让给currentWriteIndex顺序的写入 // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) // 更新可读取的位置 { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more // than 1 producer thread // have a look at sched_yield (POSIX.1b) sched_yield(); // 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。 }
do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex; currentMaximumReadIndex = m_maximumReadIndex;
if(countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) // 如果不为空,获取到读索引的位置 { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it returnfalse; } // retrieve the data from the queue a_data = m_thequeue[countToIndex(currentReadIndex)]; // 从临时位置读取的
// try to perfrom now the CAS operation on the read index. If we succeed // a_data already contains what m_readIndex pointed to before we // increased it if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { AtomicSub(&m_count, 1); // 真正读取到了数据,元素-1 returntrue; } } while(true);
assert(0); // Add this return statement to avoid compiler warnings returnfalse;