我的线程池代码

类别:Java 点击:0 评论:0 推荐:

(1)根据xml文件来管理线程池的最大、最小线程数;
(2)通过Timer定期扫描线程池,以防止其中的线程失效(未激活);
(3)通过一个变量(本程序中是freeThreadCount)来得到空闲线程的数目。

一、配置xml(listen.xml)是:
<?xml version="1.0" encoding="UTF-8"?>
  <config>
    <ConsumeThreadPool>
         <minPools>10</minPools>      <!--线程池最小线程-->
            <maxPools>100</maxPools>        <!--线程池最大线程-->
            <checkThreadPeriod>5</checkThreadPeriod>  <!--检查线程池中线程的周期5分钟-->
    </ConsumeThreadPool>
  </config>

二、与配置对应的JavaBean(ConsumeThreadPoolPara):

import java.io.*;
/**
 * Bean carrying the three thread-pool settings found in the
 * ConsumeThreadPool section of listen.xml: the minimum and maximum number
 * of threads and the period of the thread check (written in minutes in the
 * XML; ThreadPool converts it to milliseconds).
 */
public class ConsumeThreadPoolPara implements Serializable{
  private int minPools;
  private int maxPools;
  private int checkThreadPeriod;

  /** Creates a bean whose three values are all 0 until the setters are called. */
  public ConsumeThreadPoolPara() {
  }

  /** @return the minimum number of threads */
  public int getMinPools(){
    return this.minPools;
  }

  /** @param minPools the minimum number of threads */
  public void setMinPools(int minPools){
    this.minPools = minPools;
  }

  /** @return the maximum number of threads */
  public int getMaxPools(){
    return this.maxPools;
  }

  /** @param maxPools the maximum number of threads */
  public void setMaxPools(int maxPools){
    this.maxPools = maxPools;
  }

  /** @return the thread-check period exactly as it was stored */
  public int getCheckThreadPeriod(){
    return this.checkThreadPeriod;
  }

  /** @param checkThreadPeriod the thread-check period */
  public void setCheckThreadPeriod(int checkThreadPeriod){
    this.checkThreadPeriod = checkThreadPeriod;
  }

  /**
   * @return the three values separated by single spaces, in the order
   *         minPools, maxPools, checkThreadPeriod
   */
  public String toString(){
    StringBuffer text = new StringBuffer();
    text.append(minPools);
    text.append(" ");
    text.append(maxPools);
    text.append(" ");
    text.append(checkThreadPeriod);
    return text.toString();
  }

  /** Smoke entry point: only instantiates the bean. */
  public static void main(String[] args) {
    new ConsumeThreadPoolPara();
  }

}

三、解析xml程序代码(生成ConsumeThreadPoolPara):
使用jdom解析:
import org.jdom.*;
import org.jdom.input.SAXBuilder;
import java.io.*;
import java.util.*;

public class ParseConfig {
  static Hashtable Listens = null;
  static ConnPara connpara = null;
  static ConsumeThreadPoolPara consumeThreadPoolPara = null;
  private static String configxml = "listen.xml";

  static{
    getConsumeThreadPoolPara();  //得到消费的线程池的参数
  }

  /**
   * 装载文档
   * @return 返回根结点
   * @throws JDOMException
   */
  public static Element loadDocument() throws JDOMException{
    SAXBuilder parser = new SAXBuilder(); // 新建立构造器
    try {
      Document document = parser.build(configxml);
      Element root = document.getRootElement();
      return root;
    }catch(JDOMException e){
      logger.error("listen.xml文件格式非法!");
      throw new JDOMException();
    }
  }

  public static ConsumeThreadPoolPara getConsumeThreadPoolPara(){
    if(consumeThreadPoolPara ==null){
      try {
        Element root = loadDocument();
        Element consumeThreadPool = root.getChild("ConsumeThreadPool");
        if (consumeThreadPool != null) { //代表有数据库配置
          consumeThreadPoolPara = new ConsumeThreadPoolPara();
          Element minPools = consumeThreadPool.getChild("minPools");
          consumeThreadPoolPara.setMinPools(Integer.parseInt(minPools.getTextTrim()));
          Element maxPools = consumeThreadPool.getChild("maxPools");
          consumeThreadPoolPara.setMaxPools(Integer.parseInt(maxPools.getTextTrim()));
          Element checkThreadPeriod = consumeThreadPool.getChild("checkThreadPeriod");
          consumeThreadPoolPara.setCheckThreadPeriod(Integer.parseInt(checkThreadPeriod.getTextTrim()));
        }
      }
      catch (JDOMException e) {
      }
    }
    return consumeThreadPoolPara;
  }
}

四、线程池源代码:
import java.util.*;

/**
 * <p>Title: 线程池</p>
 * <p>Description: 采集消费模块</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author 张荣斌
 * @version 1.0
 */

