package bhz.netty.runtime; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; /** * Best Do It */ public class Client { private static class SingletonHolder { static final Client instance = new Client(); } public static Client getInstance(){ return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf ; private Client(){ group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用) //5秒要跟服务器设置的一致 sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ClientHandler()); } }); } public void connect(){ try { this.cf = b.connect("127.0.0.1", 8765).sync(); System.out.println("远程服务器已经连接, 可以进行数据交换.."); } catch (Exception e) { e.printStackTrace(); } } public ChannelFuture getChannelFuture(){ if(this.cf == null){ this.connect(); } if(!this.cf.channel().isActive()){ this.connect(); } return this.cf; } public static void main(String[] args) throws Exception{ final Client c = Client.getInstance(); ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; i++ ){ Request request = new Request(); request.setId("" + i); request.setName("pro" + i); request.setRequestMessage("数据信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4); } //超过五秒之后就断开连接 cf.channel().closeFuture().sync();//程序阻塞在这里,断开连接时才继续往下走 //断开之后,重连服务器再发送 new Thread(new Runnable() { @Override public void run() { try { System.out.println("进入子线程..."); //重连 ChannelFuture cf = c.getChannelFuture(); System.out.println(cf.channel().isActive()); System.out.println(cf.channel().isOpen()); //再次发送数据 Request request = new Request(); request.setId("" + 4); request.setName("pro" + 4); request.setRequestMessage("数据信息" + 4); cf.channel().writeAndFlush(request); cf.channel().closeFuture().sync(); System.out.println("子线程结束."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("断开连接,主线程结束.."); } }
package bhz.netty.runtime; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Response resp = (Response)msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
package bhz.netty.runtime; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * Marshalling工厂 * @author(alienware) * @since 2014-12-16 */ public final class MarshallingCodeCFactory { /** * 创建Jboss Marshalling解码器MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //创建了MarshallingConfiguration对象,配置了版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据marshallerFactory和configuration创建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 创建Jboss Marshalling编码器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
package bhz.netty.runtime; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //五秒后没有连接,断开。 sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
package bhz.netty.runtime; import java.io.Serializable; public class Response implements Serializable{ private static final long serialVersionUID = 1L; private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } }
package bhz.netty.runtime; import java.io.Serializable; public class Request implements Serializable{ private static final long SerialVersionUID = 1L; private String id ; private String name ; private String requestMessage ; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } }
package bhz.netty.runtime; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request)msg; System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage()); Response response = new Response(); response.setId(request.getId()); response.setName("response" + request.getId()); response.setResponseMessage("响应内容" + request.getId()); ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
相关推荐
配置maven添加io.netty 在com.zhao的包下的文件,可以自行修改使用
netty使用自带工具类实现断线重连和心跳包
netty的长连接.. 连接断开之后 自动重连demo
netty断线重连机制及心跳,包含客户端和服务端,主要学习怎么重连和发送接收心跳,不满足心跳则关闭管道。
Netty断线重连解决方案.docx
实现netty作为服务端,websocket连接成功,将channel保存到map集合,通过js发送心跳,服务端接收心跳信息并响应给客户端,当服务端断开时 客户端进行重连操作
使用netty4,客户端断线重连,使用message收发数据,重写方法
使用netty自带工具类实现心跳功能,并带有重连功能
Netty4长连接、断开重连、心跳检测、Msgpack编码解码 http://blog.csdn.net/giousa/article/details/72846303#t2
功能 #### 自定义协议 | 魔数 2byte | 指令类型 1byte | 数据长度 4byte ... 设置重连次数来判断是否重连,重连次数耗尽则停止重连并关闭客户端 - 多长时间重连 短时间内频繁重连既消耗资源又没有必要,好的重连
NULL 博文链接:https://gjp014.iteye.com/blog/2390925
一个netty小列子服务端和客户端
java作为服务端,websocket利用netty做连接,解压之后内有html前端,把项目导入eclipse,在main方法启动即可使用。
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724845&idx=1&sn=8631c590ff4876ba0b7af64df16fc54b&scene=19#wechat_redirect
用netty实现长连接和心跳监测的示例代码
Netty实现长连接通讯,连接协议为了简单json封装,如果需要协议变化可以自己封装,有问题可以联系邮箱:lmjlimj@foxmail.com
学习netty框架编写的maven项目,使用netty4.1.1.Final版本实现的netty客户端自动重连,检测链路空闲时自动发送心跳包,如没有收到返回则自动断开连接重连。先运行NettyServerBootstrap类启动Server端,再启动Netty...
springboot不用多说,火的不行,netty在很多实际项目中都有运用,非常好的一个框架,开发非常方便。 本文主要内容是: springboot与netty的整合 netty简单的长连接 pom文件 <groupId>org.springframework.boot ...
具体信息查看我的博客:https://blog.csdn.net/qq_37437983/article/details/86585079
NULL 博文链接:https://cpjsjxy.iteye.com/blog/1609830