(转载)异步IO、APC、IO完成端口、线程池与高性能服务器 (四)

类别:编程语言 点击:0 评论:0 推荐:

转载: http://www.vchelp.net/
原作者姓名 Fang([email protected])

正文

异步IO、APC、IO完成端口、线程池与高性能服务器之四 线程池

线程池

下面摘抄于MSDN《Thread Pooling》。
有许多应用程序创建的线程花费了大量时间在睡眠状态来等待事件的发生。还有一些线程进入睡眠状态后定期被唤醒以轮询工作方式来改变或者更新状态信息。线程池可以让你更有效地使用线程,它为你的应用程序提供一个由系统管理的工作者线程池。至少会有一个线程来监听放到线程池的所有等待操作,当等待操作完成后,线程池中将会有一个工作者线程来执行相应的回调函数。
你也可以把没有等待操作的工作项目放到线程池中,用QueueUserWorkItem函数来完成这个工作,把要执行的工作项目函数通过一个参数传递给线程池。工作项目被放到线程池中后,就不能再取消了。
Timer-queue timers和Registered wait operations也使用线程池来实现。他们的回调函数也放在线程池中。你也可以用BindIOCompletionCallback函数来投递一个异步IO操作,在IO完成端口上,回调函数也是由线程池线程来执行。
当第一次调用QueueUserWorkItem函数或者BindIOCompletionCallback函数的时候,线程池被自动创建,或者Timer-queue timers或者Registered wait operations放入回调函数的时候,线程池也可以被创建。线程池可以创建的线程数量不限,仅受限于可用的内存,每一个线程使用默认的初始堆栈大小,运行在默认的优先级上。
线程池中有两种类型的线程:IO线程和非IO线程。IO线程等待在可告警状态,工作项目作为APC放到IO线程中。如果你的工作项目需要线程执行在可警告状态,你应该将它放到IO线程。
非IO工作者线程等待在IO完成端口上,使用非IO线程比IO线程效率更高,也就是说,只要有可能的话,尽量使用非IO线程。IO线程和非IO线程在异步IO操作没有完成之前都不会退出。然而,不要在非IO线程中发出需要很长时间才能完成的异步IO请求。
正确使用线程池的方法是,工作项目函数以及它将会调用到的所有函数都必须是线程池安全的。安全的函数不应该假设线程是一次性线程的或者是永久线程。一般来说,应该避免使用线程本地存储和发出需要永久线程的异步IO调用,比如说RegNotifyChangeKeyValue函数。如果需要在永久线程中执行这样的函数的话,可以给QueueUserWorkItem传递一个选项WT_EXECUTEINPERSISTENTTHREAD。
注意,线程池不能兼容COM的单线程套间(STA)模型。

    为了更深入地讲解操作系统实现的线程池的优越性,我们首先尝试着自己实现一个简单的线程池模型。

    代码如下:

/************************************************************************/
/* Test Our own thread pool.                                            */
/************************************************************************/

typedef struct _THREAD_POOL
{
    HANDLE QuitEvent;
    HANDLE WorkItemSemaphore;

    LONG WorkItemCount;
    LIST_ENTRY WorkItemHeader;
    CRITICAL_SECTION WorkItemLock;

    LONG ThreadNum;
    HANDLE *ThreadsArray;

}THREAD_POOL, *PTHREAD_POOL;

typedef VOID (*WORK_ITEM_PROC)(PVOID Param);

typedef struct _WORK_ITEM
{
    LIST_ENTRY List;

    WORK_ITEM_PROC UserProc;
    PVOID UserParam;
    
}WORK_ITEM, *PWORK_ITEM;


DWORD WINAPI WorkerThread(PVOID pParam)
{
    PTHREAD_POOL pThreadPool = (PTHREAD_POOL)pParam;
    HANDLE Events[2];
    
    Events[0] = pThreadPool->QuitEvent;
    Events[1] = pThreadPool->WorkItemSemaphore;

    for(;;)
    {
        DWORD dwRet = WaitForMultipleObjects(2, Events, FALSE, INFINITE);

        if(dwRet == WAIT_OBJECT_0)
            break;

        //
        // execute user's proc.
        //

        else if(dwRet == WAIT_OBJECT_0 +1)
        {
            PWORK_ITEM pWorkItem;
            PLIST_ENTRY pList;

            EnterCriticalSection(&pThreadPool->WorkItemLock);
            _ASSERT(!IsListEmpty(&pThreadPool->WorkItemHeader));
            pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
            LeaveCriticalSection(&pThreadPool->WorkItemLock);

            pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
            pWorkItem->UserProc(pWorkItem->UserParam);

            InterlockedDecrement(&pThreadPool->WorkItemCount);
            free(pWorkItem);
        }

        else
        {
            _ASSERT(0);
            break;
        }
    }

    return 0;
}

BOOL InitializeThreadPool(PTHREAD_POOL pThreadPool, LONG ThreadNum)
{
    pThreadPool->QuitEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    pThreadPool->WorkItemSemaphore = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
    pThreadPool->WorkItemCount = 0;
    InitializeListHead(&pThreadPool->WorkItemHeader);
    InitializeCriticalSection(&pThreadPool->WorkItemLock);
    pThreadPool->ThreadNum = ThreadNum;
    pThreadPool->ThreadsArray = (HANDLE*)malloc(sizeof(HANDLE) * ThreadNum);

    for(int i=0; i<ThreadNum; i++)
    {
        pThreadPool->ThreadsArray[i] = CreateThread(NULL, 0, WorkerThread, pThreadPool, 0, NULL);
    }

    return TRUE;
}

