Netty入门——组件(Channel)二
创始人
2024-04-03 18:50:27

目录

    • 一、channel的主要作用
    • 二、EventLoop处理io任务代码示例
      • 2.1、服务端代码示例
      • 2.2、客户端代码示例
      • 2.3、服务端和客户端查看控制台输出结果
    • 三、CloseFuture关闭问题代码示例
      • 3.1、服务端代码示例
      • 3.2、客户端代码示例
      • 3.3、服务端和客户端查看控制台输出结果
      • 3.4、CloseFuture关闭问题出现的原因
    • 四、CloseFuture关闭问题的处理方式一(获取 CloseFuture 对象,同步处理关闭)
      • 4.1、服务端代码示例
      • 4.2、客户端代码示例
      • 4.3、服务端和客户端查看控制台输出结果
    • 五、CloseFuture关闭问题的处理方式二(获取 CloseFuture 对象,异步处理关闭)
      • 5.1、服务端代码示例
      • 5.2、客户端代码示例
      • 5.3、服务端和客户端查看控制台输出结果

一、channel的主要作用

channel 中的方法作用
close()用来关闭 channel
closeFuture()处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭,addListener 方法是异步等待 channel 关闭
pipeline()添加处理器
write()将数据写入
writeAndFlush()将数据写入并刷出

二、EventLoop处理io任务代码示例

2.1、服务端代码示例

  • 引入pom依赖

     io.nettynetty-all4.1.39.Final
    
  • 服务端

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.DefaultEventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;import java.nio.charset.Charset;/*** @description: EventLoop处理io任务 服务端* @author: xz*/
    @Slf4j
    public class EventLoopServer {public static void main(String[] args) {//创建一个独立的EventLoopGroupDefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);//1、服务端启动器:负责组装netty组件new ServerBootstrap()//2、将EventLoop分为boss和worker(即将EventLoop分工细化)// boss即第1个参数,只负责accept事件; worker即第2个参数,只负责socketChannel上的读写.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))//3、选择服务器的 ServerSocketChannel 实现.channel(NioServerSocketChannel.class)//4、添加服务端处理器.childHandler(// 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handlernew ChannelInitializer() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//6、添加具体 handlerch.pipeline().addLast(normalWorkers,"handler1", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//msg转ByteBufByteBuf buf = (ByteBuf) msg;//ByteBuf转字符串log.debug(buf.toString(Charset.defaultCharset()));//让消息传递给下一个handlerctx.fireChannelRead(msg);}});}})//7、绑定监听端口.bind(8080);}
    }
    

2.2、客户端代码示例

  • 客户端

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import lombok.extern.slf4j.Slf4j;
    import java.net.InetSocketAddress;
    /*** @description: EventLoop处理io任务 客户端* @author: xz*/
    @Slf4j
    public class EventLoopClient {public static void main(String[] args) throws InterruptedException {// 1. 客户端启动器Channel channel = new Bootstrap()// 2. 添加 EventLoop(事件循环).group(new NioEventLoopGroup(1))// 3. 选择客户端的 SocketChannel 实现.channel(NioSocketChannel.class)// 4. 添加客户端处理器.handler(new ChannelInitializer() {// 在连接建立后被调用@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//9. 消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出nioSocketChannel.pipeline().addLast(new StringEncoder());}})//5. 连接到服务器.connect(new InetSocketAddress("localhost", 8080))//6. 等待 connect 建立连接完毕.sync()//7. 连接对象.channel();System.out.println("打印channel对象==="+channel);//8. 发送数据channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("aaaaaa".getBytes()));}
    }
    

2.3、服务端和客户端查看控制台输出结果

  • 先启动服务端,再启动客户端,查看客户端控制台输出,结果如下:
    在这里插入图片描述
  • 再查看服务端控制台输出,结果如下:
    在这里插入图片描述

三、CloseFuture关闭问题代码示例

3.1、服务端代码示例

  • 同步2.1步骤中的代码

3.2、客户端代码示例

  • 将2.2步骤中客户端代码拆开,代码如下

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    import java.net.InetSocketAddress;
    import java.util.Scanner;/*** @description: EventLoop处理io任务中CloseFuture连接问题及处理结果* @author: xz*/
    @Slf4j
    public class CloseFutureClient {public static void main(String[] args) throws InterruptedException {client1();}/*** 将客户端代码拆开* ChannelFuture关闭问题 : 不能保证 log输出的处理关闭之后的操作 一定是在真正的channel.close()之后执行* */public static void client1() throws InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup(1)).channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));nioSocketChannel.pipeline().addLast(new StringEncoder());}})//1、连接到服务器//异步非阻塞,main方法发起了调用,真正执行connect是nio线程.connect(new InetSocketAddress("localhost", 8080));//无阻塞向下执行获取channelChannel channel = channelFuture.sync().channel();log.info("建立连接后,打印channel对象====={}",channel);/*** 启动一个新的线程* */new Thread(()->{Scanner scanner = new Scanner(System.in);while(true){String line = scanner.nextLine();if("q".equals(line)){channel.close();// close 异步操作break;}channel.writeAndFlush(line);}},"input").start();log.debug("处理关闭之后的操作");}
    }
    

