你好,欢迎来到电脑编程技巧与维护杂志社! 杂志社简介广告服务读者反馈编程社区  
合订本订阅
 
 
您的位置:杂志经典 / 跟高手学编程
1.3 用Java实现非阻塞通信(中)
 

receive()方法的许多代码都涉及对ByteBuffer的三个属性(positionlimitcapacity)的操作,图4演示了以上readBuffbuffer变量的三个属性的变化过程。假定SocketChannelread()方法读入了6个字节,把它存放在readBuff中,并假定buffer中原来有10个字节,buffer.put(readBuff)方法把readBuff中的6个字节拷贝到buffer中,buffer中最后有16个字节。


 

                  图4  receive()方法操纵readBuffbuffer的过程

3)处理写就绪事件

如果SelectionKeyisWritable()方法返回true,就意味着这个SelectionKey所感兴趣的写就绪事件已经发生了。EchoServer类的send()方法负责处理这一事件:

public void send(SelectionKey key)throws IOException{

  //获得与SelectionKey关联的ByteBuffer

  ByteBuffer buffer=(ByteBuffer)key.attachment();

  //获得与SelectionKey关联的SocketChannel

  SocketChannel socketChannel=(SocketChannel)key.channel();

  buffer.flip();  //把极限设为位置,把位置设为0

  //按照GBK编码,把buffer中的字节转换为字符串

  String data=decode(buffer);

  //如果还没有读到一行数据,就返回

  if(data.indexOf("\r\n")==-1)return;

  //截取一行数据

  String outputData=data.substring(0,data.indexOf("\n")+1);

  System.out.print(outputData);

  //把输出的字符串按照GBK编码,转换为字节,把它放在outputBuffer

  ByteBuffer outputBuffer=encode("echo:"+outputData);

  //输出outputBuffer中的所有字节

  while(outputBuffer.hasRemaining())

    socketChannel.write(outputBuffer);

 

  //outputData字符串按照GBK编码,转换为字节,把它放在ByteBuffer

  ByteBuffer temp=encode(outputData);

  //buffer的位置设为temp的极限

  buffer.position(temp.limit());

  //删除buffer中已经处理的数据

  buffer.compact();

  //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel

  if(outputData.equals("bye\r\n")){

    key.cancel();

    socketChannel.close();

    System.out.println("关闭与客户的连接");

  }

}

EchoServerreceive()方法把读入的数据都放到一个ByteBuffer中,send()方法就从这个ByteBuffer中取出数据。如果ByteBuffer中还没有一行字符串,就什么也不做,直接退出send()方法;否则,就从ByteBuffer中取出一行字符串XXX,然后向客户发送echo:XXX。接着,send()方法把ByteBuffer中的字符串XXX删除。如果send()方法处理的字符串为“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel,从而断开与客户的连接。

4)编码与解码

ByteBuffer中存放的是字节,它表示字符串的编码。而程序需要把字节转换为字符串,才能进行字符串操作,比如判断里面是否包含“\r\n”,以及截取子字符串。EchoServer类的实用方法decode()负责解码,也就是把字节序列转换为字符串:

public String decode(ByteBuffer buffer){  //解码

  CharBuffer charBuffer= charset.decode(buffer);

  return charBuffer.toString();

}

decode()方法中的charset变量是EchoServer类的成员变量,它表示GBK中文编码,它的定义如下:

private Charset charset=Charset.forName("GBK");

send()方法中,当通过SocketChannelwrite(ByteBuffer buffer)方法发送数据时,write(ByteBuffer buffer)方法不能直接发送字符串,而只能发送ByteBuffer中的字节。因此程序需要对字符串进行编码,把它们转换为字节序列,放在ByteBuffer中,然后再发送。

ByteBuffer outputBuffer=encode("echo:"+outputData);

while(outputBuffer.hasRemaining())

  socketChannel.write(outputBuffer);

EchoServer类的实用方法encode()负责编码,也就是把字符串转换为字节序列:

public ByteBuffer encode(String str){  //编码

  return charset.encode(str);

}

5)在非阻塞模式下确保发送一行数据

