Episode 498: The Netty Framework
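1. Overview
1.1 Why Netty Matters
Netty is a high-performance, asynchronous, event-driven network application framework for rapidly building maintainable protocol servers and clients. It is one of the most widely used networking frameworks on the JVM.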
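1.2 Structure of This Article
This article covers Netty from the following angles:
- Netty basics: core concepts and a quick start
- Core components: Channel, EventLoop, Pipeline
- Codecs: built-in codecs and writing your own
- Event handling: event types, propagation, and asynchronous operations
- Performance optimization: threading, memory management, and zero-copy
- Practical cases: HTTP, WebSocket, and chat servers built on Netty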
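2. Netty Basics
2.1 Netty Features
2.1.1 Core Features
Core features of Netty:
- High performance: built on Java NIO, so a small number of threads can serve many connections
- Asynchronous and non-blocking: I/O is event-driven, and operations return immediately with a future
- Ease of use: a compact API that hides most of the complexity of raw NIO
- Stability: mature and widely deployed in production
- Rich functionality: ships with codecs for many protocols, including HTTP and WebSocket
Advantages of Netty:
- Performance: compared with traditional blocking I/O (BIO, one thread per connection), it typically sustains far more concurrent connections on the same hardware
- Ease of use: simplifies network programming
- Extensibility: supports custom protocols
- Active community: well maintained, with good community support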
2.2 Quick Start
2.2.1 Maven Dependency
Maven dependency:
```xml
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.90.Final</version>
</dependency>
```
Netty server example:
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {
    private static final int PORT = 8888;

    public static void main(String[] args) throws InterruptedException {
        // The boss group accepts connections; the worker group handles their I/O.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // No frame decoder is installed, so a message may
                            // arrive split across reads or merged with the next one.
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and wait until the server socket is open.
            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("Netty Server started on port " + PORT);

            // Block until the server channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // StringDecoder has already turned the bytes into a String.
        String message = (String) msg;
        System.out.println("Received: " + message);
        ctx.writeAndFlush("Echo: " + message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Client connected: " + ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("Client disconnected: " + ctx.channel().remoteAddress());
    }
}
```
Netty client example:
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyClient {
    private static final String HOST = "localhost";
    private static final int PORT = 8888;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ClientHandler());
                        }
                    });

            // Connect and wait until the connection is established.
            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            System.out.println("Connected to server");

            Channel channel = future.channel();
            channel.writeAndFlush("Hello Netty");

            // Block until the connection is closed.
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String message = (String) msg;
        System.out.println("Server: " + message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```
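3. Core Components
3.1 Channel
3.1.1 The Channel Concept
Channel: an abstraction over a network connection; each instance represents one socket.
Channel types:
- NioServerSocketChannel: the listening TCP server socket that accepts connections
- NioSocketChannel: a TCP connection, used by clients and for each connection a server accepts
- NioDatagramChannel: a UDP channel
Channel operations:
```java
Channel channel = ...;

// Queue a message in the outbound buffer; nothing is sent until flush().
channel.write("Hello");

// Queue a message and flush it to the socket in one call.
channel.writeAndFlush("Hello");

// Close the connection (asynchronous; returns a ChannelFuture).
channel.close();

// Addresses of the two endpoints.
SocketAddress remoteAddress = channel.remoteAddress();
SocketAddress localAddress = channel.localAddress();

// True while the channel is open and connected.
boolean active = channel.isActive();

// False while the outbound buffer is above its high water mark.
boolean writable = channel.isWritable();
```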
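The difference between write and writeAndFlush, and the meaning of isWritable, can be illustrated with a toy outbound buffer. This is a JDK-only sketch under simplifying assumptions, not Netty's implementation: `OutboundBufferSketch`, its fields, and the water-mark values are made up for illustration, and a `ByteArrayOutputStream` stands in for the socket.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of a channel's outbound buffer; not a Netty class.
final class OutboundBufferSketch {
    private final int lowWaterMark;
    private final int highWaterMark;
    private final Deque<byte[]> pending = new ArrayDeque<>();
    // Stands in for the real socket.
    private final ByteArrayOutputStream socket = new ByteArrayOutputStream();
    private int pendingBytes;
    private boolean writable = true;

    OutboundBufferSketch(int lowWaterMark, int highWaterMark) {
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    /** Like write(): only queues the message, nothing reaches the socket. */
    void write(byte[] msg) {
        pending.add(msg);
        pendingBytes += msg.length;
        if (pendingBytes > highWaterMark) {
            writable = false; // too much queued data: callers should pause
        }
    }

    /** Like flush(): drains everything queued so far to the "socket". */
    void flush() {
        byte[] msg;
        while ((msg = pending.poll()) != null) {
            socket.write(msg, 0, msg.length);
            pendingBytes -= msg.length;
        }
        if (pendingBytes < lowWaterMark) {
            writable = true; // queue drained below the low mark again
        }
    }

    boolean isWritable() { return writable; }

    int sentBytes() { return socket.size(); }

    public static void main(String[] args) {
        OutboundBufferSketch ch = new OutboundBufferSketch(4, 8);
        ch.write(new byte[6]);
        ch.write(new byte[6]);
        System.out.println("before flush: sent=" + ch.sentBytes() + ", writable=" + ch.isWritable());
        ch.flush();
        System.out.println("after flush:  sent=" + ch.sentBytes() + ", writable=" + ch.isWritable());
    }
}
```

In a real handler, checking isWritable before producing more data (and resuming in channelWritabilityChanged) is the usual way to avoid queuing unbounded amounts of memory for a slow peer.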
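3.2 EventLoop
3.2.1 The EventLoop Concept
EventLoop: an event loop bound to a single thread; it handles all I/O events for the Channels registered with it.
EventLoopGroup: manages a set of EventLoops and assigns each new Channel to one of them.
Using an EventLoop:
```java
// A group with four EventLoops (four threads).
EventLoopGroup group = new NioEventLoopGroup(4);

// Pick the next EventLoop from the group.
EventLoop eventLoop = group.next();

// Run a task on the EventLoop's thread.
eventLoop.execute(() -> {
    System.out.println("Task executed in EventLoop");
});

// Run a task after a delay (requires java.util.concurrent.TimeUnit).
eventLoop.schedule(() -> {
    System.out.println("Scheduled task");
}, 5, TimeUnit.SECONDS);
```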
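To make the one-thread-per-EventLoop idea concrete, the sketch below models an EventLoop with a plain JDK single-threaded scheduled executor. It is an analogy only: `MiniEventLoopDemo`, `runTasks`, and the thread name `mini-loop-1` are invented for this example and are not part of Netty.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only stand-in for an EventLoop: one thread, one task queue.
final class MiniEventLoopDemo {

    /** Runs taskCount tasks plus one delayed task; returns "thread#task" labels in execution order. */
    static List<String> runTasks(int taskCount) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "mini-loop-1"));
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                // Comparable to eventLoop.execute(...): queued, then run in submission order.
                loop.execute(() -> seen.add(Thread.currentThread().getName() + "#" + id));
            }
            // Comparable to eventLoop.schedule(...): runs later, still on the same thread.
            loop.schedule(() -> seen.add(Thread.currentThread().getName() + "#delayed"),
                    50, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3));
    }
}
```

Because every task for a given loop runs on the same thread, handler code that only touches per-channel state does not need locks; the flip side is that a slow task delays everything else queued on that loop.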
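3.3 Pipeline
3.3.1 The Pipeline Concept
Pipeline: the chain of ChannelHandlers attached to a Channel.
Pipeline structure:
```
Inbound Handlers                    Outbound Handlers
       ↓                                    ↑
[Handler1] → [Handler2] → [Handler3] → [Handler4]
       ↓                                    ↑
```
Inbound events travel from the head of the pipeline toward the tail, and outbound operations travel from the tail back toward the head.
Using a Pipeline:
```java
ChannelPipeline pipeline = channel.pipeline();

// Append named handlers to the end of the pipeline.
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new BusinessHandler());

// Remove a handler by name.
pipeline.remove("decoder");

// Look up a handler by name.
ChannelHandler handler = pipeline.get("handler");

// Replace a handler: old name, new name, new instance.
pipeline.replace("handler", "newHandler", new NewHandler());
```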
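The traversal order is easy to get wrong, so here is a toy model of it. This is a sketch under simplifying assumptions, not Netty code: `MiniPipelineDemo`, `Direction`, and `Handler` are hypothetical names, and each handler is treated as purely inbound or purely outbound.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a pipeline; names are illustrative, not Netty types.
final class MiniPipelineDemo {

    enum Direction { INBOUND, OUTBOUND }

    static final class Handler {
        final String name;
        final Direction direction;

        Handler(String name, Direction direction) {
            this.name = name;
            this.direction = direction;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    MiniPipelineDemo addLast(String name, Direction direction) {
        handlers.add(new Handler(name, direction));
        return this;
    }

    /** Inbound events (such as a read) travel head -> tail and skip outbound handlers. */
    List<String> fireRead() {
        List<String> visited = new ArrayList<>();
        for (Handler h : handlers) {
            if (h.direction == Direction.INBOUND) {
                visited.add(h.name);
            }
        }
        return visited;
    }

    /** Outbound operations (such as a write) travel tail -> head and skip inbound handlers. */
    List<String> write() {
        List<String> visited = new ArrayList<>();
        for (int i = handlers.size() - 1; i >= 0; i--) {
            Handler h = handlers.get(i);
            if (h.direction == Direction.OUTBOUND) {
                visited.add(h.name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        MiniPipelineDemo p = new MiniPipelineDemo()
                .addLast("decoder", Direction.INBOUND)
                .addLast("encoder", Direction.OUTBOUND)
                .addLast("handler", Direction.INBOUND);
        System.out.println("read  visits " + p.fireRead());
        System.out.println("write visits " + p.write());
    }
}
```

One practical consequence in Netty itself: ctx.write(...) starts from the current handler and moves toward the head, so an outbound handler added after the business handler is skipped, whereas channel.write(...) starts from the tail and passes through every outbound handler.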
3.4 ChannelHandler
3.4.1 Handler Types
ChannelHandler types:
- ChannelInboundHandler: handles inbound events
- ChannelOutboundHandler: handles outbound operations
InboundHandler example:
```java
public class InboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("Channel registered");
        ctx.fireChannelRegistered(); // pass the event on to the next handler
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Channel active");
        ctx.fireChannelActive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Channel read: " + msg);
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("Channel read complete");
        ctx.fireChannelReadComplete();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("Channel inactive");
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```
OutboundHandler example:
```java
public class OutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("Writing: " + msg);
        // Forward the write toward the head of the pipeline.
        ctx.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        System.out.println("Flushing");
        ctx.flush();
    }
}
```
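4. Codecs
4.1 Built-in Codecs
4.1.1 Commonly Used Codecs
Netty's built-in codecs (the frame decoders below are alternatives; a pipeline normally uses only one of them):
```java
// String <-> ByteBuf conversion (does no framing on its own).
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());

// Length-prefixed frames: a 4-byte length field at offset 0, stripped from the decoded frame.
// Arguments: maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip.
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));

// Frames separated by a custom delimiter.
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("\n".getBytes())));

// Frames separated by line endings (\n or \r\n).
pipeline.addLast(new LineBasedFrameDecoder(1024));

// Fixed-size frames of 100 bytes each.
pipeline.addLast(new FixedLengthFrameDecoder(100));
```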
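The five numbers passed to LengthFieldBasedFrameDecoder are easier to follow as arithmetic. The sketch below reproduces that arithmetic with a plain `java.nio.ByteBuffer`, assuming a fixed 4-byte length field and ignoring maxFrameLength; `LengthFieldSketch`, `extractFrame`, and `lengthPrefixed` are hypothetical helpers written for this article, not Netty APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// JDK-only illustration of the length-field arithmetic; not Netty code.
final class LengthFieldSketch {
    private static final int LENGTH_FIELD_LENGTH = 4; // assumed fixed in this sketch

    /**
     * Tries to cut one frame from the buffer. Returns null and leaves the
     * buffer position untouched if the frame is not complete yet.
     */
    static byte[] extractFrame(ByteBuffer in, int lengthFieldOffset,
                               int lengthAdjustment, int initialBytesToStrip) {
        int lengthFieldEnd = lengthFieldOffset + LENGTH_FIELD_LENGTH;
        if (in.remaining() < lengthFieldEnd) {
            return null; // the length field itself has not fully arrived
        }
        // Absolute read: does not move the buffer position.
        int declared = in.getInt(in.position() + lengthFieldOffset);
        if (declared < 0) {
            throw new IllegalArgumentException("negative length: " + declared);
        }
        // Whole frame = bytes up to the end of the length field + declared length + adjustment.
        int frameLength = lengthFieldEnd + declared + lengthAdjustment;
        if (in.remaining() < frameLength) {
            return null; // wait for more bytes
        }
        in.position(in.position() + initialBytesToStrip);
        byte[] frame = new byte[frameLength - initialBytesToStrip];
        in.get(frame);
        return frame;
    }

    /** Builds [4-byte length][UTF-8 payload], the layout a 4-byte prepender would produce. */
    static ByteBuffer lengthPrefixed(String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + bytes.length);
        buf.putInt(bytes.length);
        buf.put(bytes);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // Same settings as (1024, 0, 4, 0, 4): strip the 4-byte header, keep the payload.
        byte[] payload = extractFrame(lengthPrefixed("hello"), 0, 0, 4);
        System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
}
```

With initialBytesToStrip set to 0 instead of 4, the same call would return the header and the payload together, which is useful when a later handler still needs the length field.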
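4.2 Custom Codecs
4.2.1 Custom Decoder
Custom decoder:
```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class CustomDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // Wait until the 4-byte length prefix has arrived.
        if (in.readableBytes() < 4) {
            return;
        }

        // Remember the position so it can be restored if the payload is incomplete.
        in.markReaderIndex();
        int length = in.readInt();
        if (length < 0) {
            throw new CorruptedFrameException("negative length: " + length);
        }

        if (in.readableBytes() < length) {
            // Payload not complete yet: rewind and wait for more data.
            in.resetReaderIndex();
            return;
        }

        byte[] data = new byte[length];
        in.readBytes(data);
        out.add(new String(data, StandardCharsets.UTF_8));
    }
}
```
Custom encoder:
```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;

public class CustomEncoder extends MessageToByteEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) {
        // Wire format: [4-byte length][UTF-8 payload]
        byte[] data = msg.getBytes(StandardCharsets.UTF_8);
        out.writeInt(data.length);
        out.writeBytes(data);
    }
}
```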
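A length-prefixed decoder has to cope with TCP delivering a message in pieces or several messages at once. The following JDK-only sketch shows the accumulate, mark, rewind pattern that such a decoder relies on. It is an illustration rather than a replacement for ByteToMessageDecoder: `FrameAccumulatorSketch`, `feed`, and `frame` are hypothetical names, and the fixed 1024-byte buffer is an assumption made to keep the example short.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only model of a length-prefixed decoder with partial input.
final class FrameAccumulatorSketch {
    // Kept in "write mode" between calls; overflows if more than 1024 bytes are pending.
    private final ByteBuffer buffer = ByteBuffer.allocate(1024);

    /** Appends a chunk as it might arrive from the socket and returns every complete message. */
    List<String> feed(byte[] chunk) {
        buffer.put(chunk);
        buffer.flip(); // switch to reading what has accumulated so far
        List<String> messages = new ArrayList<>();
        while (buffer.remaining() >= 4) {
            buffer.mark(); // remember where this frame starts
            int length = buffer.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative length: " + length);
            }
            if (buffer.remaining() < length) {
                buffer.reset(); // payload incomplete: rewind to the frame start
                break;
            }
            byte[] payload = new byte[length];
            buffer.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        buffer.compact(); // keep unread bytes, switch back to writing
        return messages;
    }

    /** Encodes a message as [4-byte length][UTF-8 payload]. */
    static byte[] frame(String message) {
        byte[] data = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + data.length).putInt(data.length).put(data).array();
    }

    public static void main(String[] args) {
        FrameAccumulatorSketch acc = new FrameAccumulatorSketch();
        byte[] hi = frame("hi");
        // Split packet: the first 3 bytes are not even a full length prefix.
        System.out.println(acc.feed(new byte[] {hi[0], hi[1], hi[2]}));
        System.out.println(acc.feed(new byte[] {hi[3], hi[4], hi[5]}));
    }
}
```

The key detail is that the read position is only advanced past a frame once the whole frame is available; forgetting to mark before reading the length is a common source of corrupted streams.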
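4.3 Combining Codecs
4.3.1 Codec Chain
Codec chain example:
```java
ChannelPipeline pipeline = ch.pipeline();

// Inbound: split the byte stream into frames and strip the 4-byte length header.
pipeline.addLast(new LengthFieldBasedFrameDecoder(
        1024, 0, 4, 0, 4
));

// Outbound: prepend a 4-byte length header to every message.
pipeline.addLast(new LengthFieldPrepender(4));

// Message codec. Note: CustomDecoder and CustomEncoder from section 4.2 add
// their own length prefix, so with this chain the payload is length-prefixed
// twice. It works because both directions are symmetric, but in practice one
// framing layer is enough.
pipeline.addLast(new CustomDecoder());
pipeline.addLast(new CustomEncoder());

// Business logic runs last and sees fully decoded messages.
pipeline.addLast(new BusinessHandler());
```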
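5. Event Handling
5.1 Event Types
5.1.1 Inbound Events
Inbound events:
- channelRegistered: the Channel has been registered with an EventLoop
- channelActive: the Channel is active (connected)
- channelRead: data has been read from the peer
- channelReadComplete: the current read operation has completed
- channelInactive: the Channel is no longer active (disconnected)
- exceptionCaught: an exception was raised while processing an event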
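5.2 Event Propagation
5.2.1 Propagation Mechanism
A handler must forward an event explicitly. If it does not call ctx.fireChannelRead(msg), later handlers in the pipeline never see the message.
Event propagation:
```java
public class EventHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Handler1: " + msg);

        // Pass the message to the next inbound handler.
        ctx.fireChannelRead(msg);
    }
}
```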
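The effect of forwarding, or not forwarding, an event can be shown with a few lines of plain Java. This is a toy model, not Netty: `PropagationSketch`, its `Handler` and `Context` types, and the "ping" heartbeat convention are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of inbound event propagation; not Netty types.
final class PropagationSketch {

    interface Handler {
        void channelRead(Context ctx, String msg);
    }

    static final class Context {
        private final List<Handler> handlers;
        private final int index;

        Context(List<Handler> handlers, int index) {
            this.handlers = handlers;
            this.index = index;
        }

        /** Hands the message to the next handler, if there is one. */
        void fireChannelRead(String msg) {
            int next = index + 1;
            if (next < handlers.size()) {
                handlers.get(next).channelRead(new Context(handlers, next), msg);
            }
        }
    }

    /** Sends one message through a three-handler chain and returns what each stage logged. */
    static List<String> deliver(String msg) {
        List<String> log = new ArrayList<>();
        List<Handler> handlers = new ArrayList<>();
        // 1. Logs and forwards everything.
        handlers.add((ctx, m) -> {
            log.add("logger:" + m);
            ctx.fireChannelRead(m);
        });
        // 2. Swallows heartbeats by not forwarding them.
        handlers.add((ctx, m) -> {
            if (!m.equals("ping")) {
                ctx.fireChannelRead(m);
            }
        });
        // 3. Business logic only sees what the earlier handlers forwarded.
        handlers.add((ctx, m) -> log.add("business:" + m));

        new Context(handlers, -1).fireChannelRead(msg);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(deliver("data"));
        System.out.println(deliver("ping"));
    }
}
```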
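5.3 Asynchronous Processing
5.3.1 Asynchronous Operations
Asynchronous operations:
```java
// writeAndFlush returns immediately; the write itself completes later.
ChannelFuture future = channel.writeAndFlush("Hello");

// Non-blocking: get notified when the operation completes.
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
            System.out.println("Write successful");
        } else {
            System.out.println("Write failed: " + future.cause());
        }
    }
});

// Blocking: wait for completion. This throws InterruptedException and must
// not be called from an EventLoop thread, where it would block the loop.
future.await();
```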
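The listener style has a close analogue in the JDK's CompletableFuture, which makes it easy to experiment with outside Netty. The sketch below is only an analogy: `ListenerSketch` and `writeAndReport` are hypothetical names, and the future is completed by hand where Netty would complete it from the EventLoop.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// JDK analogy for ChannelFuture listeners; not Netty code.
final class ListenerSketch {

    /** Registers a completion callback, then completes the "write" with success or failure. */
    static String writeAndReport(boolean succeed) {
        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
        StringBuilder outcome = new StringBuilder();

        // Comparable to future.addListener(...): runs once the operation finishes.
        writeFuture.whenComplete((ignored, cause) -> {
            if (cause == null) {
                outcome.append("Write successful");
            } else {
                outcome.append("Write failed: ").append(cause.getMessage());
            }
        });

        // Stand-in for the I/O thread finishing the write.
        if (succeed) {
            writeFuture.complete(null);
        } else {
            writeFuture.completeExceptionally(new IOException("connection reset"));
        }
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(writeAndReport(true));
        System.out.println(writeAndReport(false));
    }
}
```

The callback form keeps the calling thread free, which is why listeners are preferred over await() or sync() inside handlers.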
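6. Performance Optimization
6.1 Thread Model Optimization
6.1.1 EventLoopGroup Configuration
Thread model optimization:
```java
// One thread is enough to accept connections on a single port.
int bossThreads = 1;
// Twice the core count, which is also Netty's default when no count is given.
int workerThreads = Runtime.getRuntime().availableProcessors() * 2;

EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreads);
EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads);
```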
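Thread counts only help if the worker threads are never blocked. A common pattern is to run blocking work (database calls, file access) on a separate pool and hop back to the loop thread for the result. The sketch below shows that hand-off with plain JDK executors; `OffloadSketch` and the thread names `loop` and `blocking` are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only sketch of offloading blocking work from an event-loop-like thread.
final class OffloadSketch {

    /** Returns {name of the thread that ran the blocking call, name of the thread that handled the result}. */
    static String[] run() {
        ExecutorService loop = Executors.newSingleThreadExecutor(r -> new Thread(r, "loop"));
        ExecutorService blockingPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "blocking"));
        try {
            String[] threads = new String[2];
            CompletableFuture
                    // The slow call runs on the dedicated pool...
                    .supplyAsync(() -> {
                        threads[0] = Thread.currentThread().getName();
                        return "row"; // stand-in for a query result
                    }, blockingPool)
                    // ...and the result is handled back on the loop thread.
                    .thenAcceptAsync(row -> threads[1] = Thread.currentThread().getName(), loop)
                    .join();
            return threads;
        } finally {
            loop.shutdown();
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("blocking work on: " + threads[0] + ", result handled on: " + threads[1]);
    }
}
```

In Netty the same idea is usually expressed by registering the slow handler with its own executor group, for example pipeline.addLast(new DefaultEventExecutorGroup(n), handler), so that the handler's callbacks run outside the I/O threads.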
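6.2 Memory Management
6.2.1 Using ByteBuf
Using ByteBuf:
```java
// Direct (off-heap) buffer.
ByteBuf directBuf = Unpooled.directBuffer(1024);

// Heap buffer backed by a byte array.
ByteBuf heapBuf = Unpooled.buffer(1024);

// Release each buffer once it is no longer needed.
directBuf.release();
heapBuf.release();

// Release in finally so the buffer is freed even if processing throws.
// The variable is declared outside the try block so that finally can see it.
ByteBuf buf = ...;
try {
    // use buf
} finally {
    ReferenceCountUtil.release(buf);
}
```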
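ByteBuf is reference-counted: a new buffer starts with a count of 1, retain() increments it, release() decrements it, and the memory is freed when the count reaches 0. The class below is a minimal model of that contract, written for illustration only; `RefCountSketch` is a hypothetical name, and it throws IllegalStateException where Netty has its own exception type.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal model of reference counting; not Netty's implementation.
final class RefCountSketch {
    private final AtomicInteger refCnt = new AtomicInteger(1); // a new object starts at 1
    private volatile boolean deallocated;

    int refCnt() {
        return refCnt.get();
    }

    boolean isDeallocated() {
        return deallocated;
    }

    /** Takes an additional reference, e.g. before handing the buffer to another component. */
    RefCountSketch retain() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("already released");
            }
            if (refCnt.compareAndSet(current, current + 1)) {
                return this;
            }
        }
    }

    /** Drops one reference; returns true if this call freed the object. */
    boolean release() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("released too many times");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    deallocated = true; // the real thing would return memory to the allocator here
                    return true;
                }
                return false;
            }
        }
    }

    public static void main(String[] args) {
        RefCountSketch buf = new RefCountSketch();
        buf.retain();
        System.out.println("after retain:  refCnt=" + buf.refCnt());
        System.out.println("first release freed it?  " + buf.release());
        System.out.println("second release freed it? " + buf.release());
    }
}
```

The rule of thumb that follows from this model: whoever last touches a buffer is responsible for releasing it, and every retain() needs a matching release().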
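6.3 Zero-Copy
6.3.1 Zero-Copy Technique
Zero-copy example:
```java
// `file` is assumed to be a java.io.RandomAccessFile opened for reading.
FileRegion region = new DefaultFileRegion(
        file.getChannel(), 0, file.length()
);
// The file content goes to the socket without being copied into JVM memory.
channel.writeAndFlush(region);
```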
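DefaultFileRegion is built on the JDK's FileChannel.transferTo, so the call shape can be tried without Netty. The sketch below is an assumption-laden illustration: `TransferToSketch` and `copyViaTransferTo` are hypothetical names, and because the target here is an in-memory stream rather than a socket, it demonstrates the API only; the operating system's zero-copy path applies to real socket targets.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// JDK-only illustration of FileChannel.transferTo; not Netty code.
final class TransferToSketch {

    /** Writes content to a temp file, then moves it to a sink with transferTo and returns what arrived. */
    static String copyViaTransferTo(String content) {
        try {
            Path tmp = Files.createTempFile("region", ".txt");
            try {
                Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
                ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the socket
                try (FileChannel file = FileChannel.open(tmp, StandardOpenOption.READ);
                     WritableByteChannel target = Channels.newChannel(sink)) {
                    long position = 0;
                    long size = file.size();
                    // transferTo may move fewer bytes than requested, so loop until done.
                    while (position < size) {
                        position += file.transferTo(position, size - position, target);
                    }
                }
                return new String(sink.toByteArray(), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(copyViaTransferTo("zero-copy demo"));
    }
}
```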
7. Practical Cases
7.1 HTTP Server
7.1.1 HTTP Server Implementation
HTTP server:
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import java.nio.charset.StandardCharsets;

public class HttpServer {
    private static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Decodes requests and encodes responses.
                            pipeline.addLast(new HttpServerCodec());
                            // Merges request parts into one FullHttpRequest (up to 64 KiB).
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            pipeline.addLast(new HttpServerHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("HTTP Server started on port " + PORT);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
        String uri = request.uri();
        HttpMethod method = request.method();
        System.out.println("Method: " + method + ", URI: " + uri);

        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK
        );

        byte[] body = "Hello from Netty HTTP Server".getBytes(StandardCharsets.UTF_8);
        response.content().writeBytes(body);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        // Content-Length is the body size in bytes, not the number of characters.
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.length);

        ctx.writeAndFlush(response);
    }
}
```
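One detail in the response headers deserves a worked example: Content-Length counts bytes of the encoded body, not characters of the Java string. For an ASCII body the two numbers coincide, which hides the difference until the first non-ASCII response. The helper below is a small JDK-only illustration; `ContentLengthSketch` and `contentLength` are hypothetical names, and UTF-8 is assumed as the response encoding.

```java
import java.nio.charset.StandardCharsets;

// JDK-only illustration of character count versus byte count.
final class ContentLengthSketch {

    /** Length of the body once encoded as UTF-8, which is what Content-Length must carry. */
    static int contentLength(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String ascii = "Hello from Netty HTTP Server";
        String cjk = "\u4f60\u597d"; // two CJK characters, three UTF-8 bytes each
        System.out.println(ascii.length() + " chars, " + contentLength(ascii) + " bytes");
        System.out.println(cjk.length() + " chars, " + contentLength(cjk) + " bytes");
    }
}
```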
7.2 WebSocket Server
7.2.1 WebSocket Implementation
WebSocket server:
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class WebSocketServer {
    private static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // The WebSocket handshake starts as an HTTP request.
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            // Performs the handshake on /ws and handles control frames.
                            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                            pipeline.addLast(new WebSocketHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("WebSocket Server started on port " + PORT);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        String text = frame.text();
        System.out.println("Received: " + text);
        ctx.writeAndFlush(new TextWebSocketFrame("Echo: " + text));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Fires when the TCP connection opens, before the WebSocket handshake completes.
        System.out.println("WebSocket client connected");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("WebSocket client disconnected");
    }
}
```
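Part of the handshake that WebSocketServerProtocolHandler takes care of is answering the client's Sec-WebSocket-Key with a Sec-WebSocket-Accept header. RFC 6455 defines that value as the Base64 encoding of the SHA-1 hash of the key concatenated with a fixed GUID. The sketch below computes it with the JDK alone, to show what happens on the wire; `WebSocketAcceptSketch` and `acceptKey` are hypothetical names and this is not how one would use Netty's API.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// JDK-only illustration of the Sec-WebSocket-Accept computation from RFC 6455.
final class WebSocketAcceptSketch {
    // Fixed GUID defined by RFC 6455.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Computes the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key. */
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest((secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Sample key from RFC 6455, section 1.3.
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```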
7.3 Chat Room Server
7.3.1 Multi-Client Chat
Chat room server:
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ChatServer {
    private static final int PORT = 8888;
    // All connected clients; shared by every handler instance.
    private static final ChannelGroup channels =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // No frame decoder is installed; a production chat server
                            // should add one (see section 4) to delimit messages.
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ChatServerHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("Chat Server started on port " + PORT);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    static class ChatServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            channels.add(ctx.channel());
            String message = "User " + ctx.channel().remoteAddress() + " joined";
            // Broadcast to every connected client, including the new one.
            channels.writeAndFlush(message);
            System.out.println(message);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String message = (String) msg;
            String sender = ctx.channel().remoteAddress().toString();
            String broadcast = "[" + sender + "] " + message;
            channels.writeAndFlush(broadcast);
            System.out.println(broadcast);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            // ChannelGroup also drops closed channels on its own; this is explicit for clarity.
            channels.remove(ctx.channel());
            String message = "User " + ctx.channel().remoteAddress() + " left";
            channels.writeAndFlush(message);
            System.out.println(message);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
```
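8. Summary
8.1 Key Points
- High performance: built on NIO
- Asynchronous and non-blocking: event-driven, with operations that complete asynchronously
- Ease of use: a compact, approachable API
- Extensibility: supports custom protocols and codecs
8.2 Key Concepts
- Event-driven: an asynchronous model built around events
- Pipeline mechanism: handlers process events as a chain
- Zero-copy: file data can be sent with FileRegion without copying it through JVM memory
- Memory management: ByteBuf is reference-counted and must be released explicitly
8.3 Best Practices
- Size thread pools sensibly: base the thread count on the number of CPU cores
- Use ByteBuf correctly: release buffers promptly to avoid memory leaks
- Use codecs: frame decoders solve the TCP sticky-packet and split-packet problem
- Handle exceptions: implement exceptionCaught and close or recover the channel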