自己写的线程池类(使用C++,pure API,Mutex,Event)

类别:编程语言 点击:0 评论:0 推荐:

计算机网络试验要求写一个文件传输程序。由于以前自己写的线程类和SOCKET类丢失掉了(寝室电脑被盗),现在重写这几个类,干脆就写了一个线程池,用的是C++STL和纯API。而且,为了保证这个线程类本身是线程安全的,我还使用了WinApi中的互斥量。同时仿照C#的类库,在线程类中加入了Join方法。调用线程对象Join方法的线程将等待线程对象直到执行完毕。以下是源代码。

/////////////////////////////////MyThread.h//////////////////////////////////////

#ifndef _MYTHREAD_H_
#define _MYTHREAD_H_
#include "MyException.h"
#include <windows.h>
#include <list>
using namespace std;

//前向声明
class CMyThread;//线程类
class CMyThreadPool;//线程池类
class CMyThreadExp;//线程异常类

#define MyThreadProc LPTHREAD_START_ROUTINE

//线程状态枚举
enum MyThreadStatus
{
 THREADRUN,//运行
 THREADPAUSE,//暂停
};
//线程类,可动态更换执行函数
class CMyThread
{
 struct MyThreadParam
 {
  LPTHREAD_START_ROUTINE proc;//用户线程函数
  void *lpParam;//用户线程参数
  CMyThread *pCMyThread;//线程类对象
 };
public:
 //构造,proc线程函数
 CMyThread();
 //析构
 ~CMyThread();
 //运行线程
 bool Run();
 //暂停线程
 bool Pause();
 //调用Join的线程将阻塞,直到该线程执行完毕
 void Join();
 //设置线程运行的函数,和要传给线程的参数
 void SetProc(LPTHREAD_START_ROUTINE proc, void* lpParam);

protected:
 CMyThread(CMyThread&) {}
 //线程实际运行的函数
 static DWORD WINAPI RealThreadProc(void* lpParam);
 friend class CMyThreadPool;

protected:
 HANDLE m_hThread;
 DWORD m_id;
 HANDLE m_hEvt;
 MyThreadParam m_param;
 MyThreadStatus m_status;
 HANDLE m_hMtx;
};

//线程池类
class CMyThreadPool
{
public:
 //构造,initNum初始情况线程数量
 CMyThreadPool(int initNum);
 //析构
 ~CMyThreadPool();
 //申请线程进行工作,proc工作函数,lpParam工作函数参数,run是否立即运行,返回线程ID
 DWORD DoWork(LPTHREAD_START_ROUTINE proc, void *lpParam, bool run=true);
 //运行线程,id线程ID
 bool Run(DWORD id);
 //暂停运行线程,id线程ID
 bool Pause(DWORD id);
 //调整线程池大小为size,返回调整后线程池大小
 unsigned SetSize(unsigned size);//when decrease num, its dangerous

protected:
 list<CMyThread*> m_lst;
};

//线程异常种类枚举
enum EnumMyThreadExp
{
 EXPCREATETHREAD = 0,
 EXPTERMINATETHREAD,
 EXPRESUMETHREAD,
 EXPSUSPENDTHREAD,
 EXPCREATEEVENT,
 EXPCREATEMUTEX,
};
//线程异常类
class CMyThreadExp : public CMyException
{
public:
 CMyThreadExp(EnumMyThreadExp exp, CMyThread *pobj);
 ~CMyThreadExp();
 void GetInfo();
 EnumMyThreadExp m_exp;
 CMyThread *m_pobj;
};

#endif//_MYTHREAD_H_

//////////////////////MyThread.cpp//////////////////////////////////

#include "mythread.h"
#include <iostream>
using namespace std;

