菜鸟初学Java的备忘录(九)

类别:Java 点击:0 评论:0 推荐:

2003年1月25日 星期六 雨

我突然发现了利用接口实现多线程和利用类构造线程体的不同了,以前我好像并没有太多的注意.利用类构造线程体的时候,需要使用这个类来定义线程对象,比如MyThread thread1=New MyThread(),而使用接口创建线程体的时候,只需要用到Thread类就可以了,比如Thread thread1=new Thread(MyThread).这在几天前所讲的多线程的Socket中均有体现.

前面不懈的努力使我现在可以继续原先未完成的工作,正式向连接池版本的Socket进发了.


回顾一下我们是如何创建多线程的服务器MultiThreadRemoteFileServer的,这得看看前几天的内容了.概括起来,就是为每个等待连接的客户new一个线程使用,来一个分配一个.每当有客户机申请一个连接时都在一个新 Thread 中创建一个新 ConnectionHandler(注:使用接口构造的线程体).这意味着可能有一打 Thread在整个系统中运行着.显然,系统的开销会因为连接客户的数目的增长而不断增加.我们不能够不考虑到开销增加到一定程度的时候系统会承受不住的可能.因此,得想个办法限制连接客户的数量,提高服务器的效率.

那么解决的方案是:在服务器端,我们在服务器启动时创建一定数量的PooledConnectionHandler,我们把进入的连接放入一个连接池中并让PooledConnectionHandler 打理剩下的事情.客户端的程序完全不用修改,这样设计的优点如下:

1.限定了允许同时连接的数目。
2.只需要启动PooledConnectionHandler线程有限次

这两句话的涵义将在其后的程序中得到体现,下面是 PooledRemoteFileServer 类的结构:


import java.io.*;
import java.net.*;
import java.util.*;

public class PooledRemoteFileServer {
    protected int maxConnections;//定义能同时处理的客户机连接的最大数目     protected int listenPort;//定义要监听的端口号
    protected ServerSocket serverSocket;

    public PooledRemoteFileServer(int aListenPort, int maxConnections) {
        listenPort = aListenPort;
        this.maxConnections = maxConnections;
    }
    public static void main(String[] args) {
    }
    public void setUpHandlers(){//创建数目为maxConnections的 PooledConnectionHandler
    }
    public void acceptConnections(){//在 ServerSocket 上监听传入的客户机连接,和前面的RemoteFileServer,MultiThreadRemoteFileServer中的监听程序完全一样
    }
    protected void handleConnection(Socket incomingConnection) {
    }//连接处理程序
}