3.3、服务端和客户端查看控制台输出结果

  • 先启动服务端,再启动客户端,查看客户端控制台输出,结果如下:

    在这里插入图片描述

  • 再查看客户端控制台输出,结果如下:

    在这里插入图片描述

3.4、CloseFuture关闭问题出现的原因

  • 由上述代码示例可知,ChannelFuture关闭问题 : 不能保证 log输出的处理关闭之后的操作 一定是在真正的channel.close()之后执行。

四、CloseFuture关闭问题的处理方式一(获取 CloseFuture 对象,同步处理关闭)

4.1、服务端代码示例

  • 同步2.1步骤中的代码

4.2、客户端代码示例

  • 将3.2步骤中客户端代码进行修改,代码如下

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    import java.net.InetSocketAddress;
    import java.util.Scanner;/*** @description: EventLoop处理io任务中CloseFuture连接问题及处理结果* @author: xz*/
    @Slf4j
    public class CloseFutureClient {public static void main(String[] args) throws InterruptedException {client2();}/*** 将客户端代码拆开* ChannelFuture关闭问题的处理方式一 :获取 CloseFuture 对象,同步处理关闭* */public static void client2() throws InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup(1)).channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));nioSocketChannel.pipeline().addLast(new StringEncoder());}})//1、连接到服务器//异步非阻塞,main方法发起了调用,真正执行connect是nio线程.connect(new InetSocketAddress("localhost", 8080));//无阻塞向下执行获取channelChannel channel = channelFuture.sync().channel();log.info("建立连接后,打印channel对象====={}",channel);/*** 启动一个新的线程* */new Thread(()->{Scanner scanner = new Scanner(System.in);while(true){String line = scanner.nextLine();if("q".equals(line)){channel.close();// close 异步操作break;}channel.writeAndFlush(line);}},"input").start();// 获取 CloseFuture 对象, 1) 同步处理关闭ChannelFuture closeFuture = channel.closeFuture();log.debug("waiting close...");closeFuture.sync();log.debug("处理关闭之后的操作");}
    }
    

4.3、服务端和客户端查看控制台输出结果

  • 先启动服务端,再启动客户端,查看客户端控制台输出,结果如下:
    在这里插入图片描述

  • 再查看服务端控制台输出,结果如下:

    在这里插入图片描述

五、CloseFuture关闭问题的处理方式二(获取 CloseFuture 对象,异步处理关闭)

5.1、服务端代码示例

  • 同步2.1步骤中的代码

5.2、客户端代码示例

  • 将3.2步骤中客户端代码进行修改,代码如下

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    import java.net.InetSocketAddress;
    import java.util.Scanner;/*** @description: EventLoop处理io任务中CloseFuture连接问题及处理结果* @author: xz*/
    @Slf4j
    public class CloseFutureClient {public static void main(String[] args) throws InterruptedException {client3();}/*** 将客户端代码拆开* ChannelFuture关闭问题的处理方式二 :获取 CloseFuture 对象,异步处理关闭* */public static void client3() throws InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup(1)).channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));nioSocketChannel.pipeline().addLast(new StringEncoder());}})//1、连接到服务器//异步非阻塞,main方法发起了调用,真正执行connect是nio线程.connect(new InetSocketAddress("localhost", 8080));//无阻塞向下执行获取channelChannel channel = channelFuture.sync().channel();log.info("建立连接后,打印channel对象====={}",channel);/*** 启动一个新的线程* */new Thread(()->{Scanner scanner = new Scanner(System.in);while(true){String line = scanner.nextLine();if("q".equals(line)){channel.close();// close 异步操作break;}channel.writeAndFlush(line);}},"input").start();// 获取 CloseFuture 对象, 2) 异步处理关闭ChannelFuture closeFuture = channel.closeFuture();log.debug("waiting close...");closeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {log.debug("处理关闭之后的操作");}});}
    }
    

5.3、服务端和客户端查看控制台输出结果

  • 先启动服务端,再启动客户端,查看客户端控制台输出,结果如下:
    在这里插入图片描述

  • 再查看服务端控制台输出,结果如下:

    在这里插入图片描述

相关内容

热门资讯

财联社1月8日早间新闻精选 转自:财联社【财联社1月8日早间新闻精选】 1、工业和信息化部等八部门印发《“人工智能+制造”专项行...
国家医保局:2028年前全面推... 转自:北京日报客户端今后看病缴费将不用再为排长队发愁了。1月8日,国家医保局发布通知,将在全国范围内...
新闻分析丨格陵兰岛为何让美国如... 来源:新华社新华社北京1月7日电 新闻分析|格陵兰岛为何让美国如此垂涎新华社记者林昊美军强行控制委内...
数字人主播纳入监管 “会员降权...   市场监管总局和国家网信办近日联合发布《网络交易平台规则监督管理办法》《直播电商监督管理办法》。这...
突破困境 “丫邦”组合更加坚定 北京时间1月7日,马来西亚羽毛球公开赛混双首轮,2号种子蒋振邦/魏雅欣2比0击败印度组合卡普尔/加德...