你好,欢迎来到电脑编程技巧与维护杂志社! 杂志社简介广告服务读者反馈编程社区  
合订本订阅
 
 
您的位置:杂志经典 / 跟高手学编程
1.3 用Java实现非阻塞通信(上)
 

    用ServerSocketSocket来编写服务器程序和客户程序,是Java网络编程的最基本的方式。这些服务器程序或客户程序在运行过程中常常会阻塞。例如当一个线程执行ServerSocketaccept()方法时,假如没有客户连接,该线程就会一直等到有了客户连接才从accept()方法返回。再例如当线程执行Socketread()方法时,如果输入流中没有数据,该线程就会一直等到读入了足够的数据才从read()方法返回。

假如服务器程序需要同时与多个客户通信,就必须分配多个工作线程,让它们分别负责与一个客户通信,当然每个工作线程都有可能经常处于长时间的阻塞状态。

JDK1.4版本开始,引入了非阻塞的通信机制。服务器程序接收客户连接、客户程序建立与服务器的连接,以及服务器程序和客户程序收发数据的操作都可以按非阻塞的方式进行。服务器程序只需要创建一个线程,就能完成同时与多个客户通信的任务。

非阻塞的通信机制主要由java.nio包(新I/O包)中的类实现,主要的类包括ServerSocketChannelSocketChannelSelectorSelectionKeyByteBuffer等。

一、线程阻塞

在生活中,最常见的阻塞现象是公路上汽车的堵塞。汽车在公路上快速运行,如果前方交通受阻,就只好停下来等待,等到公路顺畅,才能恢复运行。

线程在运行中也会因为某些原因而阻塞。所有处于阻塞状态的线程的共同特征是:放弃CPU,暂停运行,只有等到导致阻塞的原因消除,才能恢复运行;或者被其他线程中断,该线程会退出阻塞状态,并且抛出InterruptedException

1.线程阻塞的原因

导致线程阻塞的原因主要有以下方面:

l         线程执行了Thread.sleep(int n)方法,线程放弃CPU,睡眠n毫秒,然后恢复运行。

l         线程要执行一段同步代码,由于无法获得相关的同步锁,只好进入阻塞状态,等到获得了同步锁,才能恢复运行。

l         线程执行了一个对象的wait()方法,进入阻塞状态,只有等到其他线程执行了该对象的notify()notifyAll()方法,才可能将其唤醒。

l         线程执行I/O操作或进行远程通信时,会因为等待相关的资源而进入阻塞状态。例如当线程执行System.in.read()方法时,如果用户没有向控制台输入数据,则该线程会一直等读到了用户的输入数据才从read()方法返回。

进行远程通信时,在客户程序中,线程在以下情况可能进入阻塞状态:

l         请求与服务器建立连接时,即当线程执行Socket的带参数的构造方法,或执行Socketconnect()方法时,会进入阻塞状态,直到连接成功,此线程才从Socket的构造方法或connect()方法返回。

l         线程从Socket的输入流读入数据时,如果没有足够的数据,就会进入阻塞状态,直到读到了足够的数据,或者到达输入流的末尾,或者出现了异常,才从输入流的read()方法返回或异常中断。输入流中有多少数据才算足够呢?这要看线程执行的read()方法的类型:

1int read():只要输入流中有一个字节,就算足够。

2int read(byte[] buff):只要输入流中的字节数目与参数buff数组的长度相同就算足够。

3String readLine():只要输入流中有一行字符串,就算足够。值得注意的是InputStream类并没有readLine()方法,在过滤流BufferedReader类中才有此方法。

l         线程向Socket的输出流写一批数据时,可能会进入阻塞状态,等到输出了所有的数据,或者出现异常,才从输出流的write()方法返回或异常中断。

l         当调用SocketsetSoLinger()方法设置了关闭Socket的延迟时间,那么当线程执行Socketclose()方法时,会进入阻塞状态,直到底层Socket发送完所有剩余数据,或者超过了setSoLinger()方法设置的延迟时间,才从close()方法返回。

在服务器程序中,线程在以下情况可能会进入阻塞状态:

l         线程执行ServerSocketaccept()方法,等待客户的连接,直到接收到了客户连接,才从accept()方法返回。

l         线程从Socket的输入流读入数据时, 如果输入流没有足够的数据,就会进入阻塞状态。

l         线程向Socket的输出流写一批数据时,可能会进入阻塞状态,等到输出了所有的数据,或者出现异常,才从输出流的write()方法返回或异常中断。

由此可见,无论是在服务器程序还是客户程序中,当通过Socket的输入流和输出流来读写数据时,都可能进入阻塞状态。这种可能出现阻塞的输入和输出操作被称为阻塞I/O。与此对照,如果执行输入和输出操作时,不会发生阻塞,则称为非阻塞I/O