public class ThreadPool {
  private static int minPools = 10; //最小连接池数目
  private static int maxPools = 100; //最大连接池数目
  private static int checkThreadPeriod = 5; //检查连接池的周期
  ArrayList m_ThreadList;  //工作线程列表
  LinkedList m_RunList = null;  //工作任务列表
  int totalThread = 0;  //总线程数
  static int freeThreadCount = 0;  //未被使用的线程数目
  private java.util.Timer timer = null;  //定时器
  static Object o = new Object();

  static{  //先初始化线程池的参数
    ConsumeThreadPoolPara consumeThreadPoolPara = ParseConfig.getConsumeThreadPoolPara();
    if(consumeThreadPoolPara!=null){
      minPools = consumeThreadPoolPara.getMinPools();
      maxPools = consumeThreadPoolPara.getMaxPools();
      checkThreadPeriod = consumeThreadPoolPara.getCheckThreadPeriod()*60*1000;
    }
  }
  public void setMinPools(int minPools){
    this.minPools = minPools;
  }
  public void setMaxPools(int maxPools){
    this.maxPools = maxPools;
  }
  public void setCheckThreadPeriod(int checkThreadPeriod){
    this.checkThreadPeriod = checkThreadPeriod;
  }
  public ThreadPool() {

    m_ThreadList=new ArrayList();
    m_RunList=new LinkedList();
    for(int i=0;i<minPools;i++){
        WorkerThread temp=new WorkerThread();
        totalThread = totalThread + 1;
        m_ThreadList.add(temp);
        temp.start();
        try{
          Thread.sleep(100);
        }catch(Exception e){
        }
    }
    timer = new Timer(true);  //启动定时器
    timer.schedule(new CheckThreadTask(this),0,checkThreadPeriod);
  }

  /**
   * 当有一个工作来的时候启动线程池的线程
   * 1.当空闲线程数为0的时候,看总线程是否小于最大线程池的数目,就new一个新的线程,否则sleep,直到有空闲线程为止;
   * 2.当空闲线程不为0,则将任务丢给空闲线程去完成
   * @param work
   */
  public synchronized void run(String work)
  {
          if (freeThreadCount == 0) {
                  if(totalThread<maxPools){
                    WorkerThread temp = new WorkerThread();
                    totalThread = totalThread + 1;
                    m_ThreadList.add(temp);
                    temp.start();
                    synchronized(m_RunList){
                      m_RunList.add(work);
                      m_RunList.notify();
                    }
                  }else{
                    while (freeThreadCount == 0) {
                      try {
                        Thread.sleep(200);
                      }
                      catch (InterruptedException e) {
                      }
                    }
                    synchronized(m_RunList){
                      m_RunList.add(work);
                      m_RunList.notify();
                    }
                  }
          } else {
            synchronized(m_RunList){
              m_RunList.add(work);
              m_RunList.notify();
            }
          }
  }

  /**
   * 检查所有的线程的有效性
   */
  public synchronized void checkAllThreads() {

    Iterator lThreadIterator = m_ThreadList.iterator();

    while (lThreadIterator.hasNext()) { //逐个遍厉
      WorkerThread lTestThread = (WorkerThread) lThreadIterator.next();

      if (! (lTestThread.isAlive())) { //如果处在非活动状态时
        lTestThread = new WorkerThread(); //重新生成个线程
        lTestThread.start(); //启动
      }
    }
  }

  /**
   * 打印调试信息
   */
  public void printDebugInfo(){
        System.out.println("totalThread="+totalThread);
        System.out.println("m_ThreadList.size()="+m_ThreadList.size());
}

      /**
       *
       * <p>Title: 工作线程类</p>
       * @author 张荣斌
       * @version 1.0
       */
class WorkerThread extends Thread{
           boolean running = true;
           String work;

            public void run(){
              while(running){
                synchronized(o){
                  freeThreadCount++;
                }
                synchronized(m_RunList){
                  while(m_RunList.size() == 0){
                       try{
                         m_RunList.wait();
                         if(!running) return;
                       }catch(InterruptedException e){
                       }
                  }
                  synchronized(o){
                    freeThreadCount--;
                  }
                  work = (String)m_RunList.removeLast();
                  if(work==null) return;
                }

  // 得到了work 进行工作,这里work可以换成自己的工作类
              }
            }
}

}
        /**
         *
         * <p>Title: 定时器调动的任务</p>
         * @author 张荣斌
         * @version 1.0
         */
        class CheckThreadTask extends TimerTask{
          private static boolean isRunning = false;
          private ThreadPool pool;

          public CheckThreadTask(ThreadPool pool){
            this.pool = pool;
          }
          public void run() {
            if (!isRunning)  {
              isRunning = true;
              pool.checkAllThreads();
              isRunning = false;
            }
          }
        }

本文地址:http://com.8s8s.com/it/it15670.htm