send()方法的outputBuffer中存放了字符串echo:XXX的编码。在非阻塞模式下,SocketChannel.write(outputBuffer)方法并不保证一次就把outputBuffer中的所有字节发送完,而是奉行能发送多少就发送多少的原则。如果希望把outputBuffer中的所有字节发送完,需要采用以下循环:

while(outputBuffer.hasRemaining())  //hasRemaining()方法判断是否还有未处理的字节

  socketChannel.write(outputBuffer);

6)删除ByteBuffer中的已处理数据

SelectionKey关联的ByteBuffer附件中存放了读操作与写操作的共享数据。receive()方法把读到的数据放入ByteBuffer,而send()方法从ByteBuffer中一行行地取出数据。当send()方法从ByteBuffer中取出一行字符串XXX,就要把字符串从ByteBuffer中删除。在send()方法中,outputData变量就表示取出的一行字符串XXX,程序先把它编码为字节序列,放在一个名为tempByteBuffer中。接着把buffer的位置设为temp的极限,然后调用buffercompact()方法删除代表字符串XXX的数据。

ByteBuffer temp=encode(outputData);

buffer.position(temp.limit());

buffer.compact();

5演示了以上代码操纵buffer的过程。图5中假定temp中有10个字节,buffer中本来有16个字节,buffer.compact()方法删除缓冲区开头的10个字节,最后剩下6个字节。


5  buffer中删除已经处理过的一行字符串XXX

下例程1EchoServer的源程序。

//例程1  EchoServer.java(非阻塞模式)

package nonblock;

import java.io.*;

import java.nio.*;

import java.nio.channels.*;

import java.nio.charset.*;

import java.net.*;

import java.util.*;

 

public class EchoServer{

  private Selector selector = null;

  private ServerSocketChannel serverSocketChannel = null;

  private int port = 8000;

  private Charset charset=Charset.forName("GBK");

 

  public EchoServer()throws IOException{

    selector = Selector.open();

    serverSocketChannel= ServerSocketChannel.open();

    serverSocketChannel.socket().setReuseAddress(true);

    serverSocketChannel.configureBlocking(false);

    serverSocketChannel.socket().bind(new InetSocketAddress(port));

    System.out.println("服务器启动");

  }

 

  public void service() throws IOException{

    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT );

    while (selector.select() > 0 ){

      Set readyKeys = selector.selectedKeys();

      Iterator it = readyKeys.iterator();

      while (it.hasNext()){

         SelectionKey key=null;

         try{

            key = (SelectionKey) it.next();

            it.remove();

 

            if (key.isAcceptable()) {

              ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

              SocketChannel socketChannel = (SocketChannel) ssc.accept();

              System.out.println("接收到客户连接,来自:" +

                                 socketChannel.socket().getInetAddress() +

                                 ":" + socketChannel.socket().getPort());

              socketChannel.configureBlocking(false);

              ByteBuffer buffer = ByteBuffer.allocate(1024);

              socketChannel.register(selector,

                                     SelectionKey.OP_READ |

                                     SelectionKey.OP_WRITE, buffer);

            }

            if (key.isReadable()) {

                receive(key);

            }

            if (key.isWritable()) {

                send(key);

            }

        }catch(IOException e){

           e.printStackTrace();

           try{

               if(key!=null){

                   key.cancel();

                   key.channel().close();

               }

           }catch(Exception ex){e.printStackTrace();}

        }

      }//#while

    }//#while

  }

 

  public void send(SelectionKey key)throws IOException{

    ByteBuffer buffer=(ByteBuffer)key.attachment();

    SocketChannel socketChannel=(SocketChannel)key.channel();

    buffer.flip();  //把极限设为位置,把位置设为0

    String data=decode(buffer);

    if(data.indexOf("\r\n")==-1)return;

    String outputData=data.substring(0,data.indexOf("\n")+1);

    System.out.print(outputData);

    ByteBuffer outputBuffer=encode("echo:"+outputData);

    while(outputBuffer.hasRemaining())  //发送一行字符串

      socketChannel.write(outputBuffer);

 

    ByteBuffer temp=encode(outputData);

    buffer.position(temp.limit());

    buffer.compact(); //删除已经处理的字符串

 

    if(outputData.equals("bye\r\n")){

      key.cancel();

      socketChannel.close();

      System.out.println("关闭与客户的连接");

    }

  }

 

  public void receive(SelectionKey key)throws IOException{

    ByteBuffer buffer=(ByteBuffer)key.attachment();

 

    SocketChannel socketChannel=(SocketChannel)key.channel();

    ByteBuffer readBuff= ByteBuffer.allocate(32);

    socketChannel.read(readBuff);

    readBuff.flip();

 

    buffer.limit(buffer.capacity());

    buffer.put(readBuff);  //把读到的数据放到buffer

  }

 

  public String decode(ByteBuffer buffer){  //解码

    CharBuffer charBuffer= charset.decode(buffer);

    return charBuffer.toString();

  }

  public ByteBuffer encode(String str){  //编码

    return charset.encode(str);

  }

 

  public static void main(String args[])throws Exception{

    EchoServer server = new EchoServer();

    server.service();

  }

}

