public String decode(ByteBuffer buffer){…}
public ByteBuffer encode(String str){… }
public static void main(String args[])throws Exception{
final EchoServer server = new EchoServer();
Thread accept=new Thread(){
public void run(){
server.accept();
}
};
accept.start();
server.service();
}
}
以上EchoServer类的构造方法与例程1的EchoServer类的构造方法基本相同,唯一的区别是,在本例中, ServerSocketChannel采用默认的阻塞模式,即没有调用以下方法:
serverSocketChannel.configureBlocking(false);
EchoServer类的accept()方法负责接收客户连接,ServerSocketChannel的accept()方法工作于阻塞模式,如果没有客户连接,就会进入阻塞状态,直到接收到了客户连接。接下来调用socketChannel.configureBlocking(false)方法把SocketChannel设为非阻塞模式,然后向Selector注册读就绪和写就绪事件。
EchoServer类的service()方法负责接收和发送数据,它在一个无限for循环中,不断调用Selector的select()方法查寻已经发生的事件,然后作出相应的处理。
在EchoServer类的main()方法中,定义了一个匿名线程(暂且称它为Accept线程),它负责执行EchoServer的accept()方法。执行main()方法的主线程启动了Accept线程后,主线程就开始执行EchoServer的service()方法。因此当EchoServer启动后,共有两个线程在工作,Accept线程负责接收客户连接,主线程负责接收和发送数据:
public static void main(String args[])throws Exception{
final EchoServer server = new EchoServer();
Thread accept=new Thread(){ //定义Accept线程
public void run(){
server.accept();
}
};
accept.start(); //启动Accept线程
server.service(); //主线程执行service()方法
}
当Accept线程开始执行以下方法时:
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);
如果主线程正好在执行selector.select()方法,而且处于阻塞状态,那么Accept线程也会进入阻塞状态。两个线程都处于阻塞状态,很有可能导致死锁。导致死锁的具体情形为:Selector中尚没有任何注册的事件,即all-keys集合为空,主线程执行selector.select()方法时将进入阻塞状态,只有Accept线程向Selector注册了事件,并且该事件发生后,主线程才会从selector.select()方法中返回。假如Selector中尚没有任何注册的事件,此时Accept线程调用socketChannel.register()方法向Selector注册事件,由于主线程正在selector.select()方法中阻塞,这使得Accept线程也在socketChannel.register()方法中阻塞。Accept线程无法向Selector注册事件,而主线程没有任何事件可以监控,所以这两个线程都将永远阻塞下去。
为了避免死锁,程序必须保证当Accept线程正在通过socketChannel.register()方法向Selector注册事件时,不允许主线程正在selector.select()方法中阻塞。
为了协调Accept线程和主线程,EchoServer类在以下代码前加了同步标记。当Accept线程开始执行这段代码时,必须先获得gate对象的同步锁,然后进入同步代码块,先执行Selector对象的wakeup()方法,假如此时主线程正好在执行selector.select()方法,而且处于阻塞状态,那么主线程就会被唤醒,立即退出selector.select()方法。
synchronized(gate){ //Accept线程执行这个同步代码块
selector.wakeup();
socketChannel.register(selector,
SelectionKey.OP_READ |
SelectionKey.OP_WRITE, buffer);
}
主线程被唤醒后,在下一次循环中又会执行selector.select()方法,为了保证让Accept线程先执行完socketChannel.register()方法,再让主线程执行selector.select()方法,主线程必须先获得gate对象的同步锁:
for(;;){
//一个空的同步代码块,其作用是为了让主线程等待Accept线程执行完同步代码块
synchronized(gate){} //主线程执行这个同步代码块
int n = selector.select();
…
}
假如Accept线程还没有执行完同步代码块,就不会释放gate对象的同步锁,这使得主线程必须等待片刻,等到Accept线程执行完同步代码块,释放了gate对象的同步锁,主线程才能恢复运行,再次执行selector.select()方法。
3.创建非阻塞的EchoClient
对于客户与服务器之间的通信,按照它们收发数据的协调程度来区分,可分为同步通信和异步通信。同步通信是指甲方向乙方发送了一批数据后,必须等接收到了乙方的响应数据后,再发送下一批数据。异步通信是指发送数据和接收数据的操作互不干扰,各自独立进行。值得注意的是,通信的两端并不要求都采用同样的通信方式,一方采用同步通信方式时,另一方可以采用异步通信方式。
同步通信要求一个I/O操作完成之后,才能完成下一个I/O操作,用阻塞模式更容易实现它。异步通信允许发送数据和接收数据的操作各自独立进行,用非阻塞模式更容易实现它。例程1和例程2介绍的EchoServer都采用异步通信,每次接收数据时,能读到多少数据,就读多少数据,并不要求必须读到一行数据后,才能执行发送数据的操作。
例程3的EchoClient类利用非阻塞模式来实现异步通信。在EchoClient类中,定义了两个ByteBuffer:sendBuffer和receiveBuffer。EchoClient把用户向控制台输入的数据存放到sendBuffer中,并且把sendBuffer中的数据发送给远程服务器;EchoClient把从远程服务器接收到的数据存放在receiveBuffer中,并且把receiveBuffer中的数据打印到控制台。图6显示了这两个Buffer的作用。

图6 sendBuffer和receiveBuffer的作用
//例程3 EchoClient.java(非阻塞模式)
package nonblock;
import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.nio.charset.*;
import java.util.*;
public class EchoClient{
private SocketChannel socketChannel = null;
private ByteBuffer sendBuffer=ByteBuffer.allocate(1024);
private ByteBuffer receiveBuffer=ByteBuffer.allocate(1024);
private Charset charset=Charset.forName("GBK");
private Selector selector;
public EchoClient()throws IOException{
socketChannel = SocketChannel.open();
InetAddress ia = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(ia,8000);
socketChannel.connect(isa); //采用阻塞模式连接服务器
socketChannel.configureBlocking(false); //设置为非阻塞模式
System.out.println("与服务器的连接建立成功");
selector=Selector.open();
}
public static void main(String args[])throws IOException{
final EchoClient client=new EchoClient();
Thread receiver=new Thread(){ //创建Receiver线程
public void run(){
client.receiveFromUser(); //接收用户向控制台输入的数据
}
};
receiver.start(); //启动Receiver线程
client.talk();
}
public void receiveFromUser(){ //接收用户从控制台输入的数据,把它放到sendBuffer中
try{
BufferedReader localReader=new BufferedReader(new InputStreamReader(System.in));
String msg=null;
while((msg=localReader.readLine())!=null){
synchronized(sendBuffer){
sendBuffer.put(encode(msg + "\r\n"));
}
if(msg.equals("bye"))
break;
}
}catch(IOException e){
e.printStackTrace();
}
}
public void talk()throws IOException { //接收和发送数据
socketChannel.register(selector,
SelectionKey.OP_READ |
SelectionKey.OP_WRITE);
while (selector.select() > 0 ){
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
while (it.hasNext()){
SelectionKey key=null;
try{
key = (SelectionKey) it.next();
it.remove();
if (key.isReadable()) {
receive(key);
}
if (key.isWritable()) {
send(key);
}
}catch(IOException e){
e.printStackTrace();
try{
if(key!=null){
key.cancel();
key.channel().close();
}
}catch(Exception ex){e.printStackTrace();}
}
}//#while
}//#while
}
public void send(SelectionKey key)throws IOException{
//发送sendBuffer中的数据
SocketChannel socketChannel=(SocketChannel)key.channel();
synchronized(sendBuffer){
sendBuffer.flip(); //把极限设为位置,把位置设为零
socketChannel.write(sendBuffer); //发送数据
sendBuffer.compact(); //删除已经发送的数据
}
}
public void receive(SelectionKey key)throws IOException{
//接收EchoServer发送的数据,把它放到receiveBuffer中
//如果receiveBuffer中有一行数据,就打印这行数据,然后把它从receiveBuffer中删除
SocketChannel socketChannel=(SocketChannel)key.channel();
socketChannel.read(receiveBuffer);
receiveBuffer.flip();
String receiveData=decode(receiveBuffer);
if(receiveData.indexOf("\n")==-1)return;
String outputData=receiveData.substring(0,receiveData.indexOf("\n")+1);
System.out.print(outputData);
if(outputData.equals("echo:bye\r\n")){
key.cancel();
socketChannel.close();
System.out.println("关闭与服务器的连接");
selector.close();
System.exit(0); //结束程序
}
ByteBuffer temp=encode(outputData);
receiveBuffer.position(temp.limit());
receiveBuffer.compact(); //删除已经打印的数据
}
public String decode(ByteBuffer buffer){ //解码
CharBuffer charBuffer= charset.decode(buffer);
return charBuffer.toString();
}
public ByteBuffer encode(String str){ //编码
return charset.encode(str);
}
}
在EchoClient类的构造方法中,创建了SocketChannel对象后,该SocketChannel对象采用默认的阻塞模式,随后调用socketChannel.connect(isa)方法,该方法将按照阻塞模式来与远程服务器EchoServer连接,只有当连接建立成功,该connect()方法才会返回。接下来程序再调用socketChannel.configureBlocking(false)方法把SocketChannel设为非阻塞模式,这使得接下来通过SocketChannel来接收和发送数据都会采用非阻塞模式。
socketChannel = SocketChannel.open();
…
socketChannel.connect(isa);
socketChannel.configureBlocking(false);
EchoClient类共使用了两个线程:主线程和Receiver线程。主线程主要负责接收和发送数据,这些操作由talk()方法实现。Receiver线程负责读取用户向控制台输入的数据,该操作由receiveFromUser()方法实现。
public static void main(String args[])throws IOException{
final EchoClient client=new EchoClient();
Thread receiver=new Thread(){ //创建receiver线程
public void run(){
client.receiveFromUser(); //读取用户向控制台输入的数据
}
};
receiver.start();
client.talk(); //接收和发送数据
}
receiveFromUser()方法读取用户输入的字符串,把它存放到sendBuffer中。如果用户输入字符串“bye”,就退出receiveFromUser()方法,这使得执行该方法的Receiver线程结束运行。由于主线程在执行send()方法时,也会操纵sendBuffer,为了避免两个线程对共享资源sendBuffer的竞争,receiveFromUser()方法对操纵sendBuffer的代码进行了同步。
BufferedReader localReader=new BufferedReader(new InputStreamReader(System.in));
String msg=null;
while((msg=localReader.readLine())!=null){
synchronized(sendBuffer){
sendBuffer.put(encode(msg + "\r\n"));
}
if(msg.equals("bye"))
break;
}
talk()方法向Selector注册读就绪和写就绪事件,然后轮询已经发生的事件,并做出相应的处理。如果发生读就绪事件,就执行receive()方法,如果发生写就绪事件,就执行send()方法。
receive()方法接收EchoServer发回的响应数据,把它们存放在receiveBuffer中。如果receiveBuffer中已经满一行数据,就向控制台打印这一行数据,并且把这行数据从receiveBuffer中删除。如果打印的字符串为“echo:bye\r\n”,就关闭SocketChannel,并且结束程序。
send()方法把sendBuffer中的数据发送给EchoServer,然后删除已经发送的数据。由于Receiver线程以及执行send()方法的主线程都会操纵共享资源sendBuffer,为了避免对共享资源的竞争,对send()方法中操纵sendBuffer的代码进行了同步。
本文介绍了用ServerSocketChannel与SocketChannel来创建服务器和客户程序的方法。ServerSocketChannel与SocketChannel既可以工作于阻塞模式,也可以工作于非阻塞模式,默认情况下,它们都工作于阻塞模式,可以调用configureBlocking()方法来重新设置模式。
总的说来,尽管阻塞模式与非阻塞模式都可以同时处理多个客户连接,但阻塞模式需要使用较多的线程,而非阻塞模式只需使用较少的线程,非阻塞模式能更有效地利用CPU,系统开销小,因此有更高的并发性能。
阻塞模式编程相对简单,但是当线程数目很多时,必须处理好线程之间的同步,如果自己编写线程池,要实现健壮的线程池难度较高。阻塞模式比较适用于同步通信,并且通信双方稳定地发送小批量的数据,双方都不需要花很长时间等待对方的回应。假如通信过程中,由于一方迟迟没有回应,导致另一方长时间的阻塞,为了避免线程无限期地阻塞下去,应该设置超时时间,及时中断长时间阻塞的线程。
非阻塞模式编程相对难一些,对ByteBuffer缓冲区的处理比较麻烦。非阻塞模式比较适用于异步通信,并且通信双方发送大批量的数据,尽管一方接收到另一方的数据可能要花一段时间,但在这段时间内,接收方不必傻傻地等待,可以处理其他事情。
|