nio 的使用方法

类别:Java 点击:0 评论:0 推荐:
使用 NIO 最尴尬的莫过于一不小心 CPU 利用率就维持在 100%。一般的原因可能是:
(1) 一直注册着 OP_WRITE 事件(socket 在绝大多数时候都是可写的,select 会立即返回,形成空转);(2) 对某个已注册的事件一直没处理(或没处理完)。
使用 NIO 应该注意:(1) 只在一个线程中操作 selector(在多个线程中操作同一个 selector 就是一场噩梦);(2) 只注册当前感兴趣的事件;(3) 要发送数据时直接写,一次写不完再注册 OP_WRITE 事件,在下一次可写时继续发送。

实践代码:
SelectorProcessor类,分发事件(最好使用线程池)
package zzzhc;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

/**
 * @author <a href="zzzhc'>mailto:[email protected]">zzzhc </a>
 * 
 */
public class SelectorProcessor implements Runnable {
    private final Selector selector;

    private final Queue waitCloseQueue;

    private final Queue waitRegisterQueue;

    private final Queue waitAddInterestQueue;

    private final Thread processorThread;

    private boolean shutdown = false;
   
    private final static SelectorProcessor instance;
   
    static {
        try {
            instance = new SelectorProcessor();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
   
    public static SelectorProcessor getDefaultInstance() {
        return instance;
    }

    public SelectorProcessor() throws IOException {
        selector = Selector.open();
        waitCloseQueue = new Queue();
        waitRegisterQueue = new Queue();
        waitAddInterestQueue = new Queue();
        processorThread = new Thread(this);
        //processorThread.setDaemon(true);//is this needed?
        processorThread.start();
    }

    public void register(SelectableChannel sc, Handler handler, int ops) {
        if (Thread.currentThread() == processorThread) {
            doRegister(sc, handler, ops);
        } else {
            ChannelAssociater r = new ChannelAssociater(sc, handler, ops);
            synchronized (waitRegisterQueue) {
                waitRegisterQueue.push(r);
                selector.wakeup();
            }
        }
    }

    public void addInterestOps(SelectableChannel sc, int addOps) {
        if (Thread.currentThread() == processorThread) {
            addOps(sc, addOps);
        } else {
            ChannelAssociater r = new ChannelAssociater(sc, null, addOps);
            synchronized (waitAddInterestQueue) {
                waitAddInterestQueue.push(r);
                selector.wakeup();
            }
        }
    }

    public void closeChannel(SelectableChannel sc) {
        if (Thread.currentThread() == processorThread) {
            doClose(sc);
        } else {
            synchronized (waitCloseQueue) {
                waitCloseQueue.push(sc);
                selector.wakeup();
            }
        }
    }

    protected void doRegister(SelectableChannel sc, Handler handler, int ops) {
        if (Thread.currentThread() == processorThread) {
            try {
                sc.register(selector, ops, handler);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        }
    }

    protected void addOps(SelectableChannel sc, int addOps) {
        if (Thread.currentThread() == processorThread) {
            SelectionKey key = sc.keyFor(selector);
            key.interestOps(key.interestOps() | addOps);
        }
    }

    protected void doClose(SelectableChannel sc) {
        if (Thread.currentThread() == processorThread) {
            try {
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    protected void dealRegister() {
        if (Thread.currentThread() == processorThread) {
            synchronized (waitRegisterQueue) {
                while (!waitRegisterQueue.isEmpty()) {
                    ChannelAssociater ca = (ChannelAssociater) waitRegisterQueue
                            .shift();
                    doRegister(ca.sc, ca.handler, ca.ops);
                }
            }
        }
    }

    protected void dealAddInterest() {
        if (Thread.currentThread() == processorThread) {
            synchronized (waitAddInterestQueue) {
                while (!waitAddInterestQueue.isEmpty()) {
                    ChannelAssociater ca = (ChannelAssociater) waitAddInterestQueue
                            .shift();
                    addOps(ca.sc, ca.ops);
                }
            }
        }
    }

    protected void dealClose() {
        if (Thread.currentThread() == processorThread) {
            synchronized (waitCloseQueue) {
                while (!waitCloseQueue.isEmpty()) {
                    SelectableChannel sc = (SelectableChannel) waitCloseQueue
                            .shift();
                    doClose(sc);
                }
            }
        }
    }

    protected void dealShutdown() {
        Iterator iterator = selector.keys().iterator();
        while (iterator.hasNext()) {
            try {
                SelectionKey key = (SelectionKey) iterator.next();
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void shutdown() {
        this.shutdown = true;
        selector.wakeup();
    }

    public void run() {
        int keyCount = 0;
        while (!shutdown) {
            dealRegister();
            dealAddInterest();
            dealClose();
            try {
                keyCount = selector.select();
                if (keyCount == 0) {
                    continue;
                }
                Iterator iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = (SelectionKey) iterator.next();
                    iterator.remove();
                    key.interestOps(key.interestOps() & ~key.readyOps());
                    Handler handler = (Handler) key.attachment();
                    if (key.isAcceptable()) {
                        ((AcceptHandler) handler).handleAccept();
                    } else if (key.isConnectable()) {
                        ((ConnectHandler) handler).handleConnect();
                    } else {
                        ReadWriteHandler rwh = ((ReadWriteHandler) handler);
                        if (key.isValid() && key.isReadable()) {
                            rwh.handleRead();
                        } else if (key.isValid() && key.isWritable()) {
                            rwh.handleWrite();
                        }
                    }
                }
            } catch (ClosedSelectorException cse) {
                System.err.println("selector closed:" + cse.getMessage()
                        + "\nquit");
                return;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        dealShutdown();
    }

    class ChannelAssociater {
        SelectableChannel sc;

        Handler handler;

        int ops;

        ChannelAssociater(SelectableChannel sc, Handler handler, int ops) {
            this.sc = sc;
            this.handler = handler;
            this.ops = ops;
        }
    }

}

//Queue类,对LinkedList的简单包装,提供push,pop,shift,unshift操作,用起来顺手
package zzzhc;

import java.util.*;

/**
 * Small double-ended queue backed by a {@link LinkedList}.
 *
 * <p>{@code push}/{@code pop} work on the tail, {@code unshift}/{@code shift}
 * work on the head. The class does no locking of its own. Removing from an
 * empty queue throws {@link NoSuchElementException}, as the underlying list
 * does.
 *
 * @author zzzhc
 */
public class Queue {
    private LinkedList items = new LinkedList();

    /** Appends {@code o} at the tail. */
    public void push(Object o) {
        items.addLast(o);
    }

    /** Removes and returns the tail element. */
    public Object pop() {
        final Object tail = items.removeLast();
        return tail;
    }

    /** Inserts {@code o} at the head. */
    public void unshift(Object o) {
        items.addFirst(o);
    }

    /** Removes and returns the head element. */
    public Object shift() {
        final Object head = items.removeFirst();
        return head;
    }

    /** Returns the number of queued elements. */
    public int size() {
        return items.size();
    }

    /** Returns {@code true} when nothing is queued. */
    public boolean isEmpty() {
        return items.size() == 0;
    }

}

//Handler,ConnectHandler ,AcceptHandler ,ReadWriteHandler 接口,处理事件
package zzzhc;

/**
 * Common super-type of all event handlers. It declares no methods; it is the
 * type {@code SelectorProcessor} accepts as the key attachment when a channel
 * is registered and casts to a concrete handler interface when dispatching.
 *
 * @author zzzhc
 */
public interface Handler {

}


package zzzhc;

/**
 * Handler for channels whose selection key reports a connect event.
 *
 * @author zzzhc
 */
public interface ConnectHandler extends Handler {
   
    /**
     * Called from the {@code SelectorProcessor} loop when the key this handler
     * is attached to is connectable. The ready operations have already been
     * removed from the key's interest set at that point.
     */
    void handleConnect();

}


package zzzhc;

/**
 * Handler for channels whose selection key reports an accept event.
 *
 * @author zzzhc
 */
public interface AcceptHandler extends Handler {
   
    /**
     * Called from the {@code SelectorProcessor} loop when the key this handler
     * is attached to is acceptable. The ready operations have already been
     * removed from the key's interest set at that point, so interest in
     * further accepts has to be requested again.
     */
    void handleAccept();

}

package zzzhc;

/**
 * Handler for channels whose selection key reports read and/or write
 * readiness. {@code SelectorProcessor} removes the ready operations from the
 * key's interest set before calling these methods, so an implementation that
 * wants more events of the same kind must request them again (for example via
 * {@code SelectorProcessor.addInterestOps}).
 *
 * @author zzzhc
 */
public interface ReadWriteHandler extends Handler {
   
    /** Called from the {@code SelectorProcessor} loop for a valid key that is readable. */
    void handleRead();
   
    /** Called from the {@code SelectorProcessor} loop for a valid key that is writable. */
    void handleWrite();

}

本文地址:http://com.8s8s.com/it/it12699.htm