CMyThreadExp::CMyThreadExp(EnumMyThreadExp exp, CMyThread *pobj)
: m_exp(exp), m_pobj(pobj)
{
 GetInfo();
 ShowExp();
}
CMyThreadExp::~CMyThreadExp()
{
}
void CMyThreadExp::GetInfo()
{
 sprintf(m_strInfo, "\nCMyThread Exception When %p Execute ", (void*)(m_pobj));
 switch (m_exp)
 {
 case EXPCREATETHREAD: strcat(m_strInfo, "CreateThread()"); break;
 case EXPTERMINATETHREAD: strcat(m_strInfo,"TerminateThread()"); break;
 case EXPRESUMETHREAD: strcat(m_strInfo, "ResumeThread()"); break;
 case EXPSUSPENDTHREAD: strcat(m_strInfo, "SuspendThread()"); break;
 case EXPCREATEEVENT: strcat(m_strInfo, "CreateEvent()"); break;
 case EXPCREATEMUTEX: strcat(m_strInfo, "CreateMutex()"); break;
 }
 char temp[100];
 sprintf(temp, "\nError Code = %d", GetLastError());
 strcat(m_strInfo, temp);
}
/**********************************************************************************************/
CMyThread::CMyThread()
: m_hThread(NULL), m_status(THREADPAUSE), m_hEvt(0), m_hMtx(0)
{
 m_hThread = CreateThread(0, 0, (LPTHREAD_START_ROUTINE)(CMyThread::RealThreadProc),
      (void*)&m_param, CREATE_SUSPENDED, &m_id);
 if (m_hThread == NULL)//fail to create thread
 {
  CMyThreadExp(EXPCREATETHREAD, this);
  throw;//construct exception
 }
 m_param.proc = NULL;
 m_param.lpParam = NULL;
 m_param.pCMyThread = this;

 m_hEvt = CreateEvent(0, FALSE, FALSE, 0);//自动复位
 if (m_hEvt == 0)
 {
  CloseHandle(m_hThread);
  CMyThreadExp(EXPCREATEEVENT, this);
  throw;//construct exception
 }
 m_hMtx = CreateMutex(0, 0, 0);
 if (m_hMtx == 0)//fail to create mutex
 {
  unsigned long i = GetLastError();
  CloseHandle(m_hEvt);
  CloseHandle(m_hThread);
  CMyThreadExp(EXPCREATEMUTEX, this);
  throw;//construct exception
 }
}
CMyThread::~CMyThread()
{
 CloseHandle(m_hMtx);
 if (TerminateThread(m_hThread, -1) == 0)//fail to terminate
  CMyThreadExp(EXPTERMINATETHREAD, this);
}
bool CMyThread::Run()
{
 WaitForSingleObject(m_hMtx, INFINITE);//get mutex
 if (m_status == THREADPAUSE)
  if (ResumeThread(m_hThread) == -1)//fail to resume
  {
   CMyThreadExp(EXPRESUMETHREAD, this);
   ReleaseMutex(m_hMtx);//release mutex
   return false;
  }
 m_status = THREADRUN;
 ReleaseMutex(m_hMtx);//release mutex
 return true;
}
bool CMyThread::Pause()
{
 WaitForSingleObject(m_hMtx, INFINITE);//get mutex
 if (m_status == THREADRUN)
  if (SuspendThread(m_hThread) == -1)//fail to suspend
  {
   CMyThreadExp(EXPSUSPENDTHREAD, this);
   ReleaseMutex(m_hMtx);//release mutex
   return false;
  }
 m_status = THREADPAUSE;
 ReleaseMutex(m_hMtx);//release mutex
 return true;
}
void CMyThread::Join()
{
 WaitForSingleObject(m_hEvt, INFINITE);
}
void CMyThread::SetProc(LPTHREAD_START_ROUTINE proc, void* lpParam)
{
 WaitForSingleObject(m_hMtx, INFINITE);//get mutex
 m_param.proc = proc;
 m_param.lpParam = lpParam;
 ReleaseMutex(m_hMtx);//release mutex
}
DWORD WINAPI CMyThread::RealThreadProc(void* lpParam)
{
 LPTHREAD_START_ROUTINE proc;
 MyThreadParam *pp = (MyThreadParam*)lpParam;
 while (true)
 {
  proc = pp->proc;
  if (proc)
   (*proc)(pp->lpParam);
  pp->proc = NULL;//clear function
  pp->lpParam = NULL;//clear param
  pp->pCMyThread->Pause();//pause automatic
  SetEvent(pp->pCMyThread->m_hEvt);
 }
}
/**********************************************************************************************/
CMyThreadPool::CMyThreadPool(int initNum)
{
 CMyThread *pt;
 for (int i=0; i<initNum; i++)
 {
  pt = new CMyThread;
  m_lst.push_back(pt);
 }
}
CMyThreadPool::~CMyThreadPool()
{
 list<CMyThread*>::iterator it = m_lst.begin();
 list<CMyThread*>::iterator ite = m_lst.end();
 for ( ; it != ite; it++)//terminate all thread
 {
  delete (*it);
 }
}
DWORD CMyThreadPool::DoWork(LPTHREAD_START_ROUTINE proc, void *lpParam, bool run)
{
 list<CMyThread*>::iterator it = m_lst.begin();
 list<CMyThread*>::iterator ite = m_lst.end();
 for ( ; it != ite; it++)//is there a idle thread?
 {
  if ((*it)->m_param.proc == NULL
   && (*it)->m_status == THREADPAUSE
   )
  {
   (*it)->SetProc(proc, lpParam);
   if (run)//run at once
    (*it)->Run();
   return (*it)->m_id;
  }
 }
 //no idle thread
 CMyThread *pt = new CMyThread;//create a new thread
 m_lst.push_back(pt);
 pt->SetProc(proc, lpParam);
 if (run)
  pt->Run();
 return pt->m_id;
}
bool CMyThreadPool::Run(DWORD id)
{
 list<CMyThread*>::iterator it = m_lst.begin();
 list<CMyThread*>::iterator ite = m_lst.end();
 for ( ; it != ite; it++)
 {
  if ((*it)->m_id == id)
   return ((*it)->Run());
 }
 return false;
}
bool CMyThreadPool::Pause(DWORD id)
{
 list<CMyThread*>::iterator it = m_lst.begin();
 list<CMyThread*>::iterator ite = m_lst.end();
 for ( ; it != ite; it++)
 {
  if ((*it)->m_id == id)
   return ((*it)->Pause());
 }
 return false;
}
unsigned CMyThreadPool::SetSize(unsigned size)
{
 unsigned nowsize = m_lst.size();
 if (nowsize <= size)
 {
  CMyThread *pt;
  unsigned inc = size-nowsize;
  for (unsigned i=0; i<inc; i++)
  {
   pt = new CMyThread;
   m_lst.push_back(pt);
  }
  return size;
 }
 else
 {
  unsigned dec = nowsize - size;
  list<CMyThread*>::iterator it = m_lst.begin();
  list<CMyThread*>::iterator ite = m_lst.end();
  list<CMyThread*>::iterator itemp;
  unsigned i=0;
  for ( ; it!=ite && i<dec; )
  {
   if ((*it)->m_status == THREADPAUSE)
   {
    itemp = it++;
    delete ((*itemp));
    m_lst.erase(itemp);
    i++;
    continue;
   }
   it++;
  }
  Sleep(100*i);
  return m_lst.size();
 }
}

本文地址:http://com.8s8s.com/it/it22002.htm