Implement Windows Message Pump In C++
文/刘景仰
最近在阅读一些C++中高级程序员的代码,发现一些程序员虽然对Windows下多线程编程有一些了解,但还是应用的并不是特别到位。我思考了一下产生这些问题的根源,原因估计有二:其一,程序员对多线程同步对象的了解并不是很深刻;其二,程序员在系统设计初期并没有十分认真地去规划一个系统,去分清楚哪些objects是临界区,哪些objects该上临界锁,该如何上锁。当然,多线程程序设计本身是一个复杂的过程;即使一个经验丰富的程序设计师也不敢打包票说他一定能设计好某个复杂的并发系统。对此,我想把我的一些个人经验分享出来,希望对一些正在学习中的朋友有所帮助。
Windows Message Pump的机制相信大家都很熟悉了。简单地说,一个窗口对象,他所接收的消息来自五湖四海,有用户对界面的操作,有系统内部发向该Window的Notification,可能还有来自网络的一些事件,还有其他线程发送来的事件。值得注意的是这些消息可能来自多个线程,可能并发同时到达该Window。如何处理,Windows给出了一个Message Pump的消息处理机制。该机制在Window的主线程上做一个死循环,不断地去消息泵里去检查是否有消息到达,如果有消息抵达该窗口对象,则取出该消息,交付窗口过程(Window Process)去做处理。
设计一个Windows Message Pump对一个程序员来说应该不是难事,难的是如何去设计一个好的Message Pump。我不敢说我给出的设计是最好的,但我敢说它是可以满足一些应用的需求的,它是相对稳定与高效的。
代码很简短,约400行。有以下的类:
- CriticalSection 封装 Windows CRICITAL_SECTION 用户对象
- Event 封装Windows EVENT 内核对象
- AutoLock 封装CriticalSection的操作,让它进入一个Scope的时候自动加锁,离开一个Scope的时候
自动解锁
- MessageHandler 封装对消息的操作,你可以继承之to meet your need。
- SimpleQueue 封装队列操作。不采用STL的原因是,一方面,我不想再引入STL库,另一方面,我不想要
过多小粒度的内存申请/释放操作,避免产生内存碎片影响效率
- MessagePump 封装MessagePump的操作。该MessagePump采用Singleton设计模式。因为我期望在一
个运行实例中只有一个MessagePump在运行。当然可以有更多的MessagePump在运行,
可以修改之!
使用该代码分为三个步骤:
1, 创建一个MessagePump对象
g_msgPump = MessagePump::CreateMessagePump();
2,配置MessagePump对象
//config it MessageHandler mh; g_msgPump->SetMessageHandler( &mh ); |
3,启动MessagePump
g_msgPump->StartMessagePump();
4, 开启多个线程,并在线程中往MessagePump发送消息:
g_msgPump->PostMessage( &msg, sizeof(Message));
5,停止MessagePump,并销毁MessagePump对象
g_msgPump->StopMessagePump();
g_msgPump->DestroyMessagePump();
所有代码:
- MessagePump.h
#include <iostream.h>
#include <stdlib.h>
#include <assert.h>
/*********************************************************************************
* Message pump implementation, By Henry Lau.
* [2006/12/12] - First create it
* [2007/1/26 ] - Add SimpleQueue as messages container instead of STL::vector
* - Add MessagePump::Pause()
*********************************************************************************/
#ifndef MESSAGE_PUMP_H__
#define MESSAGE_PUMP_H__
typedef char Message;
#define IN
#define OUT
class CriticalSection
{
private:
CriticalSection( CriticalSection& ref );
void operator=(const CriticalSection& ref );
CRITICAL_SECTION m_cs;
public:
CriticalSection(){ ::InitializeCriticalSection(&m_cs); }
~CriticalSection(){ ::DeleteCriticalSection(&m_cs); }
public:
void Lock() { ::EnterCriticalSection(&m_cs); }
void Unlock() { ::LeaveCriticalSection(&m_cs); }
};
class AutoLock
{
private:
AutoLock( AutoLock& ref );
AutoLock& operator=(const AutoLock& ref );
CriticalSection* m_pcs;
public:
AutoLock( CriticalSection* pcs ) : m_pcs(pcs) { m_pcs->Lock(); }
~AutoLock() { m_pcs->Unlock(); }
};
class Event
{
private:
Event( Event& ref );
void operator=( const Event& ref );
HANDLE m_hEvent;
public:
Event( BOOL bManualReset, BOOL bInitState )
: m_hEvent( NULL ){
m_hEvent = ::CreateEvent( NULL, bManualReset, bInitState, NULL );
}
~Event(){ ::CloseHandle(m_hEvent); }
public:
void Wait( unsigned long dwTimeout = INFINITE ){ ::WaitForSingleObject( m_hEvent, dwTimeout); }
void Set(){ ::SetEvent( m_hEvent); }
void Reset() { ::ResetEvent( m_hEvent); }
//TRUE: signaled state, FALSE un-signaled state
BOOL Check() { return WAIT_OBJECT_0 == ::WaitForSingleObject( m_hEvent, 0); }
};
//NOTE: SimpleQueue is NOT a thread-safe class!
class SimpleQueue
{
private:
//make copy constructor and assignment operation un-accessable
SimpleQueue( SimpleQueue& ref );
void operator=( const SimpleQueue& ref );
public:
//with nAutoInc auto increasement!
SimpleQueue( long nAutoInc )
: m_nAutoInc(nAutoInc)
, m_cCapacity(nAutoInc)
, m_nHeader(0L)
, m_nTail(0L)
, m_nPeekPos(-1L)
, m_cElements(0L)
, m_pElements(NULL) {
if( m_nAutoInc ){
m_pElements = new void*[m_nAutoInc];
memset( m_pElements, 0, sizeof(void*)*m_nAutoInc);
}else{
const int size = sizeof(void*)*1024;
m_pElements = new void*[size];
memset(m_pElements, 0, size);
}
}
~SimpleQueue(){
delete[] m_pElements;
m_pElements = NULL;
}
private:
void** m_pElements;
long m_nAutoInc;
long m_cElements;
long m_cCapacity;
long m_nHeader;
long m_nTail;
long m_nPeekPos;
public:
bool EnterQueue(void* pElement) {
if( pElement == NULL )
return false;//NULL pointer NOT Allowed!
if( m_cElements >= m_cCapacity ){ //queue is already full?
if( m_nAutoInc ){
ReAllocBuffer( m_nAutoInc );
}else{
return false;//is full!
}
m_pElements[m_nTail] = pElement;
}else{
if( m_nTail + 1 > m_cCapacity )
m_nTail = 0;
m_pElements[m_nTail] = pElement;
}
++ m_nTail;
++ m_cElements;
return true;
}
void* PopQueue(){
if( m_cElements == 0 ){
//NOT any element exists in this queue
return NULL;
}
if( m_nHeader > m_cCapacity - 1 ){
m_nHeader = 0;
}
void* pElement = m_pElements[m_nHeader++];
if( -- m_cElements == 0 )
ResetQueue();
return pElement;
}
/* To peek elements without poping them out, do coding like this:
*
* StartPeekQueue();
* long nSize = GetSize();
* for( int i = 0; i < nSize; i ++ )
* Element* pElement = (Element*)PeekQueue();
* EndPeekQueue();
*/
void StartPeekQueue(){
m_nPeekPos = m_nHeader;
}
void* PeekQueue(){
if( m_cElements == 0 )
return NULL;
if( m_nPeekPos == -1 )
return NULL;//StartPeekQueue()?
if( m_nPeekPos > m_cCapacity - 1 )
m_nPeekPos = 0;
return m_pElements[m_nPeekPos++];
}
void EndPeekQueue(){
m_nPeekPos = -1;
}
bool IsQueueFull() const {
return m_cElements >= m_cCapacity;
}
long GetSize() const {
return m_cElements;
}
void ResetQueue(){
memset( m_pElements, 0, sizeof(void*)m_cCapacity);
m_nHeader = m_nTail = 0;
m_cElements = 0;
}
private:
void ReAllocBuffer( long nAutoInc ){
if( !IsQueueFull() )
return;//call ReAllocBuffer() ONLY when this queue is full!
void** pElements = new void*[m_cCapacity + nAutoInc];
memset(pElements, 0, sizeof(void*)*(m_cCapacity+nAutoInc));
StartPeekQueue();
for( long i = 0; i < m_cCapacity; i ++ ){
pElements[i] = PeekQueue();
}
EndPeekQueue();
delete[] m_pElements;
m_pElements = pElements;
//re-calculate the members
m_nTail = m_cCapacity;
m_nHeader = 0;
m_cCapacity += nAutoInc;
}
};
class MessageHandler
{
private:
MessageHandler(MessageHandler& ref );
MessageHandler& operator=(const MessageHandler& ref );
long m_fStop;
CriticalSection m_cs;
public:
MessageHandler() : m_fStop(0L) {}
virtual ~MessageHandler() {}
void StopHandlingMessages(){
AutoLock alk( &m_cs );
m_fStop = 1;
}
//Override HanleMessages() to fit your need!
virtual void HandleMessages( IN SimpleQueue& msgsQueue ){
{
AutoLock alk(&m_cs);
m_fStop = 0;
}
long nSize = msgsQueue.GetSize();
msgsQueue.StartPeekQueue();
for( long i = 0; i < nSize; i ++ ){
if( m_fStop )
break;
Message* pMsg = (Message*)msgsQueue.PeekQueue();
cout << "----------------------------------------------------------------" <<endl;
cout << "Message handled: " << *pMsg << endl;
::Sleep(50);
}
msgsQueue.EndPeekQueue();
}
};
#define QUEUE_AUTO_INC 256
class MessagePump
{
private:
MessagePump()
: m_hMessageThread(NULL)
, m_fRunning(0)
, m_pMessageHandler(NULL)
, m_msgQueue(QUEUE_AUTO_INC)
, m_msgReady(QUEUE_AUTO_INC)
, m_nIdleCounter(0L)
, m_event(TRUE, FALSE) {
m_msgQueue.ResetQueue();
}
MessagePump( MessagePump& ref );
MessagePump& operator=(const MessagePump& ref );
~MessagePump(){
FreeAllMessages( m_msgQueue );
}
static MessagePump* m_pMsgRvr;
DWORD m_nIdleCounter;
//critical section to protect m_msgQueue from multi-threads access!
CriticalSection m_cs;
HANDLE m_hMessageThread;
Event m_event;
long m_fRunning;
MessageHandler* m_pMessageHandler;
//Messages which have been pushed into message pump!
SimpleQueue m_msgQueue;
//Messages which have been pop from message pump and ready to be handled!
SimpleQueue m_msgReady;
private:
long PopAllMessages( OUT SimpleQueue& msgs ){
long nSize = 0;
long i = 0;
msgs.ResetQueue();
AutoLock alk(&m_cs);
nSize = m_msgQueue.GetSize();
for( ;i < nSize; i ++ ){
msgs.EnterQueue( m_msgQueue.PopQueue() );
}
m_msgQueue.ResetQueue();
return nSize;
}
void FreeAllMessages(IN SimpleQueue& msgsQueue ){
long nSize = msgsQueue.GetSize();
for( int i = 0; i < nSize; i ++ ){
free( msgsQueue.PopQueue() );
}
msgsQueue.ResetQueue();
}
static DWORD WINAPI MessagePumpThread( LPVOID lpvoid ){
MessagePump* pMsgRcvr = (MessagePump*)lpvoid;
do{
if( false == pMsgRcvr->HandleAllMessages()){//if no msgs has been handled.
if( pMsgRcvr->IncreaseIdleCounter() >= 0x2FFFF ){
//Enter OS Kernel mode to wait for incoming message
pMsgRcvr->GetEvent().Reset();
//wait until a message comes in!
pMsgRcvr->GetEvent().Wait();
}
}
}while( pMsgRcvr->IsPumping() );
return 0;
}
public:
static MessagePump* CreateMessagePump() {
if( m_pMsgRvr == NULL )
return m_pMsgRvr = new MessagePump;
else
return m_pMsgRvr;
}
static void DestroyMessagePump() {
delete m_pMsgRvr;
m_pMsgRvr = NULL;
}
void SetMessageHandler( MessageHandler* pmsgHandler ){
m_pMessageHandler = pmsgHandler;
}
DWORD IncreaseIdleCounter() {
return ++ m_nIdleCounter;
}
Event& GetEvent(){
return m_event;
}
BOOL IsPumping() const{
return m_fRunning == 1;
}
void StartMessagePump(){
m_fRunning = 1;
m_hMessageThread = ::CreateThread( NULL, 0, MessagePumpThread, this, 0, 0 );
}
void Pause(){
m_pMessageHandler->StopHandlingMessages();
m_fRunning = 0;
m_event.Set();
WaitForSingleObject( m_hMessageThread, INFINITE );
CloseHandle( m_hMessageThread );
//DON'T free un-handled msgs here, keep them for later use!
m_hMessageThread = NULL;
}
void StopMessagePump(){
m_pMessageHandler->StopHandlingMessages();
m_fRunning = 0;
m_event.Set();//go on Message pump loop!
WaitForSingleObject( m_hMessageThread, INFINITE);
CloseHandle( m_hMessageThread );
//free all un-handled msgs!
FreeAllMessages( m_msgQueue );
m_hMessageThread = NULL;
}
bool PostMessage( Message* msg, long msgSize ){
void* pMsg = malloc(msgSize);
bool ret = false;
memcpy( pMsg, msg, msgSize );
{
AutoLock alk(&m_cs);
ret = m_msgQueue.EnterQueue(pMsg);
}
if( !m_event.Check() ){
//release blocked thread.
m_event.Set();
}
return ret;
}
bool HandleAllMessages(){
//!Attention: DON'T lock this scope!
assert( m_pMessageHandler != NULL );
m_msgReady.ResetQueue();
if( 0 == PopAllMessages( m_msgReady ))
return false; //No msgs has been handled!
//reset idle counter
m_nIdleCounter = 0;
m_pMessageHandler->HandleMessages( m_msgReady );
FreeAllMessages( m_msgReady );
return true; // some msgs have been handled
}
};
MessagePump* MessagePump::m_pMsgRvr = NULL;
#endif
测试
#include "MessagePump.h"
#include <assert.h>
#include <iostream.h>
MessagePump* g_msgPump = NULL;
long g_fPushing= 1;
DWORD __stdcall PushMessageThread( LPVOID lpvoid );
int main(int argc, char* argv[])
{
g_msgPump = MessagePump::CreateMessagePump();
//config it
MessageHandler mh;
g_msgPump->SetMessageHandler( &mh );
//start
g_msgPump->StartMessagePump();
HANDLE hThread = ::CreateThread( NULL, 0, PushMessageThread, NULL, 0, 0 );
HANDLE hThread2 = ::CreateThread( NULL, 0, PushMessageThread, NULL, 0, 0 );
HANDLE hThread3 = ::CreateThread( NULL, 0, PushMessageThread, NULL, 0, 0 );
//stop sending messages to message pump
getchar();
cout << "stop sending message to message pump!" << endl;
g_fPushing = 0;
WaitForSingleObject( hThread, INFINITE );
WaitForSingleObject( hThread3, INFINITE );
WaitForSingleObject( hThread2, INFINITE );
CloseHandle( hThread );
CloseHandle( hThread2 );
CloseHandle( hThread3 );
getchar();
cout << "stop message pump loop!" <<endl;
//stop message pump loop.
g_msgPump->StopMessagePump();
g_msgPump->DestroyMessagePump();
return 0;
}
DWORD __stdcall PushMessageThread(LPVOID lpvoid )
{
Message msg;
long i = 0;
do{
msg = (++i)%52 + 'a';
cout << "Post message " << msg << endl;
g_msgPump->PostMessage( &msg, sizeof(Message));
//cout << "SendMessage() return" <<endl;
::Sleep(100);
}while( g_fPushing);
return 0;
}