你好,欢迎来到电脑编程技巧与维护杂志社! 杂志社简介广告服务读者反馈编程社区  
合订本订阅
 
 
您的位置:杂志经典 / 跟高手学编程
1.3 用Java实现非阻塞通信(下)
 

  public String decode(ByteBuffer buffer){…}

  public ByteBuffer encode(String str){… }

 

  public static void main(String args[])throws Exception{

    final EchoServer server = new EchoServer();

    Thread accept=new Thread(){

        public void run(){

            server.accept();

        }

    };

    accept.start();

    server.service();

  }

}

以上EchoServer类的构造方法与例程1EchoServer类的构造方法基本相同,唯一的区别是,在本例中, ServerSocketChannel采用默认的阻塞模式,即没有调用以下方法:

serverSocketChannel.configureBlocking(false);

EchoServer类的accept()方法负责接收客户连接,ServerSocketChannelaccept()方法工作于阻塞模式,如果没有客户连接,就会进入阻塞状态,直到接收到了客户连接。接下来调用socketChannel.configureBlocking(false)方法把SocketChannel设为非阻塞模式,然后向Selector注册读就绪和写就绪事件。

EchoServer类的service()方法负责接收和发送数据,它在一个无限for循环中,不断调用Selectorselect()方法查寻已经发生的事件,然后作出相应的处理。

EchoServer类的main()方法中,定义了一个匿名线程(暂且称它为Accept线程),它负责执行EchoServeraccept()方法。执行main()方法的主线程启动了Accept线程后,主线程就开始执行EchoServerservice()方法。因此当EchoServer启动后,共有两个线程在工作,Accept线程负责接收客户连接,主线程负责接收和发送数据:

public static void main(String args[])throws Exception{

  final EchoServer server = new EchoServer();

  Thread accept=new Thread(){  //定义Accept线程

     public void run(){

       server.accept();

     }

  };

  accept.start();  //启动Accept线程

  server.service();  //主线程执行service()方法

}

Accept线程开始执行以下方法时:

socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);

如果主线程正好在执行selector.select()方法,而且处于阻塞状态,那么Accept线程也会进入阻塞状态。两个线程都处于阻塞状态,很有可能导致死锁。导致死锁的具体情形为:Selector中尚没有任何注册的事件,即all-keys集合为空,主线程执行selector.select()方法时将进入阻塞状态,只有Accept线程向Selector注册了事件,并且该事件发生后,主线程才会从selector.select()方法中返回。假如Selector中尚没有任何注册的事件,此时Accept线程调用socketChannel.register()方法向Selector注册事件,由于主线程正在selector.select()方法中阻塞,这使得Accept线程也在socketChannel.register()方法中阻塞。Accept线程无法向Selector注册事件,而主线程没有任何事件可以监控,所以这两个线程都将永远阻塞下去。

为了避免死锁,程序必须保证当Accept线程正在通过socketChannel.register()方法向Selector注册事件时,不允许主线程正在selector.select()方法中阻塞。

为了协调Accept线程和主线程,EchoServer类在以下代码前加了同步标记。当Accept线程开始执行这段代码时,必须先获得gate对象的同步锁,然后进入同步代码块,先执行Selector对象的wakeup()方法,假如此时主线程正好在执行selector.select()方法,而且处于阻塞状态,那么主线程就会被唤醒,立即退出selector.select()方法。

synchronized(gate){  //Accept线程执行这个同步代码块

selector.wakeup();

  socketChannel.register(selector,

                        SelectionKey.OP_READ |

                        SelectionKey.OP_WRITE, buffer);

}

主线程被唤醒后,在下一次循环中又会执行selector.select()方法,为了保证让Accept线程先执行完socketChannel.register()方法,再让主线程执行selector.select()方法,主线程必须先获得gate对象的同步锁:

for(;;){

  //一个空的同步代码块,其作用是为了让主线程等待Accept线程执行完同步代码块

  synchronized(gate){}  //主线程执行这个同步代码块

int n = selector.select();

}

假如Accept线程还没有执行完同步代码块,就不会释放gate对象的同步锁,这使得主线程必须等待片刻,等到Accept线程执行完同步代码块,释放了gate对象的同步锁,主线程才能恢复运行,再次执行selector.select()方法。

3.创建非阻塞的EchoClient

对于客户与服务器之间的通信,按照它们收发数据的协调程度来区分,可分为同步通信和异步通信。同步通信是指甲方向乙方发送了一批数据后,必须等接收到了乙方的响应数据后,再发送下一批数据。异步通信是指发送数据和接收数据的操作互不干扰,各自独立进行。值得注意的是,通信的两端并不要求都采用同样的通信方式,一方采用同步通信方式时,另一方可以采用异步通信方式。

