发动态

没有新消息

更多内容

#Java面试题库#写一个生产者消费者模式

咔啡

  生产者消费者问题是一个多线程同步问题的经典案例,大多数多线程编程问题都是以生产者-消费者模式为基础,扩展衍生来的。在生产者消费者模式中,缓冲区起到了连接两个模块的作用:生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:      可以看出Buffer缓冲区作为一个中介,将生产者和消费者分开,使得两部分相对独立,生产者消费者不需要知道对方的实现逻辑,对其中一个的修改,不会影响另一个,从设计模式的角度看,降低了耦合度。而对于图中处在多线程环境中Buffer,需要共享给多个多个生产者和消费者,为了保证读写数据和操作的正确性与时序性,程序需要对Buffer结构进行同步处理。通常情况下,生产者-消费者模式中的广泛使用的Buffer缓冲区结构是阻塞队列。   阻塞队列的特点为:“当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻 塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从 队列中移除一个或者多个元素,或者完全清空队列。” 阻塞队列的实现通常是对普通队列进行同步控制,封装起来。一个linux下的通用队列定义如下,代码所示的队列没有为满的情况,不设定队列元素总数的上限: 1 template<class T> 2 class BlockQueue 3 { 4 public: 5 BlockQueue(); 6 ~BlockQueue(); 7 8 void Add(T *pData); 9 T *Get(int timeout=0); 10 int Count(); 11 void SetBlockFlag(); 12 private: 13 typedef struct _Node 14 { 15 T * m_pData; 16 struct _Node * m_pNext; 17 } Node; 18 Node * m_pHead; //队列头 19 Node * m_pTail; //队列尾 20 int m_nCount; //队列中元素个数 21 22 bool m_bBlockFlag; //是否阻塞 23 CCond m_condQueue; 24 }; 25 26 template<class T> 27 BlockQueue<T>::BlockQueue():m_pHead(NULL), m_pTail(NULL), m_nCount(0), m_bBlockFlag(false) 28 { 29 } 30 31 template<class T> 32 void BlockQueue<T>::SetBlockFlag() 33 { 34 m_bBlockFlag = true; 35 } 36 37 template<class T> 38 BlockQueue<T>::~BlockQueue() 39 { 40 Node *pTmp = NULL; 41 while (m_pHead != NULL) 42 { 43 pTmp = m_pHead; 44 m_pHead = m_pHead->m_pNext; 45 delete pTmp; 46 pTmp = NULL; 47 } 48 m_pTail = NULL; 49 } 50 51 template<class T> 52 void BlockQueue<T>::Add(T *pData) 53 { 54 (m_condQueue.GetMutex()).EnterMutex(); 55 56 Node *pTmp = new Node; 57 pTmp->m_pData = pData; 58 pTmp->m_pNext = NULL; 59 60 if (m_nCount == 0) 61 { 62 m_pHead = pTmp; 63 m_pTail = pTmp; 64 } 65 else 66 { 67 m_pTail->m_pNext = pTmp; 68 m_pTail = pTmp; 69 } 70 m_nCount++; 71 72 if (m_bBlockFlag) 73 { 74 m_condQueue.Signal(); 75 } 76 77 (m_condQueue.GetMutex()).LeaveMutex(); 78 } 79 80 template<class T> 81 T *BlockQueue<T>::Get(int timeout) 82 { 83 (m_condQueue.GetMutex()).EnterMutex(); 84 85 while (m_nCount == 0) 86 { 87 if (!m_bBlockFlag) 88 { 89 (m_condQueue.GetMutex()).LeaveMutex(); 90 return NULL; 91 } 92 else 93 { 94 if (m_condQueue.WaitNoLock(timeout) == 1) 95 { 96 (m_condQueue.GetMutex()).LeaveMutex(); 97 return NULL; 98 } 99 } 100 } 101 102 T * pTmpData = m_pHead->m_pData; 103 Node *pTmp = m_pHead; 104 105 m_pHead = m_pHead->m_pNext; 106 delete pTmp; 107 pTmp = NULL; 108 m_nCount--; 109 if (m_nCount == 0) 110 { 111 m_pTail = NULL; 112 } 113 114 (m_condQueue.GetMutex()).LeaveMutex(); 115 116 return pTmpData; 117 } 118 119 template<class T> 120 int BlockQueue<T>::Count() 121 { 122 (m_condQueue.GetMutex()).EnterMutex(); 123 int nCount = m_nCount; 124 (m_condQueue.GetMutex()).LeaveMutex(); 125 126 return nCount; 127 }   队列中使用了Cond条件变量,实现多线程环境中队列的同步与阻塞: 条件变量自己持有mutex,在向队列中Add/Get元素时,mutex将队列锁住,同一时刻只允许一个线程对队列进行操作; 消费者从队列中获取数据时,如果使用SetBlockFlag函数设置了阻塞模式,那么当队列元素为空时,需要阻塞在条件变量上进行等待,其他线程调用Add函数,向队列中添加元素后,唤醒阻塞在条件变量上的线程。注意这里用到的等待条件是while(count == 0)而不是if(count == 0),因为在多核处理器环境中,Signal唤醒操作可能会激活多于一个线程(阻塞在条件变量上的线程),使得多个调用等待的线程返回。所以用while循环对condition多次判断,可以避免这种假唤醒。 队列元素为包含T类型指针的Node类型指针,析构时只负责释放队列中的Node元素,而真正指向T类型数据的指针,需要调用者进行分配和释放,一般由生产者调用new分配,消费者使用后调用delete释放。这里需要特别注意,因为如果忘记释放,会造成内存泄露。   这段代码是我在多线程环境下,较为简单的生产者-消费者模式中,在通用的队列模型进行的修改。目前队列在服务器中稳定运行,还未出现太大的性能问题,当然这与应用模块逻辑较为简单、交易量较小有很大关。此版本中,BlockQueue的Add函数每次调用都是指向Signal操作,唤醒阻塞在环境变量上线程,如果系统中有大量的写线程,而读线程的操作只有少数,那么很多情况下Signal调用都是无意义的,系统将其忽略。如何使Signal操作变得有意义,即每次Signal操作都会唤醒阻塞线程,最初的想法是在m_nCount == 0时即队列为空时进行调用,根据这个想法修改Add函数如下: 1 template<class T> 2 void BlockQueue<T>::Add(T *pData) 3 { 4 (m_condQueue.GetMutex()).EnterMutex(); 5 6 Node *pTmp = new Node; 7 pTmp->m_pData = pData; 8 pTmp->m_pNext = NULL; 9 10 bool bEmpty = false; 11 if (m_nCount == 0) 12 { 13 m_pHead = pTmp; 14 m_pTail = pTmp; 15 bEmpty = true; 16 } 17 else 18 { 19 m_pTail->m_pNext = pTmp; 20 m_pTail = pTmp; 21 } 22 m_nCount++; 23 24 if (m_bBlockFlag && bEmpty) 25 { 26 m_condQueue.Signal(); 27 } 28 29 (m_condQueue.GetMutex()).LeaveMutex(); 30 }   目前来看这样处理没什么逻辑问题,很快发现这样做还是会有一些Signal操作时无意义的,比如当读线程读取到队列中最后一个元素时,m_nCount--,值为零,那么此时写线程调用Add函数,因为m_nCount==0,调用了Signal操作,而这时如果没有读线程在阻塞,那么这个操作仍然会被系统忽略。仔细想想,这里不能根据m_nCount的值来判断是否有线程在等待,而应该以是否有队列阻塞来进行Signal的调用。这里可以用一个计数器来表示当前阻塞在Get操作上的线程个数,只有当它大于零时,才说明需要激活,根据这个条件,继续修改代码,下面只展示修改后与之前相比的差异代码: 1 template<class T> 2 class BlockQueue 3 { 4 public: 5 /*与之前代码相同*/ 6 private: 7 /*与之前代码相同*/ 8 int m_nCount; 9 int m_nBlockCnt; //阻塞线程数 10 bool m_bBlockFlag; //是否阻塞 11 CCond m_condQueue; 12 }; 13 14 template<class T> 15 BlockQueue<T>::BlockQueue():m_pHead(NULL), m_pTail(NULL), m_nCount(0), m_nBlockCnt(0), m_bBlockFlag(false) 16 {} 17 18 template<class T> 19 void BlockQueue<T>::Add(T *pData) 20 { 21 (m_condQueue.GetMutex()).EnterMutex(); 22 23 Node *pTmp = new Node; 24 pTmp->m_pData = pData; 25 pTmp->m_pNext = NULL; 26 27 if (m_nCount == 0) 28 { 29 m_pHead = pTmp; 30 m_pTail = pTmp; 31 bEmpty = true; 32 } 33 else 34 { 35 m_pTail->m_pNext = pTmp; 36 m_pTail = pTmp; 37 } 38 m_nCount++; 39 40 if (m_bBlockFlag && m_nBlockCnt>0) //阻塞模式,并且有线程等待时调用Signal进行唤醒 41 { 42 m_condQueue.Signal(); 43 } 44 45 (m_condQueue.GetMutex()).LeaveMutex(); 46 } 47 48 template<class T> 49 T *BlockQueue<T>::Get(int timeout) 50 { 51 (m_condQueue.GetMutex()).EnterMutex(); 52 53 while (m_nCount == 0) 54 { 55 if (!m_bBlockFlag) 56 { 57 (m_condQueue.GetMutex()).LeaveMutex(); 58 return NULL; 59 } 60 else 61 { 62 m_nBlockCnt++; //线程阻塞数增加 63 if (m_condQueue.WaitNoLock(timeout) == 1) 64 { 65 (m_condQueue.GetMutex()).LeaveMutex(); 66 m_nBlockCnt--; 67 return NULL; 68 } 69 m_nBlockCnt--; //唤醒后线程阻塞数减少 70 } 71 } 72 73 T * pTmpData = m_pHead->m_pData; 74 Node *pTmp = m_pHead; 75 76 m_pHead = m_pHead->m_pNext; 77 delete pTmp; 78 pTmp = NULL; 79 m_nCount--; 80 if (m_nCount == 0) 81 { 82 m_pTail = NULL; 83 } 84 85 (m_condQueue.GetMutex()).LeaveMutex(); 86 87 return pTmpData; 88 }    现在代码已经修改完成,继续分析可以看出几乎对每个函数的操作都进行了加锁保护,临界区从始到末,粒度较大;再者,队列中持有的T对象指针,均是由调用者动态分配和释放的,而内部也是有链表实现,Add和Get操作时,会调用链表元素的内存分配和释放。如此一来,在交易量很大的情况下,频繁读写,不仅会导致加锁解锁的性能消耗,也会使系统产生大量内存碎片。

6 赞+1
0
评论

0 条评论

暂无评论,快来写下您的评论

热门职位推荐
热门公司推荐