2.服务器程序用多线程处理阻塞通信的局限

1显示了服务器程序用多线程来同时处理多个客户连接的工作流程。主线程负责接收客户的连接。在线程池中有若干工作线程,它们负责处理具体的客户连接。每当主线程接收到一个客户连接,主线程就会把与这个客户交互的任务交一个空闲的工作线程去完成,主线程继续负责接收下一个客户连接。

 


1  服务器程序用多线程处理阻塞通信

在图1中,用粗体框标识的步骤为可能引起阻塞的步骤。可以看出,当主线程接收客户连接,以及工作线程执行I/O操作时,都有可能进入阻塞状态。

服务器程序用多线程来处理阻塞I/O,尽管能满足同时响应多个客户请求的需求,但是有以下局限:

1Java虚拟机会为每个线程分配独立的堆栈空间,工作线程数目越多,系统开销就越大,而且增加了Java虚拟机调度线程的负担,增加了线程之间同步的复杂性,提高了线程死锁的可能性。

2)工作线程的许多时间都浪费在阻塞I/O操作上,Java虚拟机需要频繁地转让CPU的使用权,使进入阻塞状态的线程放弃CPU,再把CPU分配给处于可运行状态的线程。

由此可见,工作线程并不是越多越好。如图2所示,保持适量的工作线程,会提高服务器的并发性能,但是当工作线程的数目到达某个极限,超出了系统的负荷时,反而会降低并发性能,使得多数客户无法快速得服务器的响应。

 

                                     并发性能


 

                          图2线程数目与并发技能的关系

 

3非阻塞通信的基本思想

 

假如同事要做两件事:烧开水和烧粥。烧开水的步骤如下:

 

     锅里放水,打开煤气炉;

     等待水烧开; //阻塞

     关闭煤气炉,把开水灌到水壶里;

烧烧粥的步骤如下:

锅里放水和米,打开煤气炉;

等待粥烧开; //阻塞

调整煤气炉,改为小火;

等待粥烧熟; //阻塞

关闭煤气炉;

为了同时完成两件事,一种方案是同时请两个人分别做其中的一件事,这相当于采用多线程来同时完成多个任务。还有一种方案是让一个人同时完成两件事,这个人应该善于利用一件事的空闲时间去做另一件事,这个人一刻也不应该闲着:

锅里放水,打开煤气炉; //开始烧开水

锅里放水和米,打开煤气炉; //开始烧粥

while(一直等待,直到有水烧开、粥烧开或粥烧熟事件发生){  //阻塞

if(水烧开)

关闭煤气炉,把开水灌到水壶里;

if(粥烧开)

调整煤气炉,改为小火;

if(粥烧熟)

关闭煤气炉;

}

这个人不断监控烧水以及烧粥的状态,如果发生了“水烧开”、“粥烧开”或“粥烧熟”事件,就去处理这些事件,处理完一件事后继续监控烧水以及烧粥的状态,直到所有的任务都完成。

以上工作方式也可以运用到服务器程序中,服务器程序只需要一个线程就能同时负责接收客户的连接、接收各个客户发送的数据,以及向各个客户发送响应数据。服务器程序的处理流程如下:

while(一直等待,直到有接收连接就绪事件、读就绪事件或写就绪事件发生){ //阻塞

if(有客户连接)

接收客户的连接;  //非阻塞

if(某个Socket的输入流中有可读数据)

从输入流中读数据;  //非阻塞

if(某个Socket的输出流可以写数据)

向输出流写数据;  //非阻塞

}

以上处理流程采用了轮询的工作方式,当某一种操作就绪,就执行该操作,否则就察看是否还有其他就绪的操作可以执行。线程不会因为某一个操作还没有就绪,就进入阻塞状态,一直傻傻地在那里等待这个操作就绪。

为了使轮询的工作方式顺利进行,接收客户的连接、从输入流读数据、以及向输出流写数据的操作都应该以非阻塞的方式运行。所谓非阻塞,就是指当线程执行这些方法时,如果操作还没有就绪,就立即返回,而不会一直等到操作就绪。例如当线程接收客户连接时,如果没有客户连接,就立即返回;再例如当线程从输入流中读数据时,如果输入流中还没有数据,就立即返回,或者如果输入流还没有足够的数据,那么就读取现有的数据,然后返回。值得注意的是,以上while循环条件中的操作还是按照阻塞方式进行的,如果未发生任何事件,就会进入阻塞状态,直到接收连接就绪事件、读就绪事件或写就绪事件中至少有一个事件发生,此时就会执行while循环体中的操作。

 