同步通信要求一个I/O操作完成之后,才能完成下一个I/O操作,用阻塞模式更容易实现它。异步通信允许发送数据和接收数据的操作各自独立进行,用非阻塞模式更容易实现它。例程1和例程2介绍的EchoServer都采用异步通信,每次接收数据时,能读到多少数据,就读多少数据,并不要求必须读到一行数据后,才能执行发送数据的操作。

例程3EchoClient类利用非阻塞模式来实现异步通信。在EchoClient类中,定义了两个ByteBuffersendBufferreceiveBufferEchoClient把用户向控制台输入的数据存放到sendBuffer中,并且把sendBuffer中的数据发送给远程服务器;EchoClient把从远程服务器接收到的数据存放在receiveBuffer中,并且把receiveBuffer中的数据打印到控制台。图6显示了这两个Buffer的作用。

 


     

                           图6  sendBufferreceiveBuffer的作用

//例程3  EchoClient.java(非阻塞模式)

package nonblock;

import java.net.*;

import java.nio.channels.*;

import java.nio.*;

import java.io.*;

import java.nio.charset.*;

import java.util.*;

 

public class EchoClient{

  private SocketChannel socketChannel = null;

  private ByteBuffer sendBuffer=ByteBuffer.allocate(1024);

  private ByteBuffer receiveBuffer=ByteBuffer.allocate(1024);

  private Charset charset=Charset.forName("GBK");

  private Selector selector;

 

  public EchoClient()throws IOException{

    socketChannel = SocketChannel.open();

    InetAddress ia = InetAddress.getLocalHost();

    InetSocketAddress isa = new InetSocketAddress(ia,8000);

    socketChannel.connect(isa);  //采用阻塞模式连接服务器

    socketChannel.configureBlocking(false);  //设置为非阻塞模式

    System.out.println("与服务器的连接建立成功");

    selector=Selector.open();

  }

  public static void main(String args[])throws IOException{

    final EchoClient client=new EchoClient();

    Thread receiver=new Thread(){  //创建Receiver线程

      public void run(){

        client.receiveFromUser();  //接收用户向控制台输入的数据

      }

    };

 

    receiver.start();  //启动Receiver线程

    client.talk();

  }

 

  public void receiveFromUser(){ //接收用户从控制台输入的数据,把它放到sendBuffer

    try{

      BufferedReader localReader=new BufferedReader(new InputStreamReader(System.in));

      String msg=null;

      while((msg=localReader.readLine())!=null){

        synchronized(sendBuffer){

            sendBuffer.put(encode(msg + "\r\n"));

         }

        if(msg.equals("bye"))

          break;

      }

    }catch(IOException e){

       e.printStackTrace();

    }

  }

 

  public void talk()throws IOException { //接收和发送数据

     socketChannel.register(selector,

                          SelectionKey.OP_READ |

                          SelectionKey.OP_WRITE);

     while (selector.select() > 0 ){

       Set readyKeys = selector.selectedKeys();

       Iterator it = readyKeys.iterator();

       while (it.hasNext()){

         SelectionKey key=null;

         try{

             key = (SelectionKey) it.next();

             it.remove();

 

             if (key.isReadable()) {

                 receive(key);

             }

             if (key.isWritable()) {

                 send(key);

             }

         }catch(IOException e){

            e.printStackTrace();

            try{

                if(key!=null){

                    key.cancel();

                    key.channel().close();

                }

            }catch(Exception ex){e.printStackTrace();}

         }

      }//#while

    }//#while

  }

 

  public void send(SelectionKey key)throws IOException{

    //发送sendBuffer中的数据

    SocketChannel socketChannel=(SocketChannel)key.channel();

    synchronized(sendBuffer){

        sendBuffer.flip(); //把极限设为位置,把位置设为零

        socketChannel.write(sendBuffer);  //发送数据

        sendBuffer.compact();  //删除已经发送的数据

     }

  }

 

  public void receive(SelectionKey key)throws IOException{

    //接收EchoServer发送的数据,把它放到receiveBuffer

    //如果receiveBuffer中有一行数据,就打印这行数据,然后把它从receiveBuffer中删除

    SocketChannel socketChannel=(SocketChannel)key.channel();

    socketChannel.read(receiveBuffer);

    receiveBuffer.flip();

    String receiveData=decode(receiveBuffer);

 

    if(receiveData.indexOf("\n")==-1)return;

 

    String outputData=receiveData.substring(0,receiveData.indexOf("\n")+1);

    System.out.print(outputData);

    if(outputData.equals("echo:bye\r\n")){

        key.cancel();

        socketChannel.close();

        System.out.println("关闭与服务器的连接");

        selector.close();

        System.exit(0);  //结束程序

    }

 

    ByteBuffer temp=encode(outputData);

    receiveBuffer.position(temp.limit());

    receiveBuffer.compact();  //删除已经打印的数据

  }

 

  public String decode(ByteBuffer buffer){  //解码

    CharBuffer charBuffer= charset.decode(buffer);

    return charBuffer.toString();

  }