2.在EchoServer中混合用阻塞模式与非阻塞模式

在例程1中,EchoServerServerSocketChannel以及SocketChannel都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer采用一个线程同时完成这些操作。假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能。

负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向Selector注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作。

例程2EchoServer类的源程序。其中receive()send()decode()encode()方法的代码与例程1EchoServer类相同,为了节省篇幅,不再重复显示。

//例程2  EchoServer.java(混合使用阻塞模式与非阻塞模式)

package thread2;

import java.io.*;

import java.nio.*;

import java.nio.channels.*;

import java.nio.charset.*;

import java.net.*;

import java.util.*;

 

public class EchoServer{

  private Selector selector = null;

  private ServerSocketChannel serverSocketChannel = null;

  private int port = 8000;

  private Charset charset=Charset.forName("GBK");

 

  public EchoServer()throws IOException{

    selector = Selector.open();

    serverSocketChannel= ServerSocketChannel.open();

    serverSocketChannel.socket().setReuseAddress(true);

    serverSocketChannel.socket().bind(new InetSocketAddress(port));

    System.out.println("服务器启动");

  }

 

  public void accept(){

      for(;;){

        try{

            SocketChannel socketChannel = serverSocketChannel.accept();

            System.out.println("接收到客户连接,来自:" +

                               socketChannel.socket().getInetAddress() +

                               ":" + socketChannel.socket().getPort());

            socketChannel.configureBlocking(false);

 

            ByteBuffer buffer = ByteBuffer.allocate(1024);

            synchronized(gate){

                selector.wakeup();

                socketChannel.register(selector,

                                       SelectionKey.OP_READ |

                                       SelectionKey.OP_WRITE, buffer);

            }

        }catch(IOException e){e.printStackTrace();}

      }

  }

  private Object gate=new Object();

  public void service() throws IOException{

    for(;;){

      synchronized(gate){}

      int n = selector.select();

     

      if(n==0)continue;

      Set readyKeys = selector.selectedKeys();

      Iterator it = readyKeys.iterator();

      while (it.hasNext()){

        SelectionKey key=null;

        try{

            key = (SelectionKey) it.next();

            it.remove();

            if (key.isReadable()) {

                receive(key);

            }

            if (key.isWritable()) {

                send(key);

            }

        }catch(IOException e){

           e.printStackTrace();

           try{

               if(key!=null){

                   key.cancel();

                   key.channel().close();

               }

           }catch(Exception ex){e.printStackTrace();}

        }

      }//#while

    }//#while

  }

 

  public void send(SelectionKey key)throws IOException{…}

 

  public void receive(SelectionKey key)throws IOException{… }

  推荐精品文章

·2024年12月目录 
·2024年11月目录 
·2024年10月目录 
·2024年9月目录 
·2024年8月目录 
·2024年7月目录 
·2024年6月目录 
·2024年5月目录 
·2024年4月目录 
·2024年3月目录 
·2024年2月目录 
·2024年1月目录
·2023年12月目录
·2023年11月目录

  联系方式
TEL:010-82561037
Fax: 010-82561614
QQ: 100164630
Mail:gaojian@comprg.com.cn

  友情链接
 
Copyright 2001-2010, www.comprg.com.cn, All Rights Reserved
京ICP备14022230号-1,电话/传真:010-82561037 82561614 ,Mail:gaojian@comprg.com.cn
地址:北京市海淀区远大路20号宝蓝大厦E座704,邮编:100089