同样,首先来看main函数
public static void main(String[] args) {
  PooledRemoteFileServer server = new PooledRemoteFileServer(3000, 3);
  server.setUpHandlers();//同前面所有服务器的main函数不同,我们先要创建一个连接池,这个池里面有三个可用的connectionHandler
  server.acceptConnections();//一旦就绪,就开始监听

下面我们就来看创建三个connectionHandler的程序如何实现:

public void setUpHandlers(){
  for (int i = 0; i < maxConnections; i++) {
     PooledConnectionHandler currentHandler = new PooledConnectionHandler();
     new Thread(currentHandler, "Handler " + i).start();
  }
}

setUpHandlers() 方法创建maxConnections个PooledConnectionHandler并在新 Thread 中激活它们. PooledConnectionHandler在这里是一个用接口(Runnable)实现的线程体.用实现了Runnable的对象来创建Thread使我们可以在 Thread 调用 start() 并且可以期望在Runnable上调用了 run()。也就是说,我们的 PooledConnectionHandler 将等着处理进入的连接,每个都在它自己的Thread中进行。我们在示例中只创建三个Thread,而且一旦服务器运行,这就不能被改变。

acceptConnections()方法这里就略去不写了,看看连接处理程序handleConnection(Socket incomingConnection)

protected void handleConnection(Socket connectionToHandle) {
  PooledConnectionHandler.processRequest(connectionToHandle);
}

这里连接处理程序直接调用了PooledConnectionHandler线程类的类方法processRequest对监听到的连接进行处理,显然这个processRequest是一个静态方法.

PooledRemoteFileServer类中的方法均涉及到一个重要的线程类,PooledConnectionHandler.下面就看看这样一个用接口实现的类长什么样:

public class PooledConnectionHandler implements Runnable {
  protected Socket connection;//代表当前正在处理的Socket
  protected static List pool = new LinkedList();//名为 pool 的静态 LinkedList 保存需被处理的连接,也就是用LinkedList来模拟一个连接池
  public PooledConnectionHandler() {//构造函数
  }
  public void handleConnection() {//对连接的I/O操作在这里了
  }
  public static void processRequest(Socket requestToHandle) {//处理客户连接,将他们加入连接池
  }
  public void run() {//等待有连接来,来了,就调handleConnection()处理
  }
}

可以看出,这个类与多线程Socket那一天的ConnectionHandler非常相似,但不同的是,它带有处理连接池的手段.

首先看看要在监听程序中用到的processRequest()方法
public static void processRequest(Socket requestToHandle) {
    synchronized (pool) {
        pool.add(pool.size(), requestToHandle);
        pool.notifyAll();
    }
}

这里的requestToHandle就是要处理的客户连接socket.可以这样说,processRequest所做的工作就是把客户连接加入到连接池当中.但是要确保在对连接池(Pool)进行操作的时候没有其他的线程干扰,就需要获取连接池的对象锁,并使用同步块,前面所了解有关synchronized的概念在这里就可以派上用场了.

那么,既然已经保证了我们是唯一“涉水”的人,我们就可以把传入的 Socket 添加到 LinkedList 的尾端.一旦我们添加了新的连接,我们就可以使用pool.notifyall通知其它正在等待该池的 Thread,连接池的对象锁解除,现在可用了.从另外一个角度来说,也可以说是通知另外一个正在等待的线程,一些条件已经具备了.

那么这个在等待的线程是什么呢?

我们来实现PooledConnectionHandler上的run()方法,它将在连接池上等待,并且池中一有连接就处理它,所以是这个要处理连接的线程在等待着呢:

public void run() {
  while (true) {
    synchronized (pool) {
      while (pool.isEmpty()) {
        try {
        pool.wait();
        } catch (InterruptedException e) {return;}
      }
      connection = (Socket) pool.remove(0);//攫取池中的第一个连接,使之成为马上就要处理的connection
    }
  handleConnection();//然后交给handleConnection处理
  }
}

这个函数告诉了我们每个PooledConnectionHandler线程主要都在run些什么.显然,它是要不停的去看连接池中有没有接入的连接,如果有,马上处理,因此它在等待"连接池有连接了"这样一个条件.那么谁来告诉它这个条件满足了呢,显然是刚才的processRequest.当processRequest发出通知(pool.notify())的时候,这个条件就满足了,这时run()中的处理程序就不用再继续等待了,就可以马上去出一个连接进行处理.反过来说,在这个条件没有满足之前,wait()所在的线程还是要处于阻塞状态,或者是说停滞状态.由于同样要对pool进行操作,所以这里也需要使用到同步块.

让我们再次复习一下对象锁的概念.当 run() 拥有池的互斥锁时,processRequest() 如何能够把连接放到池中呢?答案是对池上的 wait() 的调用释放锁,而 wait() 接着就在自己返回之前再次攫取该锁。这就使得池对象的其它同步代码可以获取该锁。

 

最后,我们看看PooledConnectionHandler线程中的handleConnection()方法.跟在多线程服务器中不同,我们的PooledConnectionHandler有一个 handleConnection() 方法。这个方法的代码跟非池式的,也就是多线程服务器中的ConnectionHandler上的 run() 方法的代码完全一样。首先,我们把 OutputStream 和 InputStream 分别包装进(用 Socket 上的 getOutputStream() 和 getInputStream())BufferedReader 和 PrintWriter。然后我们逐行读目标文件,就象我们在多线程示例中做的那样。再一次,我们获取一些字节之后就把它们放到本地的 line 变量中,然后写出到客户机。完成读写操作之后,我们关闭 FileReader 和打开的流。

讲到这里,我们可以看到,程序中有两个类,PooledRemoteFileServer和PooledConnectionHandler.PooledRemoteFileServer并不直接处理连接请求,它只是负责监听这些连接,并把他们仍到连接池里面,至于处理的具体环节,都交给PooledConnectionHandler负责了。


我们的带有连接池的服务器研究完了。让我们回顾一下创建和使用“池版”服务器的步骤:

1.创建一个新种类的连接处理程序(我们称之为 PooledConnectionHandler)来处理池中的连接。
2.修改服务器以创建和使用一组 PooledConnectionHandler。

附:PooledRemoteFileServer.java的源码

import java.io.*;
import java.net.*;
import java.util.*;

public class PooledRemoteFileServer {
    protected int maxConnections;
    protected int listenPort;
    protected ServerSocket serverSocket;
    public PooledRemoteFileServer(int aListenPort, int maxConnections) {
        listenPort = aListenPort;
        this.maxConnections = maxConnections;
    }
    public void acceptConnections() {
        try {
            ServerSocket server = new ServerSocket(listenPort, 5);
            Socket incomingConnection = null;
            while (true) {
                incomingConnection = server.accept();
                handleConnection(incomingConnection);
            }
        } catch (BindException e) {
            System.out.println("Unable to bind to port " + listenPort);
        } catch (IOException e) {
            System.out.println("Unable to instantiate a ServerSocket on port: " + listenPort);
        }
    }
    protected void handleConnection(Socket connectionToHandle) {
        PooledConnectionHandler.processRequest(connectionToHandle);
    }
    public static void main(String[] args) {
        PooledRemoteFileServer server = new PooledRemoteFileServer(3000, 3);
        server.setUpHandlers();
        server.acceptConnections();
    }
    public void setUpHandlers() {
        for (int i = 0; i < maxConnections; i++) {
            PooledConnectionHandler currentHandler = new PooledConnectionHandler();
            new Thread(currentHandler, "Handler " + i).start();
        }
    }
}

class PooledConnectionHandler implements Runnable {
    protected Socket connection;
    protected static List pool = new LinkedList();
    public PooledConnectionHandler() {
    }
    public void handleConnection() {
        try {
            PrintWriter streamWriter = new PrintWriter(connection.getOutputStream());
            BufferedReader streamReader = new BufferedReader(new InputStreamReader(connection.getInputStream()));

            String fileToRead = streamReader.readLine();
            BufferedReader fileReader = new BufferedReader(new FileReader(fileToRead));

            String line = null;
            while ((line = fileReader.readLine()) != null)
                streamWriter.println(line);

            fileReader.close();
            streamWriter.close();
            streamReader.close();
        } catch (FileNotFoundException e) {
            System.out.println("Could not find requested file on the server.");
        } catch (IOException e) {
            System.out.println("Error handling a client: " + e);
        }
    }
    public static void processRequest(Socket requestToHandle) {
        synchronized (pool) {
            pool.add(pool.size(), requestToHandle);
            pool.notifyAll();
        }
    }
    public void run() {
        while (true) {
            synchronized (pool) {
                while (pool.isEmpty()) {
                    try {
                        pool.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                connection = (Socket) pool.remove(0);
            }
            handleConnection();
        }
    }
}


 

本文地址:http://com.8s8s.com/it/it18040.htm