实践代码:
SelectorProcessor类,分发事件(最好使用线程池)
package zzzhc;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
/**
 * Event dispatcher that owns one {@link Selector} and one dedicated thread.
 *
 * <p>All selector mutations (registering a channel, widening a key's interest
 * set, closing a channel) are performed on the processor thread. Calls made
 * from any other thread are queued and the selector is woken up so the
 * processor thread applies them before its next {@code select()}.
 *
 * <p>Before a handler is invoked, the operations that are ready are removed
 * from the key's interest set; a handler that wants further events of the
 * same kind must re-arm them through {@link #addInterestOps}.
 *
 * @author zzzhc
 */
public class SelectorProcessor implements Runnable {
    private final Selector selector;
    private final Queue waitCloseQueue;
    private final Queue waitRegisterQueue;
    private final Queue waitAddInterestQueue;
    private final Thread processorThread;
    // volatile: written by whichever thread calls shutdown(), read by the
    // processor thread in its loop condition.
    private volatile boolean shutdown = false;
    private final static SelectorProcessor instance;
    static {
        try {
            instance = new SelectorProcessor();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the shared processor created when this class is loaded.
     *
     * @return the default instance
     */
    public static SelectorProcessor getDefaultInstance() {
        return instance;
    }

    /**
     * Opens a selector and starts the processor thread.
     *
     * @throws IOException if the selector cannot be opened
     */
    public SelectorProcessor() throws IOException {
        selector = Selector.open();
        waitCloseQueue = new Queue();
        waitRegisterQueue = new Queue();
        waitAddInterestQueue = new Queue();
        processorThread = new Thread(this);
        //processorThread.setDaemon(true);//is this needed?
        processorThread.start();
    }

    /**
     * Registers a channel with the selector, attaching {@code handler} to the
     * resulting key. Runs immediately on the processor thread, otherwise the
     * request is queued and the selector is woken up.
     *
     * @param sc      channel to register
     * @param handler handler stored as the key's attachment
     * @param ops     initial interest set
     */
    public void register(SelectableChannel sc, Handler handler, int ops) {
        if (Thread.currentThread() == processorThread) {
            doRegister(sc, handler, ops);
        } else {
            ChannelAssociater r = new ChannelAssociater(sc, handler, ops);
            synchronized (waitRegisterQueue) {
                waitRegisterQueue.push(r);
                selector.wakeup();
            }
        }
    }

    /**
     * Adds operations to the interest set of an already registered channel.
     * Runs immediately on the processor thread, otherwise the request is
     * queued and the selector is woken up.
     *
     * @param sc     registered channel
     * @param addOps operation bits to OR into the current interest set
     */
    public void addInterestOps(SelectableChannel sc, int addOps) {
        if (Thread.currentThread() == processorThread) {
            addOps(sc, addOps);
        } else {
            ChannelAssociater r = new ChannelAssociater(sc, null, addOps);
            synchronized (waitAddInterestQueue) {
                waitAddInterestQueue.push(r);
                selector.wakeup();
            }
        }
    }

    /**
     * Closes a channel on the processor thread. Runs immediately on the
     * processor thread, otherwise the request is queued and the selector is
     * woken up.
     *
     * @param sc channel to close
     */
    public void closeChannel(SelectableChannel sc) {
        if (Thread.currentThread() == processorThread) {
            doClose(sc);
        } else {
            synchronized (waitCloseQueue) {
                waitCloseQueue.push(sc);
                selector.wakeup();
            }
        }
    }

    /**
     * Performs the actual registration. Does nothing unless called on the
     * processor thread. A channel that is already closed is reported on
     * stderr and skipped.
     */
    protected void doRegister(SelectableChannel sc, Handler handler, int ops) {
        if (Thread.currentThread() == processorThread) {
            try {
                sc.register(selector, ops, handler);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * ORs {@code addOps} into the channel's interest set. Does nothing unless
     * called on the processor thread, and does nothing when the channel has
     * no valid key with this selector (never registered, or closed/cancelled
     * in the meantime).
     */
    protected void addOps(SelectableChannel sc, int addOps) {
        if (Thread.currentThread() == processorThread) {
            SelectionKey key = sc.keyFor(selector);
            // keyFor returns null for an unregistered channel, and a cancelled
            // key rejects interestOps(); both used to kill the processor thread.
            if (key == null || !key.isValid()) {
                return;
            }
            key.interestOps(key.interestOps() | addOps);
        }
    }

    /**
     * Closes the channel, reporting any I/O error on stderr. Does nothing
     * unless called on the processor thread.
     */
    protected void doClose(SelectableChannel sc) {
        if (Thread.currentThread() == processorThread) {
            try {
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Drains the queue of registrations requested by other threads. */
    protected void dealRegister() {
        if (Thread.currentThread() == processorThread) {
            synchronized (waitRegisterQueue) {
                while (!waitRegisterQueue.isEmpty()) {
                    ChannelAssociater ca = (ChannelAssociater) waitRegisterQueue
                            .shift();
                    try {
                        doRegister(ca.sc, ca.handler, ca.ops);
                    } catch (RuntimeException e) {
                        // One bad request (e.g. a blocking-mode channel or
                        // unsupported ops) must not stop the processor thread.
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /** Drains the queue of interest-set additions requested by other threads. */
    protected void dealAddInterest() {
        if (Thread.currentThread() == processorThread) {
            synchronized (waitAddInterestQueue) {
                while (!waitAddInterestQueue.isEmpty()) {
                    ChannelAssociater ca = (ChannelAssociater) waitAddInterestQueue
                            .shift();
                    try {
                        addOps(ca.sc, ca.ops);
                    } catch (RuntimeException e) {
                        // Key cancelled concurrently or invalid ops: report and
                        // keep processing the remaining requests.
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /** Drains the queue of channel closes requested by other threads. */
    protected void dealClose() {
        if (Thread.currentThread() == processorThread) {
            synchronized (waitCloseQueue) {
                while (!waitCloseQueue.isEmpty()) {
                    SelectableChannel sc = (SelectableChannel) waitCloseQueue
                            .shift();
                    doClose(sc);
                }
            }
        }
    }

    /**
     * Releases everything the processor owns: closes channels still queued
     * for close, closes every channel registered with the selector, then
     * closes the selector itself. I/O errors are reported on stderr.
     */
    protected void dealShutdown() {
        // Channels queued for close may never have been registered, so they
        // would not be reached through selector.keys() below.
        dealClose();
        Iterator iterator = selector.keys().iterator();
        while (iterator.hasNext()) {
            try {
                SelectionKey key = (SelectionKey) iterator.next();
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Asks the processor thread to stop. The thread leaves its loop after the
     * current iteration and then runs {@link #dealShutdown()}.
     */
    public void shutdown() {
        this.shutdown = true;
        selector.wakeup();
    }

    /**
     * Processor loop: apply queued requests, wait for ready keys, dispatch
     * each ready key to its handler. Ends when {@link #shutdown()} has been
     * called or the selector is found closed.
     */
    public void run() {
        while (!shutdown) {
            dealRegister();
            dealAddInterest();
            dealClose();
            try {
                selector.select();
                // An empty selected set (plain wakeup) simply skips the loop.
                Iterator iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = (SelectionKey) iterator.next();
                    iterator.remove();
                    dispatch(key);
                }
            } catch (ClosedSelectorException cse) {
                System.err.println("selector closed:" + cse.getMessage()
                        + "\nquit");
                return;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        dealShutdown();
    }

    /**
     * Delivers one ready key to its attached handler. Failures are confined
     * to this key so the processor loop keeps serving the other channels.
     */
    private void dispatch(SelectionKey key) {
        try {
            if (!key.isValid()) {
                return;
            }
            // Stop listening for whatever is ready now; handlers re-arm the
            // operations they still care about via addInterestOps.
            key.interestOps(key.interestOps() & ~key.readyOps());
            Handler handler = (Handler) key.attachment();
            if (key.isAcceptable()) {
                ((AcceptHandler) handler).handleAccept();
            } else if (key.isConnectable()) {
                ((ConnectHandler) handler).handleConnect();
            } else {
                ReadWriteHandler rwh = ((ReadWriteHandler) handler);
                if (key.isValid() && key.isReadable()) {
                    rwh.handleRead();
                }
                // Not "else if": both interest bits were cleared above, so a
                // key ready for read AND write would otherwise never get its
                // write callback.
                if (key.isValid() && key.isWritable()) {
                    rwh.handleWrite();
                }
            }
        } catch (CancelledKeyException ignored) {
            // The key was cancelled (channel closed) between the validity
            // check and its use; there is nothing left to dispatch.
        } catch (RuntimeException e) {
            // A failing handler must not take down the shared selector thread.
            e.printStackTrace();
        }
    }

    /** A queued request: a channel plus the handler and operation bits for it. */
    class ChannelAssociater {
        SelectableChannel sc;
        Handler handler;
        int ops;

        ChannelAssociater(SelectableChannel sc, Handler handler, int ops) {
            this.sc = sc;
            this.handler = handler;
            this.ops = ops;
        }
    }
}
//Queue类,对LinkedList的简单包装,提供push,pop,shift,unshift操作,用起来顺手
package zzzhc;
import java.util.*;
/**
 * Small double-ended queue backed by a {@link LinkedList}, exposing
 * {@code push}/{@code pop} at the tail and {@code unshift}/{@code shift} at
 * the head. The class does no synchronization of its own.
 *
 * @author zzzhc
 */
public class Queue {
    private final LinkedList items = new LinkedList();

    /** Returns the number of stored elements. */
    public int size() {
        return items.size();
    }

    /** Returns {@code true} when no elements are stored. */
    public boolean isEmpty() {
        return items.size() == 0;
    }

    /** Appends {@code o} at the tail. */
    public void push(Object o) {
        items.add(o);
    }

    /**
     * Removes and returns the tail element.
     *
     * @throws NoSuchElementException if the queue is empty
     */
    public Object pop() {
        return items.removeLast();
    }

    /** Inserts {@code o} at the head. */
    public void unshift(Object o) {
        items.add(0, o);
    }

    /**
     * Removes and returns the head element.
     *
     * @throws NoSuchElementException if the queue is empty
     */
    public Object shift() {
        return items.removeFirst();
    }
}
//Handler,ConnectHandler ,AcceptHandler ,ReadWriteHandler 接口,处理事件
package zzzhc;
/**
 * Marker interface for event handlers; it declares no methods.
 * {@code SelectorProcessor} stores a {@code Handler} as the attachment of
 * each registered key and casts it to {@code AcceptHandler},
 * {@code ConnectHandler} or {@code ReadWriteHandler} depending on which
 * operation is ready.
 *
 * @author zzzhc
 */
public interface Handler {
}
package zzzhc;
/**
 * Handler for keys that become connectable.
 *
 * @author zzzhc
 */
public interface ConnectHandler extends Handler {
/**
 * Called by {@code SelectorProcessor} on its processor thread when the
 * key this handler is attached to reports connect readiness. The ready
 * operations have already been removed from the key's interest set.
 * NOTE(review): presumably the implementation completes the connection
 * (e.g. finishConnect) - nothing here enforces it.
 */
void handleConnect();
}
package zzzhc;
/**
 * Handler for keys that become acceptable.
 *
 * @author zzzhc
 */
public interface AcceptHandler extends Handler {
/**
 * Called by {@code SelectorProcessor} on its processor thread when the
 * key this handler is attached to reports accept readiness. The ready
 * operations have already been removed from the key's interest set, so
 * further accept events require re-adding the interest.
 */
void handleAccept();
}
package zzzhc;
/**
 * Handler for keys that become readable or writable. {@code SelectorProcessor}
 * casts the key's attachment to this type whenever the ready key is neither
 * acceptable nor connectable.
 *
 * @author zzzhc
 */
public interface ReadWriteHandler extends Handler {
/**
 * Called on the processor thread when the key is valid and readable.
 * The ready operations have already been removed from the key's
 * interest set before this call.
 */
void handleRead();
/**
 * Called on the processor thread when the key is valid and writable.
 * The ready operations have already been removed from the key's
 * interest set before this call.
 */
void handleWrite();
}
本文地址:http://com.8s8s.com/it/it12699.htm