nio 的使用方法 2

类别:Java 点击:0 评论:0 推荐:

前面的可能还是不方便,再具体一点:
package zzzhc;


/**
 * Callback contract for a handler bound to a single socket channel: it combines
 * the connect and read/write event interfaces and adds lifecycle notifications.
 *
 * @author <a href="mailto:[email protected]">zzzhc</a>
 *
 */
public interface SocketHandler extends ConnectHandler, ReadWriteHandler {
    /**
     * Called when the connection to the remote peer has been established.
     */
    void onConnected();
   
    /**
     * Called when a connection attempt did not succeed.
     *
     * @param msg description of the failure as supplied by the caller
     */
    void onConnectFailed(String msg);
   
    /**
     * Called after data has been read from the channel.
     *
     */
    void onRead();
   
    /**
     * Called after the pending outgoing data has been written to the channel.
     *
     */
    void onWrite();
   
    /**
     * Called when the channel has been closed.
     *
     * @param msg reason for the close as supplied by the caller
     */
    void onClosed(String msg);

}

//抽象实现
package zzzhc;

import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * @author <a href="zzzhc'>mailto:[email protected]">zzzhc </a>
 * 
 */
public abstract class AbstractSocketHandler implements SocketHandler {
    public final static int DEFAULT_BUFFER_SIZE = 2 * 1024;

    protected final SelectorProcessor processor;

    protected final SocketChannel sc;

    protected final ByteBuffer readBuf;

    protected final ByteBuffer writeBuf;
   
    protected SocketAddress localAddress;

    protected SocketAddress remoteAddress;

    protected boolean connected = false;

    protected boolean closed = false;

    public AbstractSocketHandler(SelectorProcessor processor,
            SocketAddress remoteAddress) throws IOException {
        this.processor = processor;
        this.remoteAddress = remoteAddress;
        this.sc = SocketChannel.open();
        this.sc.configureBlocking(false);
        this.sc.connect(remoteAddress);
        int readSize = DEFAULT_BUFFER_SIZE;
        int writeSize = DEFAULT_BUFFER_SIZE;
        try {
            readSize = sc.socket().getReceiveBufferSize();
            writeSize = sc.socket().getSendBufferSize();
        } catch (SocketException e1) {
        }
        readBuf = ByteBuffer.allocate(readSize);
        writeBuf = ByteBuffer.allocate(writeSize);
        processor.register(sc, this, SelectionKey.OP_CONNECT);
    }

    public AbstractSocketHandler(SelectorProcessor processor,
            SocketAddress remoteAddress, ByteBuffer readBuf, ByteBuffer writeBuf)
            throws IOException {
        this.processor = processor;
        this.remoteAddress = remoteAddress;
        this.sc = SocketChannel.open();
        this.sc.configureBlocking(false);
        this.sc.connect(remoteAddress);
        readBuf.clear();
        writeBuf.clear();
        this.readBuf = readBuf;
        this.writeBuf = writeBuf;
        processor.register(sc, this, SelectionKey.OP_CONNECT);
    }