  public ByteBuffer encode(String str){  //编码

    return charset.encode(str);

  }

}

EchoClient类的构造方法中,创建了SocketChannel对象后,该SocketChannel对象采用默认的阻塞模式,随后调用socketChannel.connect(isa)方法,该方法将按照阻塞模式来与远程服务器EchoServer连接,只有当连接建立成功,该connect()方法才会返回。接下来程序再调用socketChannel.configureBlocking(false)方法把SocketChannel设为非阻塞模式,这使得接下来通过SocketChannel来接收和发送数据都会采用非阻塞模式。

socketChannel = SocketChannel.open();

socketChannel.connect(isa);

socketChannel.configureBlocking(false);

EchoClient类共使用了两个线程:主线程和Receiver线程。主线程主要负责接收和发送数据,这些操作由talk()方法实现。Receiver线程负责读取用户向控制台输入的数据,该操作由receiveFromUser()方法实现。

public static void main(String args[])throws IOException{

  final EchoClient client=new EchoClient();

  Thread receiver=new Thread(){  //创建receiver线程

    public void run(){

      client.receiveFromUser();  //读取用户向控制台输入的数据

    }

  };

 

  receiver.start();

  client.talk(); //接收和发送数据

}

receiveFromUser()方法读取用户输入的字符串,把它存放到sendBuffer中。如果用户输入字符串“bye”,就退出receiveFromUser()方法,这使得执行该方法的Receiver线程结束运行。由于主线程在执行send()方法时,也会操纵sendBuffer,为了避免两个线程对共享资源sendBuffer的竞争,receiveFromUser()方法对操纵sendBuffer的代码进行了同步。

BufferedReader localReader=new BufferedReader(new InputStreamReader(System.in));

String msg=null;

while((msg=localReader.readLine())!=null){

synchronized(sendBuffer){

     sendBuffer.put(encode(msg + "\r\n"));

  }

  if(msg.equals("bye"))

    break;

}

talk()方法向Selector注册读就绪和写就绪事件,然后轮询已经发生的事件,并做出相应的处理。如果发生读就绪事件,就执行receive()方法,如果发生写就绪事件,就执行send()方法。

receive()方法接收EchoServer发回的响应数据,把它们存放在receiveBuffer中。如果receiveBuffer中已经满一行数据,就向控制台打印这一行数据,并且把这行数据从receiveBuffer中删除。如果打印的字符串为“echo:bye\r\n”,就关闭SocketChannel,并且结束程序。

send()方法把sendBuffer中的数据发送给EchoServer,然后删除已经发送的数据。由于Receiver线程以及执行send()方法的主线程都会操纵共享资源sendBuffer,为了避免对共享资源的竞争,对send()方法中操纵sendBuffer的代码进行了同步。

四、结语

本文介绍了用ServerSocketChannelSocketChannel来创建服务器和客户程序的方法。ServerSocketChannelSocketChannel既可以工作于阻塞模式,也可以工作于非阻塞模式,默认情况下,它们都工作于阻塞模式,可以调用configureBlocking()方法来重新设置模式。

总的说来,尽管阻塞模式与非阻塞模式都可以同时处理多个客户连接,但阻塞模式需要使用较多的线程,而非阻塞模式只需使用较少的线程,非阻塞模式能更有效地利用CPU,系统开销小,因此有更高的并发性能。

阻塞模式编程相对简单,但是当线程数目很多时,必须处理好线程之间的同步,如果自己编写线程池,要实现健壮的线程池难度较高。阻塞模式比较适用于同步通信,并且通信双方稳定地发送小批量的数据,双方都不需要花很长时间等待对方的回应。假如通信过程中,由于一方迟迟没有回应,导致另一方长时间的阻塞,为了避免线程无限期地阻塞下去,应该设置超时时间,及时中断长时间阻塞的线程。

非阻塞模式编程相对难一些,对ByteBuffer缓冲区的处理比较麻烦。非阻塞模式比较适用于异步通信,并且通信双方发送大批量的数据,尽管一方接收到另一方的数据可能要花一段时间,但在这段时间内,接收方不必傻傻地等待,可以处理其他事情。

 

  推荐精品文章

·2024年12月目录 
·2024年11月目录 
·2024年10月目录 
·2024年9月目录 
·2024年8月目录 
·2024年7月目录 
·2024年6月目录 
·2024年5月目录 
·2024年4月目录 
·2024年3月目录 
·2024年2月目录 
·2024年1月目录
·2023年12月目录
·2023年11月目录

  联系方式
TEL:010-82561037
Fax: 010-82561614
QQ: 100164630
Mail:gaojian@comprg.com.cn

  友情链接
 
Copyright 2001-2010, www.comprg.com.cn, All Rights Reserved
京ICP备14022230号-1,电话/传真:010-82561037 82561614 ,Mail:gaojian@comprg.com.cn
地址:北京市海淀区远大路20号宝蓝大厦E座704,邮编:100089