计算机网络试验要求写一个文件传输程序。由于以前自己写的线程类和SOCKET类丢失掉了(寝室电脑被盗),现在重写这几个类,干脆就写了一个线程池,用的是C++STL和纯API。而且,为了保证这个线程类本身是线程安全的,我还使用了WinApi中的互斥量。同时仿照C#的类库,在线程类中加入了Join方法。调用线程对象Join方法的线程将等待线程对象直到执行完毕。以下是源代码。
/////////////////////////////////MyThread.h//////////////////////////////////////
#ifndef _MYTHREAD_H_
#define _MYTHREAD_H_
#include "MyException.h"
#include <windows.h>
#include <list>
using namespace std;
//前向声明
class CMyThread;//线程类
class CMyThreadPool;//线程池类
class CMyThreadExp;//线程异常类
#define MyThreadProc LPTHREAD_START_ROUTINE
//线程状态枚举
enum MyThreadStatus
{
THREADRUN,//运行
THREADPAUSE,//暂停
};
//线程类,可动态更换执行函数
class CMyThread
{
struct MyThreadParam
{
LPTHREAD_START_ROUTINE proc;//用户线程函数
void *lpParam;//用户线程参数
CMyThread *pCMyThread;//线程类对象
};
public:
//构造,proc线程函数
CMyThread();
//析构
~CMyThread();
//运行线程
bool Run();
//暂停线程
bool Pause();
//调用Join的线程将阻塞,直到该线程执行完毕
void Join();
//设置线程运行的函数,和要传给线程的参数
void SetProc(LPTHREAD_START_ROUTINE proc, void* lpParam);
protected:
CMyThread(CMyThread&) {}
//线程实际运行的函数
static DWORD WINAPI RealThreadProc(void* lpParam);
friend class CMyThreadPool;
protected:
HANDLE m_hThread;
DWORD m_id;
HANDLE m_hEvt;
MyThreadParam m_param;
MyThreadStatus m_status;
HANDLE m_hMtx;
};
//线程池类
class CMyThreadPool
{
public:
//构造,initNum初始情况线程数量
CMyThreadPool(int initNum);
//析构
~CMyThreadPool();
//申请线程进行工作,proc工作函数,lpParam工作函数参数,run是否立即运行,返回线程ID
DWORD DoWork(LPTHREAD_START_ROUTINE proc, void *lpParam, bool run=true);
//运行线程,id线程ID
bool Run(DWORD id);
//暂停运行线程,id线程ID
bool Pause(DWORD id);
//调整线程池大小为size,返回调整后线程池大小
unsigned SetSize(unsigned size);//when decrease num, its dangerous
protected:
list<CMyThread*> m_lst;
};
//线程异常种类枚举
enum EnumMyThreadExp
{
EXPCREATETHREAD = 0,
EXPTERMINATETHREAD,
EXPRESUMETHREAD,
EXPSUSPENDTHREAD,
EXPCREATEEVENT,
EXPCREATEMUTEX,
};
//线程异常类
class CMyThreadExp : public CMyException
{
public:
CMyThreadExp(EnumMyThreadExp exp, CMyThread *pobj);
~CMyThreadExp();
void GetInfo();
EnumMyThreadExp m_exp;
CMyThread *m_pobj;
};
#endif//_MYTHREAD_H_
//////////////////////MyThread.cpp//////////////////////////////////
#include "mythread.h"
#include <iostream>
using namespace std;
CMyThreadExp::CMyThreadExp(EnumMyThreadExp exp, CMyThread *pobj)
: m_exp(exp), m_pobj(pobj)
{
GetInfo();
ShowExp();
}
CMyThreadExp::~CMyThreadExp()
{
}
void CMyThreadExp::GetInfo()
{
sprintf(m_strInfo, "\nCMyThread Exception When %p Execute ", (void*)(m_pobj));
switch (m_exp)
{
case EXPCREATETHREAD: strcat(m_strInfo, "CreateThread()"); break;
case EXPTERMINATETHREAD: strcat(m_strInfo,"TerminateThread()"); break;
case EXPRESUMETHREAD: strcat(m_strInfo, "ResumeThread()"); break;
case EXPSUSPENDTHREAD: strcat(m_strInfo, "SuspendThread()"); break;
case EXPCREATEEVENT: strcat(m_strInfo, "CreateEvent()"); break;
case EXPCREATEMUTEX: strcat(m_strInfo, "CreateMutex()"); break;
}
char temp[100];
sprintf(temp, "\nError Code = %d", GetLastError());
strcat(m_strInfo, temp);
}
/**********************************************************************************************/
CMyThread::CMyThread()
: m_hThread(NULL), m_status(THREADPAUSE), m_hEvt(0), m_hMtx(0)
{
m_hThread = CreateThread(0, 0, (LPTHREAD_START_ROUTINE)(CMyThread::RealThreadProc),
(void*)&m_param, CREATE_SUSPENDED, &m_id);
if (m_hThread == NULL)//fail to create thread
{
CMyThreadExp(EXPCREATETHREAD, this);
throw;//construct exception
}
m_param.proc = NULL;
m_param.lpParam = NULL;
m_param.pCMyThread = this;
m_hEvt = CreateEvent(0, FALSE, FALSE, 0);//自动复位
if (m_hEvt == 0)
{
CloseHandle(m_hThread);
CMyThreadExp(EXPCREATEEVENT, this);
throw;//construct exception
}
m_hMtx = CreateMutex(0, 0, 0);
if (m_hMtx == 0)//fail to create mutex
{
unsigned long i = GetLastError();
CloseHandle(m_hEvt);
CloseHandle(m_hThread);
CMyThreadExp(EXPCREATEMUTEX, this);
throw;//construct exception
}
}
CMyThread::~CMyThread()
{
CloseHandle(m_hMtx);
if (TerminateThread(m_hThread, -1) == 0)//fail to terminate
CMyThreadExp(EXPTERMINATETHREAD, this);
}
bool CMyThread::Run()
{
WaitForSingleObject(m_hMtx, INFINITE);//get mutex
if (m_status == THREADPAUSE)
if (ResumeThread(m_hThread) == -1)//fail to resume
{
CMyThreadExp(EXPRESUMETHREAD, this);
ReleaseMutex(m_hMtx);//release mutex
return false;
}
m_status = THREADRUN;
ReleaseMutex(m_hMtx);//release mutex
return true;
}
bool CMyThread::Pause()
{
WaitForSingleObject(m_hMtx, INFINITE);//get mutex
if (m_status == THREADRUN)
if (SuspendThread(m_hThread) == -1)//fail to suspend
{
CMyThreadExp(EXPSUSPENDTHREAD, this);
ReleaseMutex(m_hMtx);//release mutex
return false;
}
m_status = THREADPAUSE;
ReleaseMutex(m_hMtx);//release mutex
return true;
}
void CMyThread::Join()
{
WaitForSingleObject(m_hEvt, INFINITE);
}
void CMyThread::SetProc(LPTHREAD_START_ROUTINE proc, void* lpParam)
{
WaitForSingleObject(m_hMtx, INFINITE);//get mutex
m_param.proc = proc;
m_param.lpParam = lpParam;
ReleaseMutex(m_hMtx);//release mutex
}
DWORD WINAPI CMyThread::RealThreadProc(void* lpParam)
{
LPTHREAD_START_ROUTINE proc;
MyThreadParam *pp = (MyThreadParam*)lpParam;
while (true)
{
proc = pp->proc;
if (proc)
(*proc)(pp->lpParam);
pp->proc = NULL;//clear function
pp->lpParam = NULL;//clear param
pp->pCMyThread->Pause();//pause automatic
SetEvent(pp->pCMyThread->m_hEvt);
}
}
/**********************************************************************************************/
CMyThreadPool::CMyThreadPool(int initNum)
{
CMyThread *pt;
for (int i=0; i<initNum; i++)
{
pt = new CMyThread;
m_lst.push_back(pt);
}
}
CMyThreadPool::~CMyThreadPool()
{
list<CMyThread*>::iterator it = m_lst.begin();
list<CMyThread*>::iterator ite = m_lst.end();
for ( ; it != ite; it++)//terminate all thread
{
delete (*it);
}
}
DWORD CMyThreadPool::DoWork(LPTHREAD_START_ROUTINE proc, void *lpParam, bool run)
{
list<CMyThread*>::iterator it = m_lst.begin();
list<CMyThread*>::iterator ite = m_lst.end();
for ( ; it != ite; it++)//is there a idle thread?
{
if ((*it)->m_param.proc == NULL
&& (*it)->m_status == THREADPAUSE
)
{
(*it)->SetProc(proc, lpParam);
if (run)//run at once
(*it)->Run();
return (*it)->m_id;
}
}
//no idle thread
CMyThread *pt = new CMyThread;//create a new thread
m_lst.push_back(pt);
pt->SetProc(proc, lpParam);
if (run)
pt->Run();
return pt->m_id;
}
bool CMyThreadPool::Run(DWORD id)
{
list<CMyThread*>::iterator it = m_lst.begin();
list<CMyThread*>::iterator ite = m_lst.end();
for ( ; it != ite; it++)
{
if ((*it)->m_id == id)
return ((*it)->Run());
}
return false;
}
bool CMyThreadPool::Pause(DWORD id)
{
list<CMyThread*>::iterator it = m_lst.begin();
list<CMyThread*>::iterator ite = m_lst.end();
for ( ; it != ite; it++)
{
if ((*it)->m_id == id)
return ((*it)->Pause());
}
return false;
}
unsigned CMyThreadPool::SetSize(unsigned size)
{
unsigned nowsize = m_lst.size();
if (nowsize <= size)
{
CMyThread *pt;
unsigned inc = size-nowsize;
for (unsigned i=0; i<inc; i++)
{
pt = new CMyThread;
m_lst.push_back(pt);
}
return size;
}
else
{
unsigned dec = nowsize - size;
list<CMyThread*>::iterator it = m_lst.begin();
list<CMyThread*>::iterator ite = m_lst.end();
list<CMyThread*>::iterator itemp;
unsigned i=0;
for ( ; it!=ite && i<dec; )
{
if ((*it)->m_status == THREADPAUSE)
{
itemp = it++;
delete ((*itemp));
m_lst.erase(itemp);
i++;
continue;
}
it++;
}
Sleep(100*i);
return m_lst.size();
}
}
本文地址:http://com.8s8s.com/it/it22002.htm