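The Netty Framework

1. Overview

1.1 Why Netty Matters

Netty is a high-performance, asynchronous, event-driven network application framework for rapidly building maintainable protocol servers and clients. It is one of the most widely used frameworks for network programming in Java.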

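1.2 Structure of This Article

This article covers Netty in the following parts:

  1. Netty basics: core concepts, features, and a quick start
  2. Core components: Channel, EventLoop, Pipeline
  3. Codecs: using and writing encoders and decoders
  4. Event handling: how events are dispatched and propagated
  5. Performance tuning: threading, memory, and zero-copy
  6. Practical examples: Netty in real applications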

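2. Netty Basics

2.1 Netty Features

2.1.1 Core Features

Core features of Netty

  1. High performance: built on Java NIO, so a small number of threads can serve many connections
  2. Asynchronous and non-blocking: event-driven, with I/O operations that return futures instead of blocking
  3. Ease of use: a compact API that hides most of the NIO boilerplate
  4. Stability: mature and widely deployed in production
  5. Feature-rich: ships codecs for many protocols, such as HTTP and WebSocket

Advantages of Netty

  • High performance: non-blocking I/O avoids the thread-per-connection cost of traditional blocking I/O (BIO)
  • Ease of use: simplifies network programming
  • Extensible: supports custom protocols
  • Active community: well maintained and well supported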
2.2 Quick Start

2.2.1 Maven Dependency

Maven dependency

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.90.Final</version>
</dependency>

Netty server example

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {

    private static final int PORT = 8888;

    public static void main(String[] args) throws InterruptedException {
        // 1. Create the EventLoopGroups
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles accepted connections

        try {
            // 2. Create the ServerBootstrap
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Add the codecs
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // Add the business handler
                            pipeline.addLast(new ServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 3. Bind the port
            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("Netty Server started on port " + PORT);

            // 4. Wait until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // 5. Shut down the EventLoopGroups
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

// Server-side handler
class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // StringDecoder has already turned the bytes into a String
        String message = (String) msg;
        System.out.println("Received: " + message);

        // Echo the message back
        ctx.writeAndFlush("Echo: " + message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Client connected: " + ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("Client disconnected: " + ctx.channel().remoteAddress());
    }
}

Netty client example

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyClient {

    private static final String HOST = "localhost";
    private static final int PORT = 8888;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ClientHandler());
                        }
                    });

            // Connect to the server
            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            System.out.println("Connected to server");

            // Send a message
            Channel channel = future.channel();
            channel.writeAndFlush("Hello Netty");

            // Wait until the connection is closed
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

// Client-side handler
class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String message = (String) msg;
        System.out.println("Server: " + message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

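3. Core Components

3.1 Channel

3.1.1 The Channel Concept

Channel: an abstraction of a network connection; each instance represents one socket.

Channel types

  • NioServerSocketChannel: server-side TCP channel that listens for and accepts connections
  • NioSocketChannel: TCP connection channel, used by clients and for connections a server has accepted
  • NioDatagramChannel: UDP channel

Channel operations

// Channel operations
Channel channel = ...;

// Queue data for writing (nothing is sent until a flush)
channel.write("Hello");

// Write and flush in one call
channel.writeAndFlush("Hello");

// Close the connection
channel.close();

// Get the remote address
SocketAddress remoteAddress = channel.remoteAddress();

// Get the local address
SocketAddress localAddress = channel.localAddress();

// Check whether the channel is active (connected)
boolean active = channel.isActive();

// Check whether the channel is writable (outbound buffer below its high-water mark)
boolean writable = channel.isWritable();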
3.2 EventLoop

3.2.1 The EventLoop Concept

EventLoop: an event loop bound to a single thread that handles the I/O events of the Channels registered with it.

EventLoopGroup: manages a set of EventLoops and assigns each new Channel to one of them.

EventLoop usage

// Create an EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup(4); // 4 threads

// Pick an EventLoop from the group
EventLoop eventLoop = group.next();

// Run a task on the EventLoop thread
eventLoop.execute(() -> {
    System.out.println("Task executed in EventLoop");
});

// Schedule a task to run after a delay
eventLoop.schedule(() -> {
    System.out.println("Scheduled task");
}, 5, TimeUnit.SECONDS);
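
The threading guarantee behind EventLoop can be illustrated without Netty. The following is a plain-JDK sketch, not Netty code: a single-threaded ScheduledExecutorService stands in for one EventLoop, and the class name EventLoopSketch, the method runOnLoop, and the thread name "loop-1" are all hypothetical. It assumes only the analogy that tasks handed to one loop run on that loop's single thread, in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK stand-in for one EventLoop: a single thread that runs tasks in order.
class EventLoopSketch {

    // Submits `count` tasks plus one delayed task to a single-threaded loop and
    // returns "threadName:taskId" for each task, in execution order.
    static List<String> runOnLoop(int count) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "loop-1"));
        List<String> seen = new ArrayList<>(); // only written by the loop thread
        try {
            for (int i = 0; i < count; i++) {
                final int id = i;
                loop.execute(() -> seen.add(Thread.currentThread().getName() + ":" + id));
            }
            // Comparable to eventLoop.schedule(...): the delayed task uses the same thread.
            loop.schedule(() -> seen.add(Thread.currentThread().getName() + ":scheduled"),
                    10, TimeUnit.MILLISECONDS).get(); // wait so the result is complete
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runOnLoop(3));
    }
}
```

Because every task runs on the same thread, handler state that is only touched from its own EventLoop needs no locking. The flip side is that a slow task delays every Channel assigned to that loop.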

3.3 Pipeline

3.3.1 The Pipeline Concept

Pipeline: a chain of handlers; each Channel has one pipeline holding its ChannelHandlers.

Pipeline structure

Inbound events:       head → [Handler1] → [Handler2] → [Handler3] → [Handler4] → tail
Outbound operations:  head ← [Handler1] ← [Handler2] ← [Handler3] ← [Handler4] ← tail

Pipeline usage

ChannelPipeline pipeline = channel.pipeline();

// Add handlers
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new BusinessHandler());

// Remove a handler
pipeline.remove("decoder");

// Look up a handler
ChannelHandler handler = pipeline.get("handler");

// Replace a handler
pipeline.replace("handler", "newHandler", new NewHandler());
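
The order in which handlers see traffic depends on direction, which is easy to get wrong when adding handlers. The toy model below is a sketch under stated assumptions, not Netty's implementation: PipelineOrderSketch and its methods are hypothetical names, and each handler is reduced to a name plus a direction flag. It shows inbound events visiting inbound handlers from head to tail and outbound operations visiting outbound handlers from tail to head.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of ChannelPipeline ordering; not Netty's implementation.
class PipelineOrderSketch {

    // A handler is reduced to a name and a direction.
    private static final class Handler {
        final String name;
        final boolean inbound;

        Handler(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    PipelineOrderSketch addLast(String name, boolean inbound) {
        handlers.add(new Handler(name, inbound));
        return this;
    }

    // Inbound events (such as channelRead) travel from head to tail.
    List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Handler h : handlers) {
            if (h.inbound) path.add(h.name);
        }
        return path;
    }

    // Outbound operations (such as write) travel from tail to head.
    List<String> outboundPath() {
        List<String> path = new ArrayList<>();
        for (int i = handlers.size() - 1; i >= 0; i--) {
            if (!handlers.get(i).inbound) path.add(handlers.get(i).name);
        }
        return path;
    }

    public static void main(String[] args) {
        PipelineOrderSketch p = new PipelineOrderSketch()
                .addLast("decoder", true)
                .addLast("encoder", false)
                .addLast("handler", true)
                .addLast("logger", false);
        System.out.println("inbound:  " + p.inboundPath());  // [decoder, handler]
        System.out.println("outbound: " + p.outboundPath()); // [logger, encoder]
    }
}
```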

3.4 ChannelHandler

3.4.1 Handler Types

ChannelHandler types

  • ChannelInboundHandler: handles inbound events
  • ChannelOutboundHandler: handles outbound operations

InboundHandler example

public class InboundHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("Channel registered");
        ctx.fireChannelRegistered(); // pass the event on so later handlers still see it
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Channel active");
        ctx.fireChannelActive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Channel read: " + msg);
        ctx.fireChannelRead(msg); // pass the message to the next handler
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("Channel read complete");
        ctx.fireChannelReadComplete();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("Channel inactive");
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

OutboundHandler example

public class OutboundHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("Writing: " + msg);
        ctx.write(msg, promise); // pass the write to the next outbound handler
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        System.out.println("Flushing");
        ctx.flush();
    }
}

4. Codecs

4.1 Built-in Codecs

4.1.1 Commonly Used Codecs

Built-in Netty codecs

// The frame decoders below are alternatives: pick the one that matches your protocol

// String codec
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());

// Length-field framing
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));

// Delimiter-based framing
pipeline.addLast(new DelimiterBasedFrameDecoder(1024,
        Unpooled.copiedBuffer("\n".getBytes())));

// Line-based framing
pipeline.addLast(new LineBasedFrameDecoder(1024));

// Fixed-length framing
pipeline.addLast(new FixedLengthFrameDecoder(100));
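
Frame decoders exist because TCP delivers a byte stream: one read may contain several messages, or only part of one. The following plain-JDK sketch models the idea behind line-based framing; it is a stand-in for the concept and not the implementation of LineBasedFrameDecoder. The class name LineFramerSketch and its feed method are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of line-based framing; illustrates the idea only.
class LineFramerSketch {

    // Bytes received so far that do not yet form a complete line.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feeds one network chunk and returns every line that the chunk completes.
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                String line = new String(pending.toByteArray(), StandardCharsets.UTF_8);
                if (line.endsWith("\r")) {
                    line = line.substring(0, line.length() - 1); // tolerate CRLF
                }
                lines.add(line);
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return lines; // bytes after the last '\n' stay buffered for the next chunk
    }

    public static void main(String[] args) {
        LineFramerSketch framer = new LineFramerSketch();
        // A chunk can end mid-line, and a line can span several chunks.
        System.out.println(framer.feed("hello\nwor".getBytes(StandardCharsets.UTF_8))); // [hello]
        System.out.println(framer.feed("ld\r\nbye".getBytes(StandardCharsets.UTF_8)));  // [world]
        System.out.println(framer.feed("\n".getBytes(StandardCharsets.UTF_8)));         // [bye]
    }
}
```

The sketch has no maximum line length. The 1024 passed to the Netty decoders above is exactly that limit, and it protects the server from a peer that never sends a delimiter.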

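4.2 Custom Codecs

4.2.1 Custom Decoder

Custom decoder

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class CustomDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // Need at least the 4-byte length field
        if (in.readableBytes() < 4) {
            return; // not enough data yet, wait for more
        }

        // Mark the reader index so it can be restored if the body is incomplete
        in.markReaderIndex();

        // Read the length field
        int length = in.readInt();
        if (in.readableBytes() < length) {
            in.resetReaderIndex(); // rewind to the mark
            return; // not enough data yet, wait for more
        }

        // Read the payload
        byte[] data = new byte[length];
        in.readBytes(data);
        String message = new String(data, StandardCharsets.UTF_8);
        out.add(message);
    }
}

Custom encoder

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;

public class CustomEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) {
        byte[] data = msg.getBytes(StandardCharsets.UTF_8);
        out.writeInt(data.length); // write the length
        out.writeBytes(data); // write the payload
    }
}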
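
The length-prefix protocol used by this codec pair can be exercised without a network. The sketch below uses java.nio.ByteBuffer in place of Netty's ByteBuf, so it is an analogy for the decoding logic and not the Netty API; LengthPrefixSketch, encode, and decode are hypothetical names. It shows why the decoder has to remember the frame start and rewind when only part of a frame has arrived.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of a 4-byte length-prefix protocol.
class LengthPrefixSketch {

    // Encodes one message as [4-byte big-endian length][UTF-8 payload].
    static byte[] encode(String msg) {
        byte[] data = msg.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + data.length).putInt(data.length).put(data).array();
    }

    // Decodes every complete frame in `in` and leaves incomplete bytes unread.
    static List<String> decode(ByteBuffer in) {
        List<String> out = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();                    // remember where this frame starts
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset();               // partial frame: rewind and wait for more bytes
                break;
            }
            byte[] data = new byte[length];
            in.get(data);
            out.add(new String(data, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] a = encode("hi");
        byte[] b = encode("netty");
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(a).put(b, 0, 5);          // all of `a`, but only the first 5 bytes of `b`
        buf.flip();
        System.out.println(decode(buf));  // [hi]
        buf.compact();                    // keep the unread tail of `b`
        buf.put(b, 5, b.length - 5);      // the rest of `b` arrives
        buf.flip();
        System.out.println(decode(buf));  // [netty]
    }
}
```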

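4.3 Combining Codecs

4.3.1 Codec Chain

Codec chain example

ChannelPipeline pipeline = ch.pipeline();

// 1. Length-field frame decoder (handles sticky and split packets)
pipeline.addLast(new LengthFieldBasedFrameDecoder(
        1024, // maximum frame length
        0,    // length field offset
        4,    // length field length
        0,    // length adjustment
        4     // initial bytes to strip
));

// 2. Length-field prepender
pipeline.addLast(new LengthFieldPrepender(4));

// 3. Custom decoder
// Note: the CustomDecoder and CustomEncoder from 4.2 read and write their own
// 4-byte length prefix, so with steps 1 and 2 in place each frame carries two length fields
pipeline.addLast(new CustomDecoder());

// 4. Custom encoder
pipeline.addLast(new CustomEncoder());

// 5. Business handler
pipeline.addLast(new BusinessHandler());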

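5. Event Handling

5.1 Event Types

5.1.1 Inbound Events

Inbound events

  • channelRegistered: the Channel has been registered with an EventLoop
  • channelActive: the Channel has become active (connected)
  • channelRead: data has been read
  • channelReadComplete: the current read batch is complete
  • channelInactive: the Channel has become inactive (disconnected)
  • exceptionCaught: an exception was raised

5.2 Event Propagation

5.2.1 How Events Propagate

Event propagation

public class EventHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Handle the message
        System.out.println("Handler1: " + msg);

        // Pass it to the next handler
        ctx.fireChannelRead(msg);

        // To stop propagation instead, simply do not call
        // ctx.fireChannelRead(msg)
    }
}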
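
That propagation is opt-in, with each handler deciding whether the next one runs, can be shown with a small model. This is a sketch under stated assumptions and not Netty's implementation: PropagationSketch, its Handler and Context types, and the three handlers are hypothetical, and only the method name fireChannelRead is borrowed from Netty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of inbound propagation: a handler forwards a message only by
// calling fireChannelRead on its context.
class PropagationSketch {

    interface Handler {
        void channelRead(Context ctx, String msg);
    }

    // The context knows its handler's position and forwards to the next handler.
    static final class Context {
        private final List<Handler> chain;
        private final int index;
        private final List<String> seen;

        Context(List<Handler> chain, int index, List<String> seen) {
            this.chain = chain;
            this.index = index;
            this.seen = seen;
        }

        void fireChannelRead(String msg) {
            int next = index + 1;
            if (next < chain.size()) {
                chain.get(next).channelRead(new Context(chain, next, seen), msg);
            }
        }

        void record(String entry) {
            seen.add(entry);
        }
    }

    // Pushes one message in at the head and returns which handlers saw it.
    static List<String> deliver(String msg) {
        List<String> seen = new ArrayList<>();
        Handler logging = (ctx, m) -> { ctx.record("logging"); ctx.fireChannelRead(m); };
        // Drops empty messages by not forwarding them.
        Handler filter = (ctx, m) -> { ctx.record("filter"); if (!m.isEmpty()) ctx.fireChannelRead(m); };
        Handler business = (ctx, m) -> ctx.record("business:" + m);
        List<Handler> chain = Arrays.asList(logging, filter, business);
        new Context(chain, -1, seen).fireChannelRead(msg); // index -1 plays the role of the head
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(deliver("ping")); // [logging, filter, business:ping]
        System.out.println(deliver(""));     // [logging, filter]
    }
}
```

In real Netty code, a handler that swallows a reference-counted message this way also becomes responsible for releasing it.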

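5.3 Asynchronous Processing

5.3.1 Asynchronous Operations

Asynchronous operations

// Asynchronous write: returns immediately with a future
ChannelFuture future = channel.writeAndFlush("Hello");

// Add a listener that runs when the write finishes
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
            System.out.println("Write successful");
        } else {
            System.out.println("Write failed: " + future.cause());
        }
    }
});

// Block until the write completes; await() throws InterruptedException
// and must not be called from an EventLoop thread
future.await();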
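
The listener style has a close analogue in the JDK. The sketch below uses CompletableFuture as a stand-in for ChannelFuture, so it illustrates the callback pattern and not the Netty API; FutureListenerSketch and observe are hypothetical names, and the simulated failure message is invented for the demo.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Plain-JDK analogy for ChannelFuture listeners, using CompletableFuture.
class FutureListenerSketch {

    // Registers a callback, completes the operation, and returns what the callback saw.
    static String observe(boolean succeed) {
        CompletableFuture<Void> write = new CompletableFuture<>();
        AtomicReference<String> result = new AtomicReference<>("pending");

        // Comparable to future.addListener(...): runs once the operation finishes.
        write.whenComplete((ok, cause) -> result.set(
                cause == null ? "Write successful" : "Write failed: " + cause.getMessage()));

        // Simulate the I/O side finishing the operation after the listener was added.
        if (succeed) {
            write.complete(null);
        } else {
            write.completeExceptionally(new IOException("connection reset"));
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(observe(true));  // Write successful
        System.out.println(observe(false)); // Write failed: connection reset
    }
}
```

Prefer a listener over blocking whenever the calling code may itself run on an I/O thread, since blocking there stalls every connection served by that thread.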

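6. Performance Tuning

6.1 Thread Model Tuning

6.1.1 EventLoopGroup Configuration

Thread model tuning

// Choose thread counts deliberately
int bossThreads = 1; // one is usually enough for a single listening port
// CPU cores * 2 is also what the no-argument NioEventLoopGroup constructor uses by default
int workerThreads = Runtime.getRuntime().availableProcessors() * 2;

EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreads);
EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads);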

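6.2 Memory Management

6.2.1 Using ByteBuf

ByteBuf usage

// Direct (off-heap) memory: avoids an extra copy for socket I/O, but must be released explicitly
ByteBuf directBuf = Unpooled.directBuffer(1024);

// Heap memory: backed by a byte[] that the GC reclaims, usually slower for socket I/O
ByteBuf heapBuf = Unpooled.buffer(1024);

// Release after use
directBuf.release();

// ByteBuf is not AutoCloseable, so try-with-resources does not apply; use try/finally.
// Declare the buffer outside the try block so that finally can see it
ByteBuf buf = ...;
try {
    // use buf
} finally {
    ReferenceCountUtil.release(buf);
}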
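
Why release() matters becomes clearer with a minimal model of reference counting. The class below is a sketch and not Netty's ByteBuf: RefCountSketch and its freed flag are hypothetical, and only the retain()/release() contract (count starts at 1, release() returns true when the count reaches zero) is taken from Netty's ReferenceCounted.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal reference-counting model; illustrates retain()/release() semantics only.
class RefCountSketch {

    private final AtomicInteger refCnt = new AtomicInteger(1); // a new buffer has one owner
    private volatile boolean freed;

    int refCnt() {
        return refCnt.get();
    }

    boolean isFreed() {
        return freed;
    }

    // Adds an owner; every retain() must be balanced by one more release().
    RefCountSketch retain() {
        if (refCnt.getAndIncrement() <= 0) {
            refCnt.decrementAndGet();
            throw new IllegalStateException("already freed");
        }
        return this;
    }

    // Drops an owner; returns true when this call freed the underlying memory.
    boolean release() {
        int now = refCnt.decrementAndGet();
        if (now < 0) {
            refCnt.incrementAndGet();
            throw new IllegalStateException("released too many times");
        }
        if (now == 0) {
            freed = true; // a real buffer would return its memory here
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RefCountSketch buf = new RefCountSketch();
        buf.retain();                      // a second owner appears, count is 2
        System.out.println(buf.release()); // false: one owner is left
        System.out.println(buf.release()); // true: last owner gone, memory freed
    }
}
```

A leak is simply a count that never reaches zero, which is why the code that last touches a buffer should release it in a finally block.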

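6.3 Zero-Copy

6.3.1 Zero-Copy Techniques

Zero-copy example

// Zero-copy file transfer with FileRegion
// `file` is assumed here to be an open java.io.RandomAccessFile
FileRegion region = new DefaultFileRegion(
        file.getChannel(), // source FileChannel
        0,                 // start position
        file.length()      // number of bytes to transfer
);
channel.writeAndFlush(region);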
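
DefaultFileRegion ultimately hands the transfer to the JDK's FileChannel.transferTo, so the mechanism can be tried with the JDK alone. The following is a minimal sketch with hypothetical names (TransferToSketch, send, roundTrip). The in-memory sink keeps the demo self-contained; the operating-system shortcut that skips user-space copies applies only when the target is a channel such as a socket or a file.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Plain-JDK sketch of the transferTo call that file-region transfers build on.
class TransferToSketch {

    // Sends a whole file to `target` with FileChannel.transferTo; returns the byte count.
    static long send(Path file, WritableByteChannel target) {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = source.size();
            while (position < size) {
                // transferTo may move fewer bytes than requested, so loop until done.
                position += source.transferTo(position, size - position, target);
            }
            return position;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes `text` to a temporary file, sends it, and returns what arrived.
    static String roundTrip(String text) {
        try {
            Path file = Files.createTempFile("region", ".txt");
            try {
                Files.write(file, text.getBytes(StandardCharsets.UTF_8));
                ByteArrayOutputStream sink = new ByteArrayOutputStream();
                send(file, Channels.newChannel(sink));
                return new String(sink.toByteArray(), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("zero-copy demo"));
    }
}
```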

7. Practical Examples

7.1 HTTP Server

7.1.1 HTTP Server Implementation

HTTP server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import java.nio.charset.StandardCharsets;

public class HttpServer {

    private static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // HTTP request decoding and response encoding
                            pipeline.addLast(new HttpServerCodec());
                            // Aggregate message parts into one FullHttpRequest (64 KiB limit)
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            pipeline.addLast(new HttpServerHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("HTTP Server started on port " + PORT);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
        String uri = request.uri();
        HttpMethod method = request.method();

        System.out.println("Method: " + method + ", URI: " + uri);

        // Build the response
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1,
                HttpResponseStatus.OK
        );

        byte[] content = "Hello from Netty HTTP Server".getBytes(StandardCharsets.UTF_8);
        response.content().writeBytes(content);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        // Content-Length is a byte count, not a character count
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.length);

        ctx.writeAndFlush(response);
    }
}
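
One detail in the response deserves a concrete check: Content-Length counts bytes of the encoded body, while String.length() counts UTF-16 code units. The two agree for ASCII text such as the greeting above and diverge for anything else. The sketch below is plain JDK code with hypothetical names (ContentLengthSketch, contentLength) and assumes a UTF-8 encoded body.

```java
import java.nio.charset.StandardCharsets;

// Shows why Content-Length should come from the encoded bytes.
class ContentLengthSketch {

    // Number of bytes the body occupies on the wire when encoded as UTF-8.
    static int contentLength(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String ascii = "Hello";
        String chinese = "\u4f60\u597d"; // two Chinese characters, written as escapes
        System.out.println(ascii.length() + " chars, " + contentLength(ascii) + " bytes");     // 5 chars, 5 bytes
        System.out.println(chinese.length() + " chars, " + contentLength(chinese) + " bytes"); // 2 chars, 6 bytes
    }
}
```

A header that is too small makes clients truncate the body, and one that is too large makes them wait for bytes that never arrive.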

7.2 WebSocket Server

7.2.1 WebSocket Implementation

WebSocket server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class WebSocketServer {

    private static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // The WebSocket handshake starts as an HTTP request
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            // Handles the upgrade handshake and control frames on /ws
                            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                            pipeline.addLast(new WebSocketHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("WebSocket Server started on port " + PORT);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        String text = frame.text();
        System.out.println("Received: " + text);

        // Echo the message back
        ctx.writeAndFlush(new TextWebSocketFrame("Echo: " + text));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("WebSocket client connected");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("WebSocket client disconnected");
    }
}
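
WebSocketServerProtocolHandler performs the opening handshake, so application code never sees it. For readers curious about what that handshake involves, the sketch below computes the Sec-WebSocket-Accept response header as defined in RFC 6455, using only the JDK. It is an illustration of the protocol rule, not Netty's code; WebSocketAcceptSketch and acceptKey are hypothetical names, and the sample key is the one used in the RFC's own example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Computes the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key (RFC 6455).
class WebSocketAcceptSketch {

    // Fixed GUID that RFC 6455 appends to the client's key.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        // Sample key from the RFC 6455 handshake example.
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ==")); // s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    }
}
```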

7.3 Chat Room Server

7.3.1 Multi-Client Chat

Chat room server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ChatServer {

    private static final int PORT = 8888;
    // All connected clients; shared by every handler instance
    private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ChatServerHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("Chat Server started on port " + PORT);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    static class ChatServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            channels.add(ctx.channel());
            String message = "User " + ctx.channel().remoteAddress() + " joined";
            channels.writeAndFlush(message);
            System.out.println(message);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String message = (String) msg;
            String sender = ctx.channel().remoteAddress().toString();
            String broadcast = "[" + sender + "] " + message;

            // Broadcast the message to every client, including the sender
            channels.writeAndFlush(broadcast);
            System.out.println(broadcast);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            // DefaultChannelGroup also drops closed channels on its own; the explicit remove is harmless
            channels.remove(ctx.channel());
            String message = "User " + ctx.channel().remoteAddress() + " left";
            channels.writeAndFlush(message);
            System.out.println(message);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
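
The broadcast behavior of the handler can be reasoned about without sockets. In the sketch below, each client is reduced to an inbox of received lines; it is a toy model of the join/say/leave flow and not a replacement for ChannelGroup. BroadcastSketch and all of its methods are hypothetical names, and the message formats are copied from the handler above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Toy model of group broadcasting: each "client" is just a list of received lines.
class BroadcastSketch {

    private final Map<String, List<String>> inboxes = new ConcurrentHashMap<>();

    // Mirrors channelActive: register first, then announce to everyone (newcomer included).
    void join(String client) {
        inboxes.put(client, new CopyOnWriteArrayList<>());
        broadcast("User " + client + " joined");
    }

    // Mirrors channelRead: the sender receives its own message too.
    void say(String sender, String text) {
        broadcast("[" + sender + "] " + text);
    }

    // Mirrors channelInactive: unregister first, then announce to the remaining clients.
    void leave(String client) {
        inboxes.remove(client);
        broadcast("User " + client + " left");
    }

    // Returns the lines received by `client`, or null if it is not connected.
    List<String> inbox(String client) {
        return inboxes.get(client);
    }

    private void broadcast(String message) {
        for (List<String> inbox : inboxes.values()) {
            inbox.add(message);
        }
    }

    public static void main(String[] args) {
        BroadcastSketch room = new BroadcastSketch();
        room.join("alice");
        room.join("bob");
        room.say("alice", "hi");
        room.leave("bob");
        System.out.println(room.inbox("alice"));
    }
}
```

If the sender should not receive its own message, ChannelGroup.writeAndFlush also accepts a ChannelMatcher, for example ChannelMatchers.isNot(ctx.channel()).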

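8. Summary

8.1 Key Points

  1. High performance: built on NIO, with a few threads serving many connections
  2. Asynchronous and non-blocking: event-driven, with future-based I/O
  3. Ease of use: a compact API that is simple to work with
  4. Extensible: supports custom protocols and codecs

8.2 Concepts to Understand

  1. Event-driven: an asynchronous model built around events
  2. Pipeline mechanism: handlers process events as a chain
  3. Zero-copy: file transfers can bypass user-space copies
  4. Memory management: ByteBuf is reference-counted and must be released explicitly

8.3 Best Practices

  1. Size thread pools sensibly: base the count on the number of CPU cores
  2. Use ByteBuf correctly: release promptly to avoid memory leaks
  3. Use frame codecs: they solve the sticky-packet and split-packet problem
  4. Handle exceptions: give every pipeline a handler that deals with exceptionCaught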