二、java.nio包中的主要类

java.nio包提供了支持非阻塞通信的类,主要包括:

l         ServerSocketChannelServerSocket的替代类,支持阻塞通信与非阻塞通信。

l         SocketChannelSocket的替代类,支持阻塞通信与非阻塞通信。

l         Selector:为ServerSocketChannel监控接收连接就绪事件,为SocketChannel监控连接就绪、读就绪和写就绪事件。

l         SelectionKey:代表ServerSocketChannel以及SocketChannelSelector注册事件的句柄。当一个SelectionKey对象位于Selector对象的selected-keys集合中,就表示与这个SelectionKey对象相关的事件发生了。

ServerSocketChannel以及SocketChannel都是SelectableChannel的子类,如图3所示。SelectableChannel类以及其子类都能委托Selector来监控它们可能发生的一些事件,这种委托过程也称为注册事件过程。


3  SelectableChannel类及其子类的类框图

ServerSocketChannelSelector注册接收连接就绪事件的代码如下:

SelectionKey key=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

SelectionKey类的一些静态常量表示事件类型,ServerSocketChannel只可能发生一种事件:

l         SelectionKey.OP_ACCEPT:接收连接就绪事件,表示至少有了一个客户连接,服务器可以接收这个连接。

SocketChannel可能发生以下三种事件:

l         SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经建立成功。

l         SelectionKey.OP_READ:读就绪事件,表示输入流中已经有了可读数据,可以执行读操作了。

l         SelectionKey.OP_WRITE:写就绪事件,表示已经可以向输出流写数据了。

SocketChannel提供了接收和发送数据的方法:

l         read(ByteBuffer buffer):接收数据,把它们存放到参数指定的ByteBuffer中。

l         write(ByteBuffer buffer):把参数指定的ByteBuffer中的数据发送出去。

ByteBuffer表示字节缓冲区,SocketChannelread()write()方法都会操纵ByteBufferByteBuffer类继承于Buffer类。ByteBuffer中存放的是字节,为了把它们转换为字符串,还需要用到Charset类,Charset类代表字符编码,它提供了把字节流转换为字符串(解码过程)和把字符串转换为字节流(编码过程)的实用方法。

三、非阻塞编程实例

1.创建非阻塞的EchoServer

在非阻塞模式下,EchoServer只需要启动一个主线程,就能同时处理三件事:

接收客户的连接。

接收客户发送的数据。

向客户发回响应数据。

EchoServer委托Selector来负责监控接收连接就绪事件、读就绪事件和写就绪事件,如果有特定事件发生,就处理该事件。

EchoServer类的构造方法负责启动服务器,把它绑定到一个本地端口,代码如下:

//创建一个Selector对象

selector = Selector.open();

//创建一个ServerSocketChannel对象

serverSocketChannel= ServerSocketChannel.open();

//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,

//可以顺利绑定到相同的端口

serverSocketChannel.socket().setReuseAddress(true);

//使ServerSocketChannel工作于非阻塞模式

serverSocketChannel.configureBlocking(false);

//把服务器进程与一个本地端口绑定

serverSocketChannel.socket().bind(new InetSocketAddress(port));

EchoServer类的service()方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:

public void service() throws IOException{

  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT );

  while (selector.select() > 0 ){  //第一层while循环

    Set readyKeys = selector.selectedKeys();  //获得Selectorselected-keys集合

    Iterator it = readyKeys.iterator();

    while (it.hasNext()){  //第二层while循环

      SelectionKey key=null;

      try{  //处理SelectionKey

        key = (SelectionKey) it.next();  //取出一个SelectionKey

        it.remove();  //SelectionKeySelectorselected-key集合中删除

        if (key.isAcceptable()) {处理接收连接就绪事件; }

        if (key.isReadable()) {处理读就绪事件; }

        if (key.isWritable()) {处理写就绪事件; }

      }catch(IOException e){

        e.printStackTrace();

        try{

          if(key!=null){

            //使这个SelectionKey失效,

            //使得Selector不再监控这个SelectionKey感兴趣的事件

            key.cancel();  

            key.channel().close(); //关闭与这个SelectionKey关联的SocketChannel

          }

        }catch(Exception ex){e.printStackTrace();}

      }

    }//#while

  }//#while

}

service()方法中,首先由ServerSocketChannelSelector注册接收连接就绪事件。如果Selector监控到该事件发生,就会把相应的SelectionKey对象加入到selected-keys集合中。service()方法接下来在第一层while循环中不断询问Selector已经发生的事件,然后依次处理每个事件。

