Netty通信中如何实现高效的数据传输?

2026-05-27 19:421阅读0评论SEO资讯
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计6520个文字,预计阅读时间需要27分钟。

Netty通信中如何实现高效的数据传输?

在开始学习Netty之前,需要先了解操作系统中的IO和零拷贝(已附上链接)。Netty是JBOSS提供的一个Java开源框架,现已成为GitHub上的独立项目。Netty是一个异步的、基于事件驱动的网络应用框架。

学习netty之前,要先了解操作系统中的IO、零拷贝(已经附上链接了)

一、netty的简单介绍
  • Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github 上的独立项目。
  • Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
  • Netty 主要针对在 TCP 协议下,面向 Client 端的高并发应用,或者 **Peer-to-Peer **场景下的大量数据持续传输的应用。
  • Netty 本质是一个 NIO 框架,适用于服务器通讯相关的多种应用场景。
  • Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信
为什么有了netty框架

原生的NIO存在问题:

  • NIO的类库和API繁杂:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer
  • 要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式
  • Selector可能出现空轮询,占用CPU资源

netty是对NIO进行封装,解决了上述问题:

  • 设计优雅:
    适用于各种传输类型的统一API,阻塞和非阻塞Socket;
    基于灵活且可扩展的事件模型,可以清晰地分离关注点;
    高度可定制的线程模型
  • 高性能、吞吐量更高:
    延迟更低;
    减少资源消耗;
    最小化不必要的内存复制。
  • 安全:完整的 SSL/TLS 和 StartTLS 支持。
  • 社区活跃、不断更新
二、线程模式

下面讲解的是netty线程模式的由来

现有的线程模式:

  • BIO的传统阻塞IO服务模型
  • NIO的Reactor模型
    单 Reactor 单线程;
    单 Reactor多线程;
    主从 Reactor多线程
2.1 BIO的传统阻塞IO服务模型

1、每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。当并发数很大,就会创建大量的线程,占用很大系统资源。
2、连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 Handler对象中的read 操作,导致上面的处理线程资源浪费。

2.2 NIO的Reactor模型

基于I/O多路复用模型:多个客户端进行连接,先把连接请求给Reactor,多个连接共用一个阻塞对象Reactor,由Reactor负责监听和分发,当客户端连接没有数据时不会阻塞线程。
基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。(解决了当并发数很大时,会创建大量线程,占用很大系统资源)

Reactor模式中核心组成:

  • Reactor:在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理线程来对IO事件做出反应。
  • Handlers:处理线程执行IO事件。
单Reactor单线程

特点:

  • 该模型简单没有多线程的竞争,由一个线程完成所有的操作(监听、分发、执行),没有充分利用多核CPU
  • 因为Reactor是单线程运行,因此在处理某个handler的IO事件时,其他的handler需要进行等待,等待时间长因为该线程还要处理业务;
  • 当Reactor出现问题,就会造成业务模块不可用


上图解析:

  • Reactor对象通过select监控客户端请求事件,收到事件后通过dispatch 进行分发
  • 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理
  • 如果不是建立连接事件,则Reactor会分发处理线程来处理Handler的IO事件(完成 Read → 业务处理 → send 的完整业务流程)
单Reactor多线程

特点:

  • 充分利用多核CPU,可能出现多线程竞争
  • 因为Reactor是单线程运行,因此在处理某个handler的IO事件时,其他的handler需要进行等待,等待时间短因为处理业务交给worker线程池执行;
  • Reactor承担所有的事件的监听和响应,它是单线程运行,在高并发场景容易出现性能瓶颈
  • 当Reactor出现问题,就会造成业务模块不可用

上图解析:

  • Reactor对象通过select监控客户端请求事件,收到事件后,通过Dispatch 进行分发
  • 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件
  • 如果不是连接请求事件,则Reactor会分发处理线程来处理Handler的IO事件,handler只负责响应事件(read、send),不做具体的业务处理交给worker线程池执行(这样不会使handler阻塞太久)
  • worker线程池会分配独立的worker线程完成真正的业务,并将结果返回给handler,handler收到响应后,通过send将结果返回给client
主从Reactor多线程

特点:

  • 主Reactor可已有多个,从Reactor也可以有多个
  • 主Reactor负责处理建立连接请求事件,从Reactor负责处理业务请求事件
  • 当从Reactor出现问题,可以调用其他的从Reactor提供服务

上图解析:

  • Reactor主线程 MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor的accept处理连接事件
  • 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor,subreactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理
  • 当有新事件发生时,subreactor就会分发处理线程来处理handler,handler通过read读取数据,分发给后面的worker线程池处理。
  • worker线程池分配独立的worker线程进行业务处理,并返回结果。handler 收到响应的结果后,再通过send将结果返回给client
  • Reactor主线程可以对应多个Reactor子线程,即MainRecator可以关联多个 SubReactor

生活中的体现:
单 Reactor 单线程:前台接待员和服务员是同一个人,全程为顾客服务
单 Reactor 多线程:1 个前台接待员,多个服务员,接待员只负责接待
主从 Reactor 多线程:多个前台接待员,多个服务生

2.3 netty线程模型

netty线程模型主要是依据主从Reactor多线程

  • BossGroup中的NioEventLoop就像MainReactor可以有多个,WorkerGroup中的NioEventLoop就像SubReactor一样可以有多个。

  • Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写

  • BossGroup和WorkerGroup类型都是NioEventLoopGroup,NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop(NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop)

  • NioEventLoop表示一个不断循环的执行处理任务的线程,每个 NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯

  • 每个BossGroup下面的NioEventLoop循环执行的步骤有3步:
    1、轮询 accept 事件
    2、处理 accept 事件,与 client 建立连接,生成NioSocketChannel,并将其注册到某个WorkerGroup的NioEventLoop上的Selector
    3、继续处理任务队列的任务,即runAllTasks

  • 每个WorkerGroup下面的NioEventLoop循环执行的步骤有3步:
    1、轮询 read,write 事件
    2、处理 I/O 事件,即 read,write 事件,在对应NioSocketChannel处理
    3、处理任务队列的任务,即runAllTasks

  • 每个WorkerGroup的NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel(通道),即通过pipeline可以获取到对应通道,每个通道中都有一个channelPipeline维护了很多的处理器channelhandler。

netty案例-TCP服务

服务端:
NettyServer

NettyServer代码

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyServer { public static void main(String[] args) throws Exception { //创建BossGroup 和 WorkerGroup //说明 //1. 创建两个线程组 bossGroup 和 workerGroup //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成 //3. 两个都是无限循环 //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 // 默认实际 cpu核数 * 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8 try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来进行设置 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象) //给pipeline 设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue ch.pipeline().addLast(new NettyServerHandler()); } }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器 System.out.println(".....服务器 is ready..."); //绑定一个端口并且同步生成了一个 ChannelFuture 对象(也就是立马返回这样一个对象) //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(6668).sync(); //给cf 注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口 6668 成功"); } else { System.out.println("监听端口 6668 失败"); } } }); //对关闭通道事件 进行监听 cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }


NettyServerHandler

NettyServerHandler代码

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil; import java.util.concurrent.TimeUnit; /* 说明 1. 我们自定义一个Handler 需要继承netty 规定好的某个HandlerAdapter(规范) 2. 这时我们自定义一个Handler , 才能称为一个handler */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { //读取数据事件(这里我们可以读取客户端发送的消息) /* 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址 2. Object msg: 就是客户端发送的数据 默认Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); System.out.println("server ctx =" + ctx); System.out.println("看看channel 和 pipeline的关系"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链表 //将 msg 转成一个 ByteBuf //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer. ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + channel.remoteAddress()); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //将数据写入到缓存,并刷新 //一般讲,我们对这个发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8)); } //发生异常后, 一般是需要关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }


客户端:
NettyClient

Netty通信中如何实现高效的数据传输?

NettyClient代码

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { public static void main(String[] args) throws Exception { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器 } }); System.out.println("客户端 ok.."); //启动客户端去连接服务器端 //关于 ChannelFuture 要分析,涉及到netty的异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); //对关闭通道事件 进行监听 channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }


NettyClientHandler

NettyClientHandler代码

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class NettyClientHandler extends ChannelInboundHandlerAdapter { //当通道就绪就会触发该方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8)); } //当通道有读取事件时,会触发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址: "+ ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }


netty中的参数组件 1、Bootstrap、ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

2、Future、ChannelFuture

Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以立即返回一个ChannelFuture,它可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕,同步执行注册的监听事件

Future-Listener 机制
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

  • 通过 isDone 方法来判断当前操作是否完成;
  • 通过 isSuccess 方法来判断已完成的当前操作是否成功;
  • 通过 getCause 方法来获取已完成的当前操作失败的原因;
  • 通过 isCancelled 方法来判断已完成的当前操作是否被取消;
  • 通过 addListener 方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;
3、Channel
  • Netty 网络通信的组件,能够用于执行网络 I/O 操作。
  • Netty 网络通信的组件,能够用于执行网络 I/O 操作。
  • Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回一个ChannelFuture,并且不保证在调用结束时所请求的 I/O 操作已完(后期通过ChannelFuture的方法查看异步执行结果)
  • 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应
    NioSocketChannel,异步的客户端 TCP Socket 连接。
    NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
    NioDatagramChannel,异步的 UDP 连接。
4、Selector
  • Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
  • 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select)这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel
5、ChannelHandler 及其实现类
  • ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
  • ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
  • 当自定义一个handler类处理器时,需要继承ChannelhandlerAdapter
6、Pipeline 和 ChannelPipeline
  • Pipeline中包含了多个Channel(通道)
  • 一个Channel包含了一个ChannelPipeline,而ChannePipeline中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个channeHandlerContext中又关联着一个channelHandler
  • ChannelPipeline是保存ChannelHandler的List,用于处理或拦截Channel 的入站事件和出站事件操作
  • 入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的 handler,出站事件会从链表tail往前传递到最前t个出站的handler, c两种类型的handler互不干扰
  • ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式

ChannelPipeline addFirst(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的第一个位置
ChannelPipeline addLast(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的最后一个位置

7、ChannelHandlerContext
  • 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
  • 即ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline 和 Channel 的信息,方便对ChannelHandler进行调用。

ChannelHandlerContext的常用方法:
1、ChannelFuture close(),关闭通道
2、ChannelOutboundInvoker flush(),刷新
3、ChannelFuture writeAndFlush(Object msg),将数据写到ChannelPipeline 中当前 ChannelHandler 的下一个ChannelHandler 开始处理(出站)

8、ChannelOption

Netty在创建Channel实例后,一般都需要设置 ChannelOption 参数

9、EventLoopGroup 和其实现类 NioEventLoopGroup
  • EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
  • EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
  • BossEventLoop负责接收客户端的连接并将SocketChannel注册到 WorkerEventLoopGroup的其中一个workerEventLoop的selector上并进行后续的IO事件处理
10、Unpooled类
  • Netty提供一个专门用来操作缓冲区(即 Netty 的数据容器)的工具类

ByteBuf buffer = Unpooled.buffer(10);
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));

netty的案例-群聊系统

GroupChatServer

GroupChatServer代码

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class GroupChatServer { private int port; //监听端口 public GroupChatServer(int port) { this.port = port; } //编写run方法,处理客户端的请求 public void run() throws Exception{ //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //获取到pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new GroupChatServerHandler()); } }); System.out.println("netty 服务器启动"); ChannelFuture channelFuture = b.bind(port).sync(); //监听关闭 channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupChatServer(7000).run(); } }


GroupChatServerHandler

GroupChatServerHandler代码

import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> { //这样写还要自己遍历Channel //public static List<Channel> channels = new ArrayList<Channel>(); //使用一个hashmap 管理私聊(私聊本案例并未实现,只是提供个思路) //public static Map<String, Channel> channels = new HashMap<String,Channel>(); //定义一个channle 组,管理所有的channel //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //handlerAdded 表示连接建立,一旦连接,第一个被执行 //将当前channel 加入到 channelGroup @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //将该客户加入聊天的信息推送给其它在线的客户端 //该方法会将 channelGroup 中所有的channel 遍历,并发送消息,我们不需要自己遍历 channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n"); channelGroup.add(channel); //私聊如何实现 // channels.put("userid100",channel); } //断开连接, 将xx客户离开信息推送给当前在线的客户 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n"); System.out.println("channelGroup size" + channelGroup.size()); } //表示channel 处于活动状态, 提示 xx上线 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //这个是给服务端看的,客户端上面已经提示xxx加入群聊了 System.out.println(ctx.channel().remoteAddress() + " 上线了~"); } //表示channel 处于不活动状态, 提示 xx离线了 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + " 离线了~"); } //读取数据,转发给在线的每一个客户端 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //获取到当前channel Channel channel = ctx.channel(); //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息 channelGroup.forEach(ch -> { if(channel != ch) { //不是当前的channel,转发消息 ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n"); }else {//回显自己发送的消息给自己 ch.writeAndFlush("[自己]发送了消息" + msg + "\n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //关闭通道 ctx.close(); } }


GroupChatClient

GroupChatClient代码

import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; public class GroupChatClient { //属性 private final String host; private final int port; public GroupChatClient(String host, int port) { this.host = host; this.port = port; } public void run() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //得到pipeline ChannelPipeline pipeline = ch.pipeline(); //加入相关handler pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); //加入自定义的handler pipeline.addLast(new GroupChatClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); //得到channel Channel channel = channelFuture.channel(); System.out.println("-------" + channel.localAddress()+ "--------"); //客户端需要输入信息,创建一个扫描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //通过channel 发送到服务器端 channel.writeAndFlush(msg + "\r\n"); } }finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupChatClient("127.0.0.1", 7000).run(); } }


GroupChatClientHandler

GroupChatClientHandler代码

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> { //从服务器拿到的数据 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg.trim()); } }

netty的心跳检测机制

IdleStateHandler是netty 提供的处理空闲状态的处理器(放在ChannelPipeline维护的ChannelHandler的双向链表上):

  • long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
  • long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
  • long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
netty的编解码器

编解码器放在ChannelPipeline维护的ChannelHandler的双向链表上
服务端发数据给客户端:服务端—>出站(编码)—>Socket通道—>入站(解码)—>客户端
客户端发数据给服务端:客户端—>出站(编码)—>Socket通道—>入站(解码)—>服务端

编解码器(自定义编解码器时需要继承下面的其中一个类):

  • ByteToMessageDecoder
  • LineBasedFrameDecoder:这个类在 Netty 内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
  • DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
  • HttpObjectDecoder:一个 HTTP 数据的解码器
  • LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
TCP 粘包和拆包及解决方案

TCP粘包和拆包
使用优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,出现了粘包和拆包的问题,因为面向流的通信是无消息保护边界的

  • 服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包
  • 服务端一次接受到了两个数据包,D1 和 D2 粘合在一起,称之为 TCP 粘包
  • 服务端分两次读取到了数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
  • 服务端分两次读取到了数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余部分内容 D1_2 和完整的 D2 包。

解决方案
使用自定义协议+编解码器来解决,关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的 TCP 粘包、拆包。

RPC(基于netty)

1、RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
2、两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样

RPC的调用流程图

  • 服务消费方(client)以本地调用方式调用服务
  • client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  • client stub 将消息进行编码并发送到服务端
  • server stub 收到消息后进行解码
  • server stub 根据解码结果调用本地的服务
  • 本地服务执行并将结果返回给 server stub
  • server stub 将返回导入结果进行编码并发送至消费方
  • client stub 接收到消息并进行解码
  • 服务消费方(client)得到结果

RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用

本文共计6520个文字,预计阅读时间需要27分钟。

Netty通信中如何实现高效的数据传输?

在开始学习Netty之前,需要先了解操作系统中的IO和零拷贝(已附上链接)。Netty是JBOSS提供的一个Java开源框架,现已成为GitHub上的独立项目。Netty是一个异步的、基于事件驱动的网络应用框架。

学习netty之前,要先了解操作系统中的IO、零拷贝(已经附上链接了)

一、netty的简单介绍
  • Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github 上的独立项目。
  • Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
  • Netty 主要针对在 TCP 协议下,面向 Client 端的高并发应用,或者 **Peer-to-Peer **场景下的大量数据持续传输的应用。
  • Netty 本质是一个 NIO 框架,适用于服务器通讯相关的多种应用场景。
  • Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信
为什么有了netty框架

原生的NIO存在问题:

  • NIO的类库和API繁杂:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer
  • 要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式
  • Selector可能出现空轮询,占用CPU资源

netty是对NIO进行封装,解决了上述问题:

  • 设计优雅:
    适用于各种传输类型的统一API,阻塞和非阻塞Socket;
    基于灵活且可扩展的事件模型,可以清晰地分离关注点;
    高度可定制的线程模型
  • 高性能、吞吐量更高:
    延迟更低;
    减少资源消耗;
    最小化不必要的内存复制。
  • 安全:完整的 SSL/TLS 和 StartTLS 支持。
  • 社区活跃、不断更新
二、线程模式

下面讲解的是netty线程模式的由来

现有的线程模式:

  • BIO的传统阻塞IO服务模型
  • NIO的Reactor模型
    单 Reactor 单线程;
    单 Reactor多线程;
    主从 Reactor多线程
2.1 BIO的传统阻塞IO服务模型

1、每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。当并发数很大,就会创建大量的线程,占用很大系统资源。
2、连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 Handler对象中的read 操作,导致上面的处理线程资源浪费。

2.2 NIO的Reactor模型

基于I/O多路复用模型:多个客户端进行连接,先把连接请求给Reactor,多个连接共用一个阻塞对象Reactor,由Reactor负责监听和分发,当客户端连接没有数据时不会阻塞线程。
基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。(解决了当并发数很大时,会创建大量线程,占用很大系统资源)

Reactor模式中核心组成:

  • Reactor:在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理线程来对IO事件做出反应。
  • Handlers:处理线程执行IO事件。
单Reactor单线程

特点:

  • 该模型简单没有多线程的竞争,由一个线程完成所有的操作(监听、分发、执行),没有充分利用多核CPU
  • 因为Reactor是单线程运行,因此在处理某个handler的IO事件时,其他的handler需要进行等待,等待时间长因为该线程还要处理业务;
  • 当Reactor出现问题,就会造成业务模块不可用


上图解析:

  • Reactor对象通过select监控客户端请求事件,收到事件后通过dispatch 进行分发
  • 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理
  • 如果不是建立连接事件,则Reactor会分发处理线程来处理Handler的IO事件(完成 Read → 业务处理 → send 的完整业务流程)
单Reactor多线程

特点:

  • 充分利用多核CPU,可能出现多线程竞争
  • 因为Reactor是单线程运行,因此在处理某个handler的IO事件时,其他的handler需要进行等待,等待时间短因为处理业务交给worker线程池执行;
  • Reactor承担所有的事件的监听和响应,它是单线程运行,在高并发场景容易出现性能瓶颈
  • 当Reactor出现问题,就会造成业务模块不可用

上图解析:

  • Reactor对象通过select监控客户端请求事件,收到事件后,通过Dispatch 进行分发
  • 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件
  • 如果不是连接请求事件,则Reactor会分发处理线程来处理Handler的IO事件,handler只负责响应事件(read、send),不做具体的业务处理交给worker线程池执行(这样不会使handler阻塞太久)
  • worker线程池会分配独立的worker线程完成真正的业务,并将结果返回给handler,handler收到响应后,通过send将结果返回给client
主从Reactor多线程

特点:

  • 主Reactor可已有多个,从Reactor也可以有多个
  • 主Reactor负责处理建立连接请求事件,从Reactor负责处理业务请求事件
  • 当从Reactor出现问题,可以调用其他的从Reactor提供服务

上图解析:

  • Reactor主线程 MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor的accept处理连接事件
  • 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor,subreactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理
  • 当有新事件发生时,subreactor就会分发处理线程来处理handler,handler通过read读取数据,分发给后面的worker线程池处理。
  • worker线程池分配独立的worker线程进行业务处理,并返回结果。handler 收到响应的结果后,再通过send将结果返回给client
  • Reactor主线程可以对应多个Reactor子线程,即MainRecator可以关联多个 SubReactor

生活中的体现:
单 Reactor 单线程:前台接待员和服务员是同一个人,全程为顾客服务
单 Reactor 多线程:1 个前台接待员,多个服务员,接待员只负责接待
主从 Reactor 多线程:多个前台接待员,多个服务生

2.3 netty线程模型

netty线程模型主要是依据主从Reactor多线程

  • BossGroup中的NioEventLoop就像MainReactor可以有多个,WorkerGroup中的NioEventLoop就像SubReactor一样可以有多个。

  • Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写

  • BossGroup和WorkerGroup类型都是NioEventLoopGroup,NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop(NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop)

  • NioEventLoop表示一个不断循环的执行处理任务的线程,每个 NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯

  • 每个BossGroup下面的NioEventLoop循环执行的步骤有3步:
    1、轮询 accept 事件
    2、处理 accept 事件,与 client 建立连接,生成NioSocketChannel,并将其注册到某个WorkerGroup的NioEventLoop上的Selector
    3、继续处理任务队列的任务,即runAllTasks

  • 每个WorkerGroup下面的NioEventLoop循环执行的步骤有3步:
    1、轮询 read,write 事件
    2、处理 I/O 事件,即 read,write 事件,在对应NioSocketChannel处理
    3、处理任务队列的任务,即runAllTasks

  • 每个WorkerGroup的NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel(通道),即通过pipeline可以获取到对应通道,每个通道中都有一个channelPipeline维护了很多的处理器channelhandler。

netty案例-TCP服务

服务端:
NettyServer

NettyServer代码

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyServer { public static void main(String[] args) throws Exception { //创建BossGroup 和 WorkerGroup //说明 //1. 创建两个线程组 bossGroup 和 workerGroup //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成 //3. 两个都是无限循环 //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 // 默认实际 cpu核数 * 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8 try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来进行设置 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象) //给pipeline 设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue ch.pipeline().addLast(new NettyServerHandler()); } }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器 System.out.println(".....服务器 is ready..."); //绑定一个端口并且同步生成了一个 ChannelFuture 对象(也就是立马返回这样一个对象) //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(6668).sync(); //给cf 注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口 6668 成功"); } else { System.out.println("监听端口 6668 失败"); } } }); //对关闭通道事件 进行监听 cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }


NettyServerHandler

NettyServerHandler代码

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil; import java.util.concurrent.TimeUnit; /* 说明 1. 我们自定义一个Handler 需要继承netty 规定好的某个HandlerAdapter(规范) 2. 这时我们自定义一个Handler , 才能称为一个handler */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { //读取数据事件(这里我们可以读取客户端发送的消息) /* 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址 2. Object msg: 就是客户端发送的数据 默认Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); System.out.println("server ctx =" + ctx); System.out.println("看看channel 和 pipeline的关系"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链表 //将 msg 转成一个 ByteBuf //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer. ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + channel.remoteAddress()); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //将数据写入到缓存,并刷新 //一般讲,我们对这个发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8)); } //发生异常后, 一般是需要关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }


客户端:
NettyClient

Netty通信中如何实现高效的数据传输?

NettyClient代码

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { public static void main(String[] args) throws Exception { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器 } }); System.out.println("客户端 ok.."); //启动客户端去连接服务器端 //关于 ChannelFuture 要分析,涉及到netty的异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); //对关闭通道事件 进行监听 channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }


NettyClientHandler

NettyClientHandler代码

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class NettyClientHandler extends ChannelInboundHandlerAdapter { //当通道就绪就会触发该方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8)); } //当通道有读取事件时,会触发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址: "+ ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }


netty中的参数组件 1、Bootstrap、ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

2、Future、ChannelFuture

Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以立即返回一个ChannelFuture,它可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕,同步执行注册的监听事件

Future-Listener 机制
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

  • 通过 isDone 方法来判断当前操作是否完成;
  • 通过 isSuccess 方法来判断已完成的当前操作是否成功;
  • 通过 getCause 方法来获取已完成的当前操作失败的原因;
  • 通过 isCancelled 方法来判断已完成的当前操作是否被取消;
  • 通过 addListener 方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;
3、Channel
  • Netty 网络通信的组件,能够用于执行网络 I/O 操作。
  • Netty 网络通信的组件,能够用于执行网络 I/O 操作。
  • Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回一个ChannelFuture,并且不保证在调用结束时所请求的 I/O 操作已完(后期通过ChannelFuture的方法查看异步执行结果)
  • 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应
    NioSocketChannel,异步的客户端 TCP Socket 连接。
    NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
    NioDatagramChannel,异步的 UDP 连接。
4、Selector
  • Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
  • 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select)这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel
5、ChannelHandler 及其实现类
  • ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
  • ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
  • 当自定义一个handler类处理器时,需要继承ChannelhandlerAdapter
6、Pipeline 和 ChannelPipeline
  • Pipeline中包含了多个Channel(通道)
  • 一个Channel包含了一个ChannelPipeline,而ChannePipeline中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个channeHandlerContext中又关联着一个channelHandler
  • ChannelPipeline是保存ChannelHandler的List,用于处理或拦截Channel 的入站事件和出站事件操作
  • 入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的 handler,出站事件会从链表tail往前传递到最前t个出站的handler, c两种类型的handler互不干扰
  • ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式

ChannelPipeline addFirst(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的第一个位置
ChannelPipeline addLast(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的最后一个位置

7、ChannelHandlerContext
  • 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
  • 即ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline 和 Channel 的信息,方便对ChannelHandler进行调用。

ChannelHandlerContext的常用方法:
1、ChannelFuture close(),关闭通道
2、ChannelOutboundInvoker flush(),刷新
3、ChannelFuture writeAndFlush(Object msg),将数据写到ChannelPipeline 中当前 ChannelHandler 的下一个ChannelHandler 开始处理(出站)

8、ChannelOption

Netty在创建Channel实例后,一般都需要设置 ChannelOption 参数

9、EventLoopGroup 和其实现类 NioEventLoopGroup
  • EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
  • EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
  • BossEventLoop负责接收客户端的连接并将SocketChannel注册到 WorkerEventLoopGroup的其中一个workerEventLoop的selector上并进行后续的IO事件处理
10、Unpooled类
  • Netty提供一个专门用来操作缓冲区(即 Netty 的数据容器)的工具类

ByteBuf buffer = Unpooled.buffer(10);
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));

netty的案例-群聊系统

GroupChatServer

GroupChatServer代码

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class GroupChatServer { private int port; //监听端口 public GroupChatServer(int port) { this.port = port; } //编写run方法,处理客户端的请求 public void run() throws Exception{ //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //获取到pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new GroupChatServerHandler()); } }); System.out.println("netty 服务器启动"); ChannelFuture channelFuture = b.bind(port).sync(); //监听关闭 channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupChatServer(7000).run(); } }


GroupChatServerHandler

GroupChatServerHandler代码

import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> { //这样写还要自己遍历Channel //public static List<Channel> channels = new ArrayList<Channel>(); //使用一个hashmap 管理私聊(私聊本案例并未实现,只是提供个思路) //public static Map<String, Channel> channels = new HashMap<String,Channel>(); //定义一个channle 组,管理所有的channel //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //handlerAdded 表示连接建立,一旦连接,第一个被执行 //将当前channel 加入到 channelGroup @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //将该客户加入聊天的信息推送给其它在线的客户端 //该方法会将 channelGroup 中所有的channel 遍历,并发送消息,我们不需要自己遍历 channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n"); channelGroup.add(channel); //私聊如何实现 // channels.put("userid100",channel); } //断开连接, 将xx客户离开信息推送给当前在线的客户 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n"); System.out.println("channelGroup size" + channelGroup.size()); } //表示channel 处于活动状态, 提示 xx上线 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //这个是给服务端看的,客户端上面已经提示xxx加入群聊了 System.out.println(ctx.channel().remoteAddress() + " 上线了~"); } //表示channel 处于不活动状态, 提示 xx离线了 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + " 离线了~"); } //读取数据,转发给在线的每一个客户端 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //获取到当前channel Channel channel = ctx.channel(); //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息 channelGroup.forEach(ch -> { if(channel != ch) { //不是当前的channel,转发消息 ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n"); }else {//回显自己发送的消息给自己 ch.writeAndFlush("[自己]发送了消息" + msg + "\n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //关闭通道 ctx.close(); } }


GroupChatClient

GroupChatClient代码

import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; public class GroupChatClient { //属性 private final String host; private final int port; public GroupChatClient(String host, int port) { this.host = host; this.port = port; } public void run() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //得到pipeline ChannelPipeline pipeline = ch.pipeline(); //加入相关handler pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); //加入自定义的handler pipeline.addLast(new GroupChatClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); //得到channel Channel channel = channelFuture.channel(); System.out.println("-------" + channel.localAddress()+ "--------"); //客户端需要输入信息,创建一个扫描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //通过channel 发送到服务器端 channel.writeAndFlush(msg + "\r\n"); } }finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupChatClient("127.0.0.1", 7000).run(); } }


GroupChatClientHandler

GroupChatClientHandler代码

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> { //从服务器拿到的数据 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg.trim()); } }

netty的心跳检测机制

IdleStateHandler是netty 提供的处理空闲状态的处理器(放在ChannelPipeline维护的ChannelHandler的双向链表上):

  • long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
  • long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
  • long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
netty的编解码器

编解码器放在ChannelPipeline维护的ChannelHandler的双向链表上
服务端发数据给客户端:服务端—>出站(编码)—>Socket通道—>入站(解码)—>客户端
客户端发数据给服务端:客户端—>出站(编码)—>Socket通道—>入站(解码)—>服务端

编解码器(自定义编解码器时需要继承下面的其中一个类):

  • ByteToMessageDecoder
  • LineBasedFrameDecoder:这个类在 Netty 内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
  • DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
  • HttpObjectDecoder:一个 HTTP 数据的解码器
  • LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
TCP 粘包和拆包及解决方案

TCP粘包和拆包
使用优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,出现了粘包和拆包的问题,因为面向流的通信是无消息保护边界的

  • 服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包
  • 服务端一次接受到了两个数据包,D1 和 D2 粘合在一起,称之为 TCP 粘包
  • 服务端分两次读取到了数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
  • 服务端分两次读取到了数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余部分内容 D1_2 和完整的 D2 包。

解决方案
使用自定义协议+编解码器来解决,关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的 TCP 粘包、拆包。

RPC(基于netty)

1、RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
2、两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样

RPC的调用流程图

  • 服务消费方(client)以本地调用方式调用服务
  • client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  • client stub 将消息进行编码并发送到服务端
  • server stub 收到消息后进行解码
  • server stub 根据解码结果调用本地的服务
  • 本地服务执行并将结果返回给 server stub
  • server stub 将返回导入结果进行编码并发送至消费方
  • client stub 接收到消息并进行解码
  • 服务消费方(client)得到结果

RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用