虽然上面说到NIO一个线程就可以支持所有的IO处理。但是瓶颈也是显而易见的!我们看一个客户端的情况,如果这个客户端多次进行请求,如果在Handler中的处理速度较慢,那么后续的客户端请求都会被积压,导致响应变慢!所以引入了Reactor多线程模型!
Reactor多线程模型

Reactor多线程模型就是将Handler中的IO操作和非IO操作分开,操作IO的线程称为IO线程,非IO操作的线程称为工作线程!这样的话,客户端的请求会直接被丢到线程池中,客户端发送请求就不会堵塞!
但是当用户进一步增加的时候,Reactor会出现瓶颈!因为Reactor既要处理IO操作请求,又要响应连接请求!为了分担Reactor的负担,所以引入了主从Reactor模型!
主从Reactor模型

主Reactor用于响应连接请求,从Reactor用于处理IO操作请求!
Netty
Netty是一个高性能NIO框架,其是对Reactor模型的一个实现!
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- Bootstrap b = new Bootstrap();
- b.group(workerGroup);
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new TimeClientHandler());
- }
- });
-
- ChannelFuture f = b.connect(host, port).sync();
-
- f.channel().closeFuture().sync();
- } finally {
- workerGroup.shutdownGracefully();
- }
- public class TimeClientHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- ByteBuf m = (ByteBuf) msg;
- try {
- long currentTimeMillis =
- (m.readUnsignedInt() - 2208988800L) * 1000L;
- System.out.println(new Date(currentTimeMillis));
- ctx.close();
- } finally {
- m.release();
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx,
- Throwable cause) {
- cause.printStackTrace();
- ctx.close();
- }
- }
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new TimeServerHandler());
- }
- })
- .option(ChannelOption.SO_BACKLOG, 128)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
- // Bind and start to accept incoming connections.
- ChannelFuture f = b.bind(port).sync();
- f.channel().closeFuture().sync();
- } finally {
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- public class TimeServerHandler extends ChannelInboundHandlerAdapter {
-
- @Override
- public void channelActive(final ChannelHandlerContext ctx) {
- final ByteBuf time = ctx.alloc().buffer(4);
- time.writeInt((int)
- (System.currentTimeMillis() / 1000L + 2208988800L));
-
- final ChannelFuture f = ctx.writeAndFlush(time);
- f.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- assert f == future;
- ctx.close();
- }
- });
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx,
- Throwable cause) {
- cause.printStackTrace();
- ctx.close();
- }
- }
(编辑:晋中站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|