Selectorselect()方法返回当前相关事件已经发生的SelectionKey的个数。如果当前没有任何事件发生,select()方法就会阻塞下去,直到至少有一个事件发生。SelectorselectedKeys()方法返回selected-keys集合,它存放了相关事件已经发生的SelectionKey对象。

service()方法在第二层while循环中,从selected-keys集合中依次取出每个SelectionKey对象,把它从selected-keys集合中删除,然后调用isAcceptable()isReadable()isWritable()方法判断到底是哪种事件发生了,从而作出相应的处理。处理每个SelectionKey的代码放在一个try语句中,如果出现异常,就会在catch语句中使这个SelectionKey失效,并且关闭与之关联的Channel

1)处理接收连接就绪事件

service()方法中处理接收连接就绪事件的代码如下:

if (key.isAcceptable()) {

//获得与SelectionKey关联的ServerSocketChannel

ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

//获得与客户连接的SocketChannel

SocketChannel socketChannel = (SocketChannel) ssc.accept();

System.out.println("接收到客户连接,来自:" +

                       socketChannel.socket().getInetAddress() +

                       ":" + socketChannel.socket().getPort());

//SocketChannel设置为非阻塞模式

socketChannel.configureBlocking(false);

//创建一个用于存放用户发送来的数据的缓冲区

ByteBuffer buffer = ByteBuffer.allocate(1024);

//SocketChannelSelector注册读就绪事件和写就绪事件

socketChannel.register(selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE, buffer);

//关联了一个buffer附件

}

如果SelectionKeyisAcceptable()方法返回true,就意味着这个 SelectionKey所感兴趣的接收连接就绪事件已经发生了。service()方法首先通过SelectionKeychannel()方法获得与之关联的ServerSocketChannel对象,然后调用ServerSocketChannelaccept()方法获得与客户连接的SocketChannel对象。这个SocketChannel对象默认情况下处于阻塞模式。如果希望它执行非阻塞的I/O操作,需要调用它的configureBlocking(false)方法。SocketChannel调用Selectorregister()方法来注册读就绪事件和写就绪事件,还向register()方法传递了一个ByteBuffer类型的参数,这个ByteBuffer将作为附件与新建的SelectionKey对象关联。

2)处理读就绪事件

如果SelectionKeyisReadable()方法返回true,就意味着这个SelectionKey所感兴趣的读就绪事件已经发生了。EchoServer类的receive()方法负责处理这一事件:

public void receive(SelectionKey key)throws IOException{

  //获得与SelectionKey关联的附件

  ByteBuffer buffer=(ByteBuffer)key.attachment();

  //获得与SelectionKey关联的SocketChannel

  SocketChannel socketChannel=(SocketChannel)key.channel();

  //创建一个ByteBuffer,用于存放读到的数据

  ByteBuffer readBuff= ByteBuffer.allocate(32);

  socketChannel.read(readBuff);

  readBuff.flip();

 

  //buffer的极限设为容量

  buffer.limit(buffer.capacity());

  //readBuff中的内容拷贝到buffer中,

  //假定buffer的容量足够大,不会出现缓冲区溢出异常

  buffer.put(readBuff); 

}

receive()方法中,先获得与这个SelectionKey关联的ByteBufferSocketChannelSocketChannel每次读到的数据都被添加到这个ByteBuffer,在程序中,由buffer变量引用这个ByteBuffer对象。在非阻塞模式下,socketChannel.read(readBuff)方法读到多少数据是不确定的,假定读到的字节为n个,那么“0<=n<readBuff”的容量。EchoServer要求每接收到客户的一行字符串XXX(也就是字符串以“\r\n”结尾),就返回字符串echo:XXX。由于无法保证socketChannel.read(readBuff)方法一次读入一行字符串,因此只好把它每次读入的数据都放到buffer中,当这个buffer中凑足了一行字符串,再把它发送给客户。

  推荐精品文章

·2024年12月目录 
·2024年11月目录 
·2024年10月目录 
·2024年9月目录 
·2024年8月目录 
·2024年7月目录 
·2024年6月目录 
·2024年5月目录 
·2024年4月目录 
·2024年3月目录 
·2024年2月目录 
·2024年1月目录
·2023年12月目录
·2023年11月目录

  联系方式
TEL:010-82561037
Fax: 010-82561614
QQ: 100164630
Mail:gaojian@comprg.com.cn

  友情链接
 
Copyright 2001-2010, www.comprg.com.cn, All Rights Reserved
京ICP备14022230号-1,电话/传真:010-82561037 82561614 ,Mail:gaojian@comprg.com.cn
地址:北京市海淀区远大路20号宝蓝大厦E座704,邮编:100089