VOID DestroyThreadPool(PTHREAD_POOL pThreadPool)
{
    SetEvent(pThreadPool->QuitEvent);

    for(int i=0; i<pThreadPool->ThreadNum; i++)
    {
        WaitForSingleObject(pThreadPool->ThreadsArray[i], INFINITE);
        CloseHandle(pThreadPool->ThreadsArray[i]);
    }

    free(pThreadPool->ThreadsArray);

    CloseHandle(pThreadPool->QuitEvent);
    CloseHandle(pThreadPool->WorkItemSemaphore);
    DeleteCriticalSection(&pThreadPool->WorkItemLock);

    while(!IsListEmpty(&pThreadPool->WorkItemHeader))
    {
        PWORK_ITEM pWorkItem;
        PLIST_ENTRY pList;
        
        pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
        pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
        
        free(pWorkItem);
    }
}

BOOL PostWorkItem(PTHREAD_POOL pThreadPool, WORK_ITEM_PROC UserProc, PVOID UserParam)
{
    PWORK_ITEM pWorkItem = (PWORK_ITEM)malloc(sizeof(WORK_ITEM));
    if(pWorkItem == NULL)
        return FALSE;

    pWorkItem->UserProc = UserProc;
    pWorkItem->UserParam = UserParam;

    EnterCriticalSection(&pThreadPool->WorkItemLock);
    InsertTailList(&pThreadPool->WorkItemHeader, &pWorkItem->List);
    LeaveCriticalSection(&pThreadPool->WorkItemLock);

    InterlockedIncrement(&pThreadPool->WorkItemCount);

    ReleaseSemaphore(pThreadPool->WorkItemSemaphore, 1, NULL);

    return TRUE;
}

VOID UserProc1(PVOID dwParam)
{
    WorkItem(dwParam);
}

void TestSimpleThreadPool(BOOL bWaitMode, LONG ThreadNum)
{
    THREAD_POOL ThreadPool;    
    InitializeThreadPool(&ThreadPool, ThreadNum);
    
    CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    BeginTime = GetTickCount();
    ItemCount = 20;

    for(int i=0; i<20; i++)
    {
        PostWorkItem(&ThreadPool, UserProc1, (PVOID)bWaitMode);
    }
    
    WaitForSingleObject(CompleteEvent, INFINITE);
    CloseHandle(CompleteEvent);

    DestroyThreadPool(&ThreadPool);
}
    我们把工作项目放到一个队列中,用一个信号量通知线程池,线程池中任意一个线程取出工作项目来执行,执行完毕之后,线程返回线程池,继续等待新的工作项目。
    线程池中线程的数量是固定的,预先创建好的,永久的线程,直到销毁线程池的时候,这些线程才会被销毁。
    线程池中线程获得工作项目的机会是均等的,随机的,并没有特别的方式保证哪一个线程具有特殊的优先获得工作项目的机会。
    而且,同一时刻可以并发运行的线程数目没有任何限定。事实上,在我们的执行计算任务的演示代码中,所有的线程都并发执行。
    下面,我们再来看一下,完成同样的任务,系统提供的线程池是如何运作的。

/************************************************************************/
/* QueueWorkItem Test.                                                  */
/************************************************************************/

DWORD BeginTime;
LONG  ItemCount;
HANDLE CompleteEvent;

int compute()
{
    srand(BeginTime);

    for(int i=0; i<20 *1000 * 1000; i++)
        rand();

    return rand();
}

DWORD WINAPI WorkItem(LPVOID lpParameter)
{
    BOOL bWaitMode = (BOOL)lpParameter;

    if(bWaitMode)
        Sleep(1000);
    else
        compute();

    if(InterlockedDecrement(&ItemCount) == 0)
    {
        printf("Time total %d second.\n", GetTickCount() - BeginTime);
        SetEvent(CompleteEvent);
    }

    return 0;
}

void TestWorkItem(BOOL bWaitMode, DWORD Flag)
{
    CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    BeginTime = GetTickCount();
    ItemCount = 20;
    
    for(int i=0; i<20; i++)
    {
        QueueUserWorkItem(WorkItem, (PVOID)bWaitMode, Flag);
    }    

    WaitForSingleObject(CompleteEvent, INFINITE);
    CloseHandle(CompleteEvent);
}
    很简单,是吧?我们仅需要关注于我们的回调函数即可。但是与我们的简单模拟来比,系统提供的线程池有着更多的优点。
    首先,线程池中线程的数目是动态调整的,其次,线程池利用IO完成端口的特性,它可以限制并发运行的线程数目,默认情况下,将会限制为CPU的数目,这可以减少线程切换。它挑选最近执行过的线程再次投入执行,从而避免了不必要的线程切换。
    系统提供的线程池背后的策略,我们下一节继续再谈。

参考书目

1,    MSDN Library
2,    《Windows高级编程指南》
3,    《Windows核心编程》
4,    《Windows 2000 设备驱动程序设计指南》

正文完

本文地址:http://com.8s8s.com/it/it25750.htm