    public AbstractSocketHandler(SelectorProcessor processor, SocketChannel sc) {
        this.processor = processor;
        this.sc = sc;
        this.connected = true;
        if (this.sc.isBlocking()) {
            try {
                this.sc.configureBlocking(false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        int readSize = DEFAULT_BUFFER_SIZE;
        int writeSize = DEFAULT_BUFFER_SIZE;
        try {
            readSize = sc.socket().getReceiveBufferSize();
            writeSize = sc.socket().getSendBufferSize();
        } catch (SocketException e1) {
        }
        readBuf = ByteBuffer.allocateDirect(readSize);
        writeBuf = ByteBuffer.allocateDirect(writeSize);
        processor.register(sc, this, SelectionKey.OP_READ);
    }

    public AbstractSocketHandler(SelectorProcessor processor, SocketChannel sc,
            ByteBuffer readBuf, ByteBuffer writeBuf) {
        this.processor = processor;
        this.sc = sc;
        this.connected = true;
        if (this.sc.isBlocking()) {
            try {
                this.sc.configureBlocking(false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        readBuf.clear();
        writeBuf.clear();
        this.readBuf = readBuf;
        this.writeBuf = writeBuf;
        processor.register(sc, this, SelectionKey.OP_READ);
    }

    public void onConnected() {
        System.out.println("connect to "+this.getRemoteAddress()+" ok");
    }

    public void onConnectFailed(String msg) {
        System.out.println("connect to "+this.getRemoteAddress()+" failed:"+msg);
    }

    /**
     * 如果一次没读完,最后须调用readBuf.compact().
     * 如果已读完,须调用readBuf.clear().
     */
    public void onRead() {
        readBuf.flip();
        int len = readBuf.limit();
        byte[] buf = new byte[len];
        readBuf.get(buf);
        readBuf.clear();
        System.out.print(new String(buf));
        writeBuf.put(buf);
        writeBuf.flip();
        enableWrite();
    }

    public void onWrite() {
       
    }

    public void onClosed(String msg) {
        System.out.println("channel closed:"+msg);
    }

    public void handleConnect() {
        try {
            if (sc.finishConnect()) {
                onConnected();
                connected = true;
            } else {
                onConnectFailed("");
            }
            processor.addInterestOps(sc, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            onConnectFailed(e.getMessage());
        }
    }

    public void handleRead() {
        try {
            //System.out.println("read");
            int len = sc.read(readBuf);
            if (len == -1) {//closed
                dispose("channel been closed correct");
            } else {
                //readBuf.flip();
                onRead();
                processor.addInterestOps(sc,SelectionKey.OP_READ);
            }
        } catch (IOException e) {
            e.printStackTrace();
            dispose(e.getMessage());
        }
    }

    public void handleWrite() {
        if (write()==true) {
            onWrite();
        }
    }
   
    /**
     * 将writeBuf中的数据写入socketchannel中,如写完清空writeBuf返回true,否则返加false.
     * 在使用该方法前应先对writeBuf调用flip()方法.
     * @return
     */
    protected boolean  write() {
        if (writeBuf.hasRemaining()) {
            try {
                sc.write(writeBuf);
            } catch (IOException e) {
                e.printStackTrace();
                dispose(e.getMessage());
                return false;
            }
        }
        if (writeBuf.hasRemaining()) {
            enableWrite();
            return false;
        }
        else {
            writeBuf.clear();
            return true;
        }
    }

    public SocketAddress getLocalAddress() {
        if (localAddress == null) {
            localAddress = this.sc.socket().getLocalSocketAddress();
        }
        return localAddress;
    }

    public SocketAddress getRemoteAddress() {
        if (remoteAddress == null) {
            remoteAddress = this.sc.socket().getRemoteSocketAddress();
        }
        return remoteAddress;
    }

    public void dispose(String msg) {
        if (!closed) {
            closed = true;
            processor.closeChannel(sc);
            onClosed(msg);
        }
    }

    public boolean isConnected() {
        return connected;
    }

    public boolean isClosed() {
        return closed;
    }
   
    public void close() {
        processor.closeChannel(sc);
    }

    public void enableRead() {
        processor.addInterestOps(sc, SelectionKey.OP_READ);
    }

    public void enableWrite() {
        processor.addInterestOps(sc, SelectionKey.OP_WRITE);
    }

}

//使用方法,先在console运行nc -l -p 1234再运行EchoClient
package zzzhc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

/**
 * @author <a href="zzzhcmailto:[email protected]">zzzhc</a>
 *
 */
public class EchoClient extends AbstractSocketHandler {
   
    public EchoClient(SelectorProcessor processor,SocketAddress remote) throws IOException {
        super(processor,remote);
    }

    public static void main(String[] args) throws IOException{
        EchoClient client = new EchoClient(SelectorProcessor.getDefaultInstance(),new InetSocketAddress("localhost",1234));
    }
}

//复杂一点的
package zzzhc;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URL;
import java.nio.channels.FileChannel;

/**
 * @author <a href="zzzhc'>mailto:[email protected]">zzzhc </a>
 * 
 */
public class SimpleHttpClient extends AbstractSocketHandler {

    public final static int START_CONNECT = 0;

    public final static int CONNECT_OK = 1;

    public final static int CONNECT_FAILED = 2;

    public final static int TRY_SEND_REQUEST = 10;

    public final static int REQUEST_SENT = 11;

    //public final static int RECV_RESPONSE_HEADER = 20;
    public final static int RESPONSE_HEADER_END = 21;

    public final static int START_DATA_TRANSFER = 30;

    public final static int END_DATA_TRANSFER = 31;

    protected int status = START_CONNECT;

    protected URL url;

    protected FileChannel fc;

    protected FileOutputStream out;
   
    private StringBuffer header = new StringBuffer();

    public SimpleHttpClient(SelectorProcessor processor, SocketAddress remote,
            URL url) throws IOException {
        super(processor, remote);
        this.url = url;
    }

    public void onConnected() {
        super.onConnected();
        try {
            Thread.sleep(10);
        }catch (Exception e){}
        status = CONNECT_OK;
        fillHeader(url);
        enableWrite();
        status = TRY_SEND_REQUEST;
    }

    protected void fillHeader(URL url) {
        writeBuf.clear();
        writeBuf.put("GET ".getBytes()).put(url.getPath().getBytes()).put(
                " HTTP/1.1".getBytes()).put((byte) '\r').put((byte) '\n');
        writeBuf.put("Host: ".getBytes()).put(url.getHost().getBytes()).put((byte) '\r')
                .put((byte) '\n');
        writeBuf.put((byte) '\r').put((byte) '\n');
        writeBuf.flip();
    }

    public void onConnectFailed(String msg) {
        super.onConnectFailed(msg);
        status = CONNECT_FAILED;
        System.exit(0);
    }

    public void onRead() {
        readBuf.flip();
        switch (status) {
        case REQUEST_SENT:
            int pos = readBuf.limit();
            byte b;
            while (readBuf.hasRemaining()) {
                b = readBuf.get();
                if (b == '\r') {
                    if (pos - readBuf.position() >= 3) {
                        b = readBuf.get();//\n
                        b = readBuf.get();
                        if (b == '\r') {
                            b = readBuf.get();//\n
                            readBuf.compact();
                            status = RESPONSE_HEADER_END;
                            return;
                        }
                    }
                }
            }
            readBuf.position(pos);
            readBuf.limit(readBuf.capacity());
            break;
        case RESPONSE_HEADER_END:
            status = START_DATA_TRANSFER;
        case START_DATA_TRANSFER:
            System.out.println("start data transfer.");
            if (out == null) {
                String file = url.getFile().trim();
                if ("".equals(file)) {
                    file = "index.html";
                }
                int idx = file.lastIndexOf('/');
                if (idx != -1) {
                    file = file.substring(idx + 1);
                }
                if ("".equals(file)) {
                    file = "index.html";
                }
                try {
                    System.out.println("open "+file+" to write.");
                    out = new FileOutputStream(file);
                    fc = out.getChannel();
                } catch (FileNotFoundException e) {
                    //this should not happend.
                    e.printStackTrace();
                    close();
                }
            }
            try {
                while (readBuf.hasRemaining()) {
                    fc.write(readBuf);
                }
            } catch (IOException e) {
                e.printStackTrace();
                close();
            }
            readBuf.clear();
            break;
        default:
            System.out.println("status:"+status);
        }
    }

    public void onClosed(String msg) {
        super.onClosed(msg);
        try {
            if (out != null) {
                out.close();
                fc.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public void onWrite() {
        status = REQUEST_SENT;
        System.out.println("request sent.");
    }

    public static void main(String[] args) throws IOException{
        URL url = new URL("http://www.sohu.com/");
        new SimpleHttpClient(SelectorProcessor.getDefaultInstance(),new InetSocketAddress("www.sohu.com",80),url);
    }
}

本文地址:http://com.8s8s.com/it/it12701.htm