Standard IO是对字节流的读写,在进行IO之前,首先创建一个流对象,流对象进行读写操作都是按字节,一个字节一个字节的来读或写。而NIO把IO抽象成块,类似磁盘的读写,每次IO操作的单位都是一个块,块被读入内存之后就是一个byte[], NIO一次可以读或写多个字节。

流与块

I/O与NIO最重要的区别是数据打包和传输的方式,I/O以流的方式处理数据,而NIO以块的方式处理数据。

面向流的I/O一次处理一个字节数据:一个输入流产生一个字节数据,一个输出流消费一个字节数据。为流式数据创建过滤器非常容易,链接几个过滤器,以便每个过滤器只负责复杂处理机制的一部分。不利的一面是,面向流的I/O通常相当慢。

面向块的I/O一次处理一个数据块,按块处理数据比按流处理数据要快的多。但是面向块的I/O缺少一些面向流的I/O所具有的优雅性和简单性。

I/O包和NIO已经很好地集成了,java.io.*已经以NIO为基础重新实现了,所以现在它可以利用NIO的一些特性。

通道与缓冲区

1. 通道

通道Channel是对原I/O包中流的模拟,可以通过它读取和写入数据。

通道与流的不同之处在于,流只能在一个方向上流动,而通道是双向的,可以用于读、写或者同时用于读写。

通道包括以下类型:

  • FileChannel:从文件中读写数据
  • DatagramChannel:通过UDP读写网络中的数据
  • SocketChannel:通过TCP读写网络中的数据
  • ServerSockrgtChannel: 可以监听新进来的TCP连接,对每一个新进来的连接都会创建一个SocketChannel。
  • 2. 缓冲区

    发送一个通道的所有数据都必须首先放到缓冲区中,同样地,从通道中读取任何数据都要先读到缓冲区中。也就是说,不会直接对通道进行读写数据,而是要先经过缓冲区。

    缓冲区实质上是一个数组,但是它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。

    缓冲区包括以下类型;

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • 缓冲区状态变量

  • capacity:最大容量
  • position:当前已经读写的字节数
  • limit:还可以读写的字节数
  • 文件NIO实例

    以下展示了使用NIO快速复制文件的实例:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    
    public static void fastCopy(String src, String dist) throws IOException {
    
        /* 获得源文件的输入字节流 */
        FileInputStream fin = new FileInputStream(src);
    
        /* 获取输入字节流的文件通道 */
        FileChannel fcin = fin.getChannel();
    
        /* 获取目标文件的输出字节流 */
        FileOutputStream fout = new FileOutputStream(dist);
    
        /* 获取输出字节流的通道 */
        FileChannel fcout = fout.getChannel();
    
        /* 为缓冲区分配 1024 个字节 */
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    
        while (true) {
    
            /* 从输入通道中读取数据到缓冲区中 */
            int r = fcin.read(buffer);
    
            /* read() 返回 -1 表示 EOF */
            if (r == -1) {
                break;
            }
    
            /* 切换读写 */
            buffer.flip();
    
            /* 把缓冲区的内容写入输出文件中 */
            fcout.write(buffer);
            
            /* 清空缓冲区 */
            buffer.clear();
        }
    }
    

    选择器

    NIO常常被叫做非阻塞IO,主要是因为NIO在网络中的个非阻塞特性被广泛使用。

    NIO实现了IO多路复用中的Reactor模型,一个线程Thread使用一个选择器Selector通过轮询的方式去监听多个通道Channel上的事件,从而让一个线程可以处理多个事件。

    通过配置监听的通道Channel为非阻塞,那么当Channel上的IO事件还未到达时,就不会进入阻塞状态一直等待,二十继续轮询其他的Channel,找到IO事件已经到达的Channel执行。

    因为创建和切换线程的开销很大,因此使用一个线程来处理多个事件而不是一个线程处理事件具有更好的性能。

    应该注意的是,只有套接字Cahnnel才能配置为非阻塞,而FileChannel不能,为FileChannel配置非阻塞也没有意义。

    1. 创建选择器

    1
    
    Selector selector = Selector.open();
    

    2. 将融到注册到选择器上

    1
    2
    3
    
    ServerSocketChannel ssChannel = ServerSocketChannel.open();
    ssChannel.configureBlocking(false);
    ssChannel.register(selector, SelectionKey.OP_ACCEPT);
    

    通道必须配置为非阻塞模式,否则使用选择器就没有任何意义了,因为如果通道在某个事件上被阻塞,那么服务器就不能响应其他事件,必须等待这个事件处理完毕才能去处理其他事件,显然这和选择器的作用背道而驰。

    在将通道注册到选择器上时,还需要指定要注册的具体事件,主要有以下几类:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE
  • 它们在SelectionKey的定义如下:

    1
    2
    3
    4
    
    public static final int OP_READ = 1 << 0;
    public static final int OP_WRITE = 1 << 2;
    public static final int OP_CONNECT = 1 << 3;
    public static final int OP_ACCEPT = 1 << 4;
    

    可以看出每个事件可以被当成一个位域,从而组成事件集整数。例如:

    1
    
    int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
    

    3.监听事件

    1
    
    int num = selector.select();
    

    使用select()来监听到达的事件,它会一直阻塞直到有至少一个事件到达。

    4. 获取到达的事件

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> keyIterator = keys.iterator();
    while (keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if (key.isAcceptable()) {
            // ...
        } else if (key.isReadable()) {
            // ...
        }
        keyIterator.remove();
    }
    

    5. 事件循环

    因为一次select()调用不能处理完所有的事件,并且服务器有可能需要一直监听事件,因此服务端处理事件的代码一般会放在一个死循环内。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    while (true) {
        int num = selector.select();
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = keys.iterator();
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            if (key.isAcceptable()) {
                // ...
            } else if (key.isReadable()) {
                // ...
            }
            keyIterator.remove();
        }
    }
    

    套接字NIO实例

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    
    public class NIOServer {
    
        public static void main(String[] args) throws IOException {
    
            Selector selector = Selector.open();
    
            ServerSocketChannel ssChannel = ServerSocketChannel.open();
            ssChannel.configureBlocking(false);
            ssChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            ServerSocket serverSocket = ssChannel.socket();
            InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
            serverSocket.bind(address);
    
            while (true) {
    
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = keys.iterator();
    
                while (keyIterator.hasNext()) {
    
                    SelectionKey key = keyIterator.next();
    
                    if (key.isAcceptable()) {
    
                        ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();
    
                        // 服务器会为每个新连接创建一个 SocketChannel
                        SocketChannel sChannel = ssChannel1.accept();
                        sChannel.configureBlocking(false);
    
                        // 这个新连接主要用于从客户端读取数据
                        sChannel.register(selector, SelectionKey.OP_READ);
    
                    } else if (key.isReadable()) {
    
                        SocketChannel sChannel = (SocketChannel) key.channel();
                        System.out.println(readDataFromSocketChannel(sChannel));
                        sChannel.close();
                    }
    
                    keyIterator.remove();
                }
            }
        }
    
        private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
    
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            StringBuilder data = new StringBuilder();
    
            while (true) {
    
                buffer.clear();
                int n = sChannel.read(buffer);
                if (n == -1) {
                    break;
                }
                buffer.flip();
                int limit = buffer.limit();
                char[] dst = new char[limit];
                for (int i = 0; i < limit; i++) {
                    dst[i] = (char) buffer.get(i);
                }
                data.append(dst);
                buffer.clear();
            }
            return data.toString();
        }
    }
    
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    public class NIOClient {
    
        public static void main(String[] args) throws IOException {
            Socket socket = new Socket("127.0.0.1", 8888);
            OutputStream out = socket.getOutputStream();
            String s = "hello world";
            out.write(s.getBytes());
            out.close();
        }
    }
    

    内存映射文件

    内存映射文件I/O是一种读和写文件数据的方法。它可以比常规的基于流后者基于通道的I/O快得多。

    向内存映射文件写入可能是危险的,只是改变数组的单个元素这样的简单操作,就可能会直接修改磁盘上的文件。修改数据与将数据保存到磁盘是没有分开的。

    下面代码行将文件的前1024个字节映射到内存中,map()方法返回一个MappedByteBuffer,它是ByteBuffer的子类。因此,可以像使用其他任何一个ByteBuffer一样使用新映射的缓冲区,操作系统会在需要时负责执行映射。

    1
    
    MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
    

    对比

    NIO与普通I/O的区别主要有以下两点:

  • NIO是非阻塞的
  • NIO面向块,I/O面向流
  • NIO-IO多路复用详解

    有一个现实场景:

    一个餐厅同时有100位客人到店,当然到店后第一件要做的事情就是点菜。但是问题来了,餐厅老板为了节约人力成本目前只有一位大堂服务员拿着唯一的一本菜单等待客人进行服务。

  • 那么最笨(但是最简单)的方法是(方法A),无论有多少客人等待点餐,服务员都把仅有的一份菜单递给其中一位客人,然后站在客人身旁等待这个客人完成点菜过程。在记录客人点菜内容后,把点菜记录交给后堂厨师。然后是第二位客人。。。。然后是第三位客人。很明显,只有脑袋被门夹过的老板,才会这样设置服务流程。因为随后的80位客人,再等待超时后就会离店(还会给差评)。
  • 于是还有一种办法(方法B),老板马上新雇佣99名服务员,同时印制99本新的菜单。每一名服务员手持一本菜单负责一位客人(关键不只在于服务员,还在于菜单。因为没有菜单客人也无法点菜)。在客人点完菜后,记录点菜内容交给后堂厨师(当然为了更高效,后堂厨师最好也有100名)。这样每一位客人享受的就是VIP服务咯,当然客人不会走,但是人力成本可是一个大头哦(亏死你)。
  • 另外一种办法(方法C),就是改进点菜的方式,当客人到店后,自己申请一本菜单。想好自己要点的才后,就呼叫服务员。服务员站在自己身边后记录客人的菜单内容。将菜单递给厨师的过程也要进行改进,并不是每一份菜单记录好以后,都要交给后堂厨师。服务员可以记录号多份菜单后,同时交给厨师就行了。那么这种方式,对于老板来说人力成本是最低的;对于客人来说,虽然不再享受VIP服务并且要进行一定的等待,但是这些都是可接受的;对于服务员来说,基本上她的时间都没有浪费,基本上被老板压杆了最后一滴油水。
  • 如果您是老板一定会选择第三种方式

  • 客人:客户端请求
  • 点餐内容:客户端发送的实际数据
  • 老板:操作系统
  • 人力成本:系统资源
  • 菜单:文件状态描述符。操作系统对于一个进程能够同时持有的文件状态描述符的个数是有限制的,在Linux系统中可以用$ulimit -n查看这个限制值,当然也可以进行内核参数调整
  • 服务员:操作系统内核用于IO操作的线程
  • 厨师:应用程序线程
  • 菜单传递方式:包括了阻塞和非阻塞 同步IO
  • 方法A:阻塞式/非阻塞式 同步IO 方法B:使用线程进行处理的 阻塞式/非阻塞式 同步IO 方法C:阻塞式/非阻塞式 多路复用IO

    典型的多路复用IO实现

    目前流行的多路复用IO实现主要包括四种:select、poll、epoll、kqueue。

    IO模型 相对性能 关键思路 操作系统 Java支持情况
    select 较高 Reactor windows/Linux 支持,Reactor模式(反应器设计模式)。Linux操作系统的 kernels 2.4内核版本之前,默认使用select;而目前windows下对同步IO的支持,都是select模型
    poll 较高 Reactor Linux Linux下的JAVA NIO框架,Linux kernels 2.6内核版本之前使用poll进行支持。也是使用的Reactor模式
    epoll Reactor/Proactor Linux Linux kernels 2.6内核版本及以后使用epoll进行支持;Linux kernels 2.6内核版本之前使用poll进行支持;另外一定注意,由于Linux下没有Windows下的IOCP技术提供真正的 异步IO 支持,所以Linux下使用epoll模拟异步IO
    kqueue Proactor Linux 目前Java的版本不支持

    多路复用IO技术最适用的是"高并发场景",所谓高并发是指1ms内至少同时有上千个连接请求准备好。其他情况下多路复用IO技术发挥不出来它的优势。另一方面,使用Java NIO进行功能实现,相对于传统的Socket套接字实现要复杂一些,所以实际应用中,需要根据自己的业务需求进行技术选择。

    Reactor模型和Proactor模型

    传统IO模型

    对于传统IO模型,其主要是一个Server对接N个客户端,在客户端连接之后,为每个客户端都分配一个执行线程

    传统IO的特点在于:

  • 每个客户端连接到达之后,服务端会分配一个线程给该客户端,该线程会处理包括读取数据,解码,业务计算,编码以及发送数据整个过程
  • 同一时刻,服务端的吞吐量与服务器所提供的线程数量是呈线性关系的。
  • 这种设计模式在客户端连接不多,并发量不大的情况下是可以运行的很好的,但是在海量并发情况下,这种模式就显得力不从心了,这种模式主要存在问题有如下几点:

  • 服务器的并发量对服务端能够创建的线程数有很大的依赖关系,但是服务器线程却是不能无限增长的;
  • 服务端每个线程不仅要进行IO操作,而且还需要进行业务计算
  • 服务端在获取客户端连接,读取数据,以及写入数据的过程都是阻塞型的,在网络状况不好的情况下,这将极大地降低服务器每个线程的利用率,从而降低服务器吞吐量
  • Reactor事件驱动模型

    在传统IO模型中,由于线程在等待连接以及进行IO操作时都会阻塞当前线程,这部分损耗是非常大的。因而jdk1.4中就提供了一套非阻塞IO的API。该API本质上是以事件驱动来处理网络事件的。而Reactor是基于该API提出的一套IO模型。

    在Reactor模型中,主要有四个角色:客户端连接,Reactor,Acceptor和Handler。这里Acceptor会不断地接收客户端的连接,然后将接收到的连接交由Reactor进行分发,最后有具体的Handler进行处理。改进后的Reactor模型相对于传统IO模型主要有如下优点:

  • 从模型上讲,如果仅仅还是使用一个线程池来处理客户端连接的网络读写,以及业务计算,那么Reactor模型与传统IO模型在效率上并没有什么提升。但是Reactor模型是以事件进行驱动的,其能够将接收客户端连接+网络读写以及业务计算进行拆分,从而极大的提升处理效率
  • Reactor模型是异步非阻塞模型,工作线程在没有网络事件时可以处理其他任务,而不用像传统IO那样必须阻塞等待。
  • Reactor模型—-业务处理与IO分离

    在上面的Reactor模型中,由于网络读写和业务操作都在同一个线程中,在高并发情况下,这里的系统瓶颈主要在两方面:

  • 高频率的网络读写事件处理
  • 大量的业务操作处理
  • 基于上述两个问题,这里再单线程Reactor模型的基础上提出了使用线程池的方式处理业务操作的模型

    在多线程进行业务操作的模型下,该模式主要具有如下特点:

  • 使用一个线程进行客户端连接以及网络读写事件的处理
  • 在接收到客户端连接之后,将该连接交由线程池进行数据的编解码以及业务计算
  • 这种模式相较于前面的模式性能有了很大提升,主要在于在进行网络读写的同时,也进行了业务计算,从而大大提升了系统的吞吐量。但是这种模式也有其不足,主要在于:

  • 网络读写是一个比较消耗CPU的操作,在高斌发噶的情况下,将会有大量的客户端数据需要进行网络读写,此时一个线程将不足以处理这么多请求。
  • Reactor模型—-并发读写

    对于使用线程池处理业务操作的模型,由于网络读写在高并发情况下会成为系统的一个瓶颈,因而针对该模型这里提出了一种改进后的模型,即使用线程池进行网络读写,而仅仅只使用一个线程专门接收客户端连接。

    Reactor模型示例

    对于上述的Reactor模型,服务端主要有三个角色: Reactor, Acceptor和Handler。如下是Reactor的实现代码

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    
    public class Reactor implements Runnable {
      private final Selector selector;
      private final ServerSocketChannel serverSocket;
    
      public Reactor(int port) throws IOException {
        serverSocket = ServerSocketChannel.open();  // 创建服务端的ServerSocketChannel
        serverSocket.configureBlocking(false);  // 设置为非阻塞模式
        selector = Selector.open();  // 创建一个Selector多路复用器
        SelectionKey key = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        serverSocket.bind(new InetSocketAddress(port));  // 绑定服务端端口
        key.attach(new Acceptor(serverSocket));  // 为服务端Channel绑定一个Acceptor
      }
    
      @Override
      public void run() {
        try {
          while (!Thread.interrupted()) {
            selector.select();  // 服务端使用一个线程不断等待客户端的连接到达
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
              dispatch(iterator.next());  // 监听到客户端连接事件后将其分发给Acceptor
              iterator.remove();
            }
    
            selector.selectNow();
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    
      private void dispatch(SelectionKey key) throws IOException {
        // 这里的attachement也即前面为服务端Channel绑定的Acceptor,调用其run()方法进行
        // 客户端连接的获取,并且进行分发
        Runnable attachment = (Runnable) key.attachment();
        attachment.run();
      }
    }
    

    这里Reactor首先开启了一个ServerSocketChannel,然后将其绑定到指定的端口,并且注册到了一个多路复用器上。接着在一个线程中,其会在多路复用器上等待客户端连接。当有客户端连接到达后,Reactor就会将其派发给一个Acceptor,由该Acceptor专门进行客户端连接的获取,下面我们继续看一下Acceptor的代码:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
    public class Acceptor implements Runnable {
      private final ExecutorService executor = Executors.newFixedThreadPool(20);
    
      private final ServerSocketChannel serverSocket;
    
      public Acceptor(ServerSocketChannel serverSocket) {
        this.serverSocket = serverSocket;
      }
    
      @Override
      public void run() {
        try {
          SocketChannel channel = serverSocket.accept();  // 获取客户端连接
          if (null != channel) {
            executor.execute(new Handler(channel));  // 将客户端连接交由线程池处理
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
    

    这里可以看到,Acceptor获取到客户端连接之后,其就将其交由线程池进行网络读写了,而这里的主线程只是不断禁停客户端连接事件。下面我们看看Handler的具体逻辑:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    
    public class Handler implements Runnable {
      private volatile static Selector selector;
      private final SocketChannel channel;
      private SelectionKey key;
      private volatile ByteBuffer input = ByteBuffer.allocate(1024);
      private volatile ByteBuffer output = ByteBuffer.allocate(1024);
    
      public Handler(SocketChannel channel) throws IOException {
        this.channel = channel;
        channel.configureBlocking(false);  // 设置客户端连接为非阻塞模式
        selector = Selector.open();  // 为客户端创建一个新的多路复用器
        key = channel.register(selector, SelectionKey.OP_READ);  // 注册客户端Channel的读事件
      }
    
      @Override
      public void run() {
        try {
          while (selector.isOpen() && channel.isOpen()) {
            Set<SelectionKey> keys = select();  // 等待客户端事件发生
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
              SelectionKey key = iterator.next();
              iterator.remove();
    
              // 如果当前是读事件,则读取数据
              if (key.isReadable()) {
                read(key);
              } else if (key.isWritable()) {
               // 如果当前是写事件,则写入数据
                write(key);
              }
            }
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    
      // 这里处理的主要目的是处理Jdk的一个bug,该bug会导致Selector被意外触发,但是实际上没有任何事件到达,
      // 此时的处理方式是新建一个Selector,然后重新将当前Channel注册到该Selector上
      private Set<SelectionKey> select() throws IOException {
        selector.select();
        Set<SelectionKey> keys = selector.selectedKeys();
        if (keys.isEmpty()) {
          int interestOps = key.interestOps();
          selector = Selector.open();
          key = channel.register(selector, interestOps);
          return select();
        }
    
        return keys;
      }
    
      // 读取客户端发送的数据
      private void read(SelectionKey key) throws IOException {
        channel.read(input);
        if (input.position() == 0) {
          return;
        }
    
        input.flip();
        process();  // 对读取的数据进行业务处理
        input.clear();
        key.interestOps(SelectionKey.OP_WRITE);  // 读取完成后监听写入事件
      }
    
      private void write(SelectionKey key) throws IOException {
        output.flip();
        if (channel.isOpen()) {
          channel.write(output);  // 当有写入事件时,将业务处理的结果写入到客户端Channel中
          key.channel();
          channel.close();
          output.clear();
        }
      }
        
      // 进行业务处理,并且获取处理结果。本质上,基于Reactor模型,如果这里成为处理瓶颈,
      // 则直接将其处理过程放入线程池即可,并且使用一个Future获取处理结果,最后写入客户端Channel
      private void process() {
        byte[] bytes = new byte[input.remaining()];
        input.get(bytes);
        String message = new String(bytes, CharsetUtil.UTF_8);
        System.out.println("receive message from client: \n" + message);
    
        output.put("hello client".getBytes());
      }
    }
    

    在Handler中,主要进行的就是为每一个客户端Channel创建一个Selecttor,并且监听该Chaneel的网络读写事件。当有事件到达时,进行数据读写,而业务操作这里交由具体的业务线程池处理

    重要概念:Channel

    通道,被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。一个通道会有一个专属的文件状态描述符。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写入数据。

    所有被Selector注册的通道,只能是继承了SelectableChannel类的子类,

  • ServerSocketChannel: 应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用IO”的端口监听。同时支持UDP协议和TCP协议。
  • ScoketChannel: TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP: 端口 到 服务器IP: 端口的通信连接。
  • DatagramChannel: UDP 数据报文的监听通道。
  • 重要概念:Buffer

    数据缓存区:在JAVA NIO框架中,为了保证每个通道的数据读写速度,JAVA NIO框架为每一种需要支持数据读写的通道继承了Buffer的支持。

    这句话怎么理解呢?例如ServerSocketChannel通道它只支持对OP_ACCEPT事件的监听,所以它是不能直接进行网络数据内容的读写的。所以ServerSocketChannel是没有集成Buffer的。

    Buffer有两种工作模式:写模式和读模式。在读模式下,应用程序只能从Buffer中读取数据,不能进行读写操作。但是在写模式下,应用程序是可以进行读操作的,这就表示可能会出现脏读的情况。所以一旦您决定要从Buffer中读取数据,一定要将Buffer的状态改为读模式。

  • position: 缓存区目前这在操作的数据块位置
  • limit: 缓存区最大可以进行操作的位置。缓存区的读写状态正式由这个属性控制的。
  • capacity: 缓存区的最大容量。这个容量是在缓存区创建时进行指定的。由于高并发时通道数量往往会很庞大,所以每一个缓存区的容量最好不要过大。
  • 重要概念:Selector

    Selector的含义是选择器,不过根据我们详细介绍的Selector的岗位职责,您可以把他称之为"轮询代理器"、“事件订阅器”,“channel容器管理机"都行。

  • 事件订阅和Channel管理
  • 应用程序向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护一个"已经注册的Channel"的容器。以下代码来自WindowsSelectorImpl实现类中,对已注册的Channel的管理容器:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    // Initial capacity of the poll array
    private final int INIT_CAP = 8;
    // Maximum number of sockets for select().
    // Should be INIT_CAP times a power of 2
    private final static int MAX_SELECTABLE_FDS = 1024;
    
    // The list of SelectableChannels serviced by this Selector. Every mod
    // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
    // array,  where the corresponding entry is occupied by the wakeupSocket
    private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
    
  • 轮询代理
  • 应用程序不再通过阻塞模式或者非阻塞模式直接询问操作系统"事件有没有发生”,二十由Selector代其询问。

  • 实现不同操作系统的支持
  • 之前已经提到过,多路复用技术是需要操作系统支持的,其特点就是操作系统可以同时扫描一个端口上不同网络连接的事件。所以作为最上层的JVM,必须要为不同操作系统的多路复用IO实现编写不同的代码。

    Java NIO框架简要设计分析

    通过上文的描述,我们知道了多路复用IO技术是操作系统的内核实现。在不同的操作系统,甚至同一系列操作系统的版本中所实现的多路复用IO技术都是不一样的。那么作为跨平台的Java JVM来说如何适应多种多样的多路复用IO技术实现呢?面向对象的威力就显现出来了:无论使用哪种实现方式,他们都会有"选择器"、“通道”,“缓存"这几个操作要素,那么可以为不同的多路复用IO技术创建一个统一的抽象组,并且为不同的操作系统进行具体的实现。JAVA NIO中对各种多路复用IO的支持,主要的基础是java.nio.channel.spi.SelectorProvider抽象类,其中的几个主要抽象方法包括:

  • public abstract DatagramChannel openDatagramChannel(): 创建和这个操作系统匹配的UDP 通道实现。
  • public abstract AbstractSelector openSelector(): 创建和这个操作系统匹配的NIO选择器,就像上文所述,不同的操作系统,不同的版本所默认支持的NIO模型是不一样的。
  • public abstract ServerSocketChannel openServerSocketChannel(): 创建和这个NIO模型匹配的服务器端通道
  • public abstract SocketChannel openSocketChannel(): 创建和这个NIO模型匹配的TCP Socket套接字通道(用来反映客户端的TCP连接)
  • 由于Java NIO框架的整个设计是很大的,所以我们只能还原一部分我们关心的问题。

    多路复用的优点

  • 不再使用多线程来进行IO处理了。当然在实际业务的处理中,应用程序进程还是可以引入线程池技术的
  • 同一个端口可以处理多种协议,例如,使用ServerSocketChannel的服务器端口监听,既可以处理TCP协议又可以处理UDP协议
  • 操作系统级别的优化:多路复用IO技术可以是操作系统级别在一个端口上能够接受多个客户端的IO事件。同时具有之前我们讲到的阻塞式同步IO和非阻塞式同步IO的所有特点。Selector的一部分作用更相当于"轮询代理器"
  • 都是同步IO: 目前我们介绍的 阻塞式IO、非阻塞式IO甚至包括多路复用IO,这些都是基于操作系统级别对“同步IO”的实现。我们一直在说“同步IO”,一直都没有详细说,什么叫做“同步IO”。实际上一句话就可以说清楚: 只有上层(包括上层的某种代理机制)系统询问我是否有某个事件发生了,否则我不会主动告诉上层系统事件发生了