`
zhangwei_david
  • 浏览: 469500 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Netty通过心跳保持长链接

    博客分类:
  • Java
 
阅读更多

 

   Netty自带心跳检测功能,IdleStateHandler,客户端在写空闲时主动发起心跳请求,服务器接受到心跳请求后给出一个心跳响应。当客户端在一定时间范围内不能够给出响应则断开链接。

 

public class NettyClient {
	public void connect(String remoteServer, int port) throws Exception {
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(workerGroup).channel(NioSocketChannel.class).remoteAddress(remoteServer, port)
					.handler(new ChildChannelHandler());

			ChannelFuture f = b.connect();
			System.out.println("Netty time Client connected at port " + port);

			f.channel().closeFuture().sync();
		} finally {
			try {
				TimeUnit.SECONDS.sleep(5);
				try {
					System.out.println("重新链接。。。");
					connect(remoteServer, port);
				} catch (Exception e) {
					e.printStackTrace();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

		@Override
		protected void initChannel(final SocketChannel ch) throws Exception {
			// -8表示lengthAdjustment,让解码器从0开始截取字节,并且包含消息头
			ch.pipeline().addLast(new RpcEncoder(NettyMessage.class)).addLast(new RpcDecoder(NettyMessage.class))
					.addLast(new IdleStateHandler(120, 10, 0, TimeUnit.SECONDS)).addLast(new HeartBeatReqHandler());
		}

	}

	public static void main(String[] args) {
		try {
			new NettyClient().connect("127.0.0.1", 12000);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

public class SerializationUtil {

    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    private static Objenesis                objenesis    = new ObjenesisStd(true);

    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * 序列化
     *
     * @param obj
     * @return
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            byte result[] = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            return result;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * 反序列化
     *
     * @param data
     * @param clazz
     * @return
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T obj = objenesis.newInstance(clazz);
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

 

@SuppressWarnings("rawtypes")
public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
        	System.out.println("发送的请求是:"+in);
            byte[] data = SerializationUtil.serializer(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

 

public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            ctx.close();
        }
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
        }
		byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = SerializationUtil.deserializer(data, genericClass);
        System.out.println("接收到的消息是:"+obj);
        out.add(obj);
    }
}

 

public class HeartBeatReqHandler extends ChannelDuplexHandler {

	/**
	 * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
	 *      java.lang.Object)
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
			IdleStateEvent event = (IdleStateEvent) evt;
			if (event.state() == IdleState.READER_IDLE) {
				System.out.println("read 空闲");
				ctx.disconnect();
			} else if (event.state() == IdleState.WRITER_IDLE) {
				System.out.println("write 空闲");
				ctx.writeAndFlush(buildHeartBeat(MessageType.HEARTBEAT_REQ.getType()));
			}
		}
	}

	/**
	 * 
	 * @return
	 * @author zhangwei<wei.zw@corp.netease.com>
	 */
	private NettyMessage buildHeartBeat(byte type) {
		NettyMessage msg = new NettyMessage();
		Header header = new Header();
		header.setType(type);
		msg.setHeader(header);
		return msg;
	}

}

 

public class NettyServer {
	public void bind(int port) throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
					.childHandler(new ChildChannelHandler());

			ChannelFuture f = b.bind(port).sync();
			System.out.println("Netty time Server started at port " + port);
			f.channel().closeFuture().sync();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

		@Override
		protected void initChannel(final SocketChannel ch) throws Exception {
			ch.pipeline().addLast(new RpcDecoder(NettyMessage.class)).addLast(new RpcEncoder(NettyMessage.class))
					.addLast(new IdleStateHandler(120, 0, 0, TimeUnit.SECONDS)).addLast(new HeartBeatRespHandler());
		}

	}

	public static void main(String[] args) {
		try {
			new NettyServer().bind(12000);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

public enum MessageType {

	LOGIN_REQ((byte) 1), LOGIN_RESP((byte) 2), HEARTBEAT_REQ((byte) 3), HEARTBEAT_RESP((byte) 4);
	private byte type;

	/**
	 * @param type
	 */
	private MessageType(byte type) {
		this.type = type;
	}

	public byte getType() {
		return type;
	}

	public void setType(byte type) {
		this.type = type;
	}

	public static MessageType getMessageType(byte type) {
		for (MessageType b : MessageType.values()) {
			if (b.getType() == type) {
				return b;
			}
		}
		return null;
	}

}

 

public class HeartBeatRespHandler extends SimpleChannelInboundHandler<NettyMessage> {

	/**
	 * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext,
	 *      java.lang.Object)
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
		if (msg.getHeader() != null && msg.getHeader().getType() == MessageType.HEARTBEAT_REQ.getType()) {
			NettyMessage heartBeat = buildHeartBeat(MessageType.HEARTBEAT_RESP.getType());
			ctx.writeAndFlush(heartBeat);
		} else {
			ctx.fireChannelRead(msg);
		}
	}
	

	/**
	 * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
	 *      java.lang.Object)
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
			IdleStateEvent event = (IdleStateEvent) evt;
			if (event.state() == IdleState.READER_IDLE) {
				System.out.println("read 空闲 关闭链接");
				ctx.disconnect();	
			} 
		}
	}
	

	/**
	 * 
	 * @return
	 * @author zhangwei<wei.zw@corp.netease.com>
	 */
	private NettyMessage buildHeartBeat(byte type) {
		NettyMessage msg = new NettyMessage();
		Header header = new Header();
		header.setType(type);
		msg.setHeader(header);
		return msg;
	}

}

 

public class NettyMessage implements Serializable{
	
	/**  */
	private static final long serialVersionUID = 1L;

	private Header header;
	
	private Object body;

	public Header getHeader() {
		return header;
	}

	public void setHeader(Header header) {
		this.header = header;
	}

	public Object getBody() {
		return body;
	}

	public void setBody(Object body) {
		this.body = body;
	}

	/** 
	 * @see java.lang.Object#toString()
	 */
	@Override
	public String toString() {
		return "NettyMessage [header=" + header + ", body=" + body + "]";
	}
	
	
}

 

 

 

 

public class Header implements Serializable{
	/**  */
	private static final long serialVersionUID = 1L;
	private int crcCode=0xabef0101;
	private int length;
	private long sessionId;
	private byte type;
	private byte priority;
	private Map<String,Object> attachment=new HashMap<>();
	public int getCrcCode() {
		return crcCode;
	}
	public void setCrcCode(int crcCode) {
		this.crcCode = crcCode;
	}
	public int getLength() {
		return length;
	}
	public void setLength(int length) {
		this.length = length;
	}
	public long getSessionId() {
		return sessionId;
	}
	public void setSessionId(long sessionId) {
		this.sessionId = sessionId;
	}
	public byte getType() {
		return type;
	}
	public void setType(byte type) {
		this.type = type;
	}
	public byte getPriority() {
		return priority;
	}
	public void setPriority(byte priority) {
		this.priority = priority;
	}
	public Map<String, Object> getAttachment() {
		return attachment;
	}
	public void setAttachment(Map<String, Object> attachment) {
		this.attachment = attachment;
	}
	/** 
	 * @see java.lang.Object#toString()
	 */
	@Override
	public String toString() {
		return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionId=" + sessionId + ", type=" + type
				+ ", priority=" + priority + ", attachment=" + attachment + "]";
	}
	
	
}

客户端的结果是:

etty time Client connected at port 12000
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]

 

 

1
4
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics