SpringBoot使用Netty實現遠程調用的示例
前言
眾所周知我們在進行網絡連接的時候,建立套接字連接是一個非常消耗性能的事情,特別是在分布式的情況下,用線程池去保持多個客戶端連接,是一種非常消耗線程的行為。那么我們該通過什么技術去解決上述的問題呢,那么就不得不提一個網絡連接的利器——Netty.
正文 Netty
Netty是一個NIO客戶端服務器框架:
它可快速輕松地開發網絡應用程序,例如協議服務器和客戶端。 它極大地簡化和簡化了網絡編程,例如TCP和UDP套接字服務器。NIO是一種非阻塞IO ,它具有以下的特點
單線程可以連接多個客戶端。 選擇器可以實現但線程管理多個Channel,新建的通道都要向選擇器注冊。 一個SelectionKey鍵表示了一個特定的通道對象和一個特定的選擇器對象之間的注冊關系。 selector進行select()操作可能會產生阻塞,但是可以設置阻塞時間,并且可以用wakeup()喚醒selector,所以NIO是非阻塞IO。Netty模型selector模式
它相對普通NIO的在性能上有了提升,采用了:
NIO采用多線程的方式可以同時使用多個selector 通過綁定多個端口的方式,使得一個selector可以同時注冊多個ServerSocketServer 單個線程下只能有一個selector,用來實現Channel的匹配及復用半包問題
TCP/IP在發送消息的時候,可能會拆包,這就導致接收端無法知道什么時候收到的數據是一個完整的數據。在傳統的BIO中在讀取不到數據時會發生阻塞,但是NIO不會。為了解決NIO的半包問題,Netty在Selector模型的基礎上,提出了reactor模式,從而解決客戶端請求在服務端不完整的問題。
netty模型reactor模式
在selector的基礎上解決了半包問題。
上圖,簡單地可以描述為'boss接活,讓work干':manReactor用來接收請求(會與客戶端進行握手驗證),而subReactor用來處理請求(不與客戶端直接連接)。
SpringBoot使用Netty實現遠程調用
maven依賴
<!--lombok--><dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> <optional>true</optional></dependency><!--netty--><dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.17.Final</version></dependency>
服務端部分
NettyServer.java:服務啟動監聽器
@Slf4jpublic class NettyServer { public void start() { InetSocketAddress socketAddress = new InetSocketAddress('127.0.0.1', 8082); //new 一個主線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //new 一個工作線程組 EventLoopGroup workGroup = new NioEventLoopGroup(200); ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ServerChannelInitializer()).localAddress(socketAddress)//設置隊列大小.option(ChannelOption.SO_BACKLOG, 1024)// 兩小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文.childOption(ChannelOption.SO_KEEPALIVE, true); //綁定端口,開始接收進來的連接 try { ChannelFuture future = bootstrap.bind(socketAddress).sync(); log.info('服務器啟動開始監聽端口: {}', socketAddress.getPort()); future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error('服務器開啟失敗', e); } finally { //關閉主線程組 bossGroup.shutdownGracefully(); //關閉工作線程組 workGroup.shutdownGracefully(); } }}
ServerChannelInitializer.java:netty服務初始化器
/*** netty服務初始化器**/public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //添加編解碼 socketChannel.pipeline().addLast('decoder', new StringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast('encoder', new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new NettyServerHandler()); }}
NettyServerHandler.java:netty服務端處理器
/*** netty服務端處理器**/@Slf4jpublic class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 客戶端連接會觸發 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info('Channel active......'); } /** * 客戶端發消息會觸發 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info('服務器收到消息: {}', msg.toString()); ctx.write('你也好哦'); ctx.flush(); } /** * 發生異常觸發 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
RpcServerApp.java:SpringBoot啟動類
/*** 啟動類**/@Slf4j@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})public class RpcServerApp extends SpringBootServletInitializer { @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(RpcServerApp.class); } /** * 項目的啟動方法 * * @param args */ public static void main(String[] args) { SpringApplication.run(RpcServerApp.class, args); //開啟Netty服務 NettyServer nettyServer =new NettyServer (); nettyServer.start(); log.info('======服務已經啟動========'); }}
客戶端部分
NettyClientUtil.java:NettyClient工具類
/*** Netty客戶端**/@Slf4jpublic class NettyClientUtil { public static ResponseResult helloNetty(String msg) { NettyClientHandler nettyClientHandler = new NettyClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap().group(group)//該參數的作用就是禁止使用Nagle算法,使用于小數據即時傳輸.option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast('decoder', new StringDecoder()); socketChannel.pipeline().addLast('encoder', new StringEncoder()); socketChannel.pipeline().addLast(nettyClientHandler); }}); try { ChannelFuture future = bootstrap.connect('127.0.0.1', 8082).sync(); log.info('客戶端發送成功....'); //發送消息 future.channel().writeAndFlush(msg); // 等待連接被關閉 future.channel().closeFuture().sync(); return nettyClientHandler.getResponseResult(); } catch (Exception e) { log.error('客戶端Netty失敗', e); throw new BusinessException(CouponTypeEnum.OPERATE_ERROR); } finally { //以一種優雅的方式進行線程退出 group.shutdownGracefully(); } }}
NettyClientHandler.java:客戶端處理器
/*** 客戶端處理器**/@Slf4j@Setter@Getterpublic class NettyClientHandler extends ChannelInboundHandlerAdapter { private ResponseResult responseResult; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info('客戶端Active .....'); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info('客戶端收到消息: {}', msg.toString()); this.responseResult = ResponseResult.success(msg.toString(), CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDesc()); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
驗證
測試接口
@RestController@Slf4jpublic class UserController { @PostMapping('/helloNetty') @MethodLogPrint public ResponseResult helloNetty(@RequestParam String msg) { return NettyClientUtil.helloNetty(msg); }}
訪問測試接口
服務端打印信息
客戶端打印信息
源碼
項目源碼可從的我的github中獲取:github源碼地址
到此這篇關于SpringBoot使用Netty實現遠程調用的示例的文章就介紹到這了,更多相關SpringBoot Netty遠程調用內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章:
