ServiceServer实现自动发现服务:
/** * * @author zhangwei * @version $Id: NettyServiceServer.java, v 0.1 2015年8月19日 下午2:08:37 $ */ public class NettyServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private RpcServerHandler rpcServerHandler; private void publishedService() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(5); EventLoopGroup workerGroup = new NioEventLoopGroup(5); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new RpcDecoder(SrpcRequest.class)) .addLast(new RpcEncoder(SrpcResponse.class)).addLast(rpcServerHandler); } }); //绑定主机+端口 ChannelFuture future = serverBootstrap.bind("127.0.0.1", port).sync(); // 等待服务监听端口关闭 future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); Map<String, Object> handlerMap = new HashMap<String, Object>(); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } System.out.println("自动注册的服务SRPC 服务有:" + handlerMap.keySet()); rpcServerHandler = new RpcServerHandler(handlerMap); } }
注意:在该类上添加了@Sharable注解,如果没有改注解则该ChannelHandler不允许多次读写
Netty将不会再同步地调用ChannelHandler的方法了,除非ChannelHandler由@Shareable注解
/** * * @author zhangwei_PF * @version $Id: RpcServerHandler.java, v 0.1 2015年8月19日 下午2:17:34 $ */ @Sharable public class RpcServerHandler extends SimpleChannelInboundHandler<SrpcRequest> { private final Map<String, Object> handlerMap; public RpcServerHandler(Map<String, Object> handlerMap) { this.handlerMap = handlerMap; } /** * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext, java.lang.Object) */ @Override protected void channelRead0(ChannelHandlerContext ctx, SrpcRequest srpcRequest) throws Exception { SrpcResponse response = new SrpcResponse(); response.setRequestId(srpcRequest.getRequestId()); try { response.setResult(handle(srpcRequest)); } catch (Exception e) { response.setError(e); e.printStackTrace(); } ctx.writeAndFlush(response); } /** * * @param srpcRequest * @return * @throws Exception */ private Object handle(SrpcRequest request) throws Exception { Object service = handlerMap.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); return method.invoke(service, request.getParameters()); } }
/** * * @author zhangwei_david * @version $Id: RpcEncoder.java, v 0.1 2014年12月31日 下午8:55:25 zhangwei_david Exp $ */ @SuppressWarnings("rawtypes") public class RpcEncoder extends MessageToByteEncoder { private Class<?> genericClass; public RpcEncoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception { if (genericClass.isInstance(in)) { byte[] data = SerializationUtil.serializer(in); out.writeInt(data.length); out.writeBytes(data); } } }
/** * * @author zhangwei_david * @version $Id: RpcDecoder.java, v 0.1 2014年12月31日 下午8:53:16 zhangwei_david Exp $ */ public class RpcDecoder extends ByteToMessageDecoder { private Class<?> genericClass; public RpcDecoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if (dataLength < 0) { ctx.close(); } if (in.readableBytes() < dataLength) { in.resetReaderIndex(); } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = SerializationUtil.deserializer(data, genericClass); out.add(obj); }
/** * * @author zhangwei_david * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $ */ public class SrpcResponse implements Serializable { /** */ private static final long serialVersionUID = -5934073769679010930L; // 请求的Id private String requestId; // 异常 private Throwable error; // 响应 private Object result; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>error</tt>. * * @return property value of error */ public Throwable getError() { return error; } /** * Setter method for property <tt>error</tt>. * * @param error value to be assigned to property error */ public void setError(Throwable error) { this.error = error; } /** * Getter method for property <tt>result</tt>. * * @return property value of result */ public Object getResult() { return result; } /** * Setter method for property <tt>result</tt>. * * @param result value to be assigned to property result */ public void setResult(Object result) { this.result = result; } /** * @see java.lang.Object#toString() */ @Override public String toString() { return "SrpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result + "]"; } }
/** * * @author zhangwei_david * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $ */ public class SrpcRequest implements Serializable { /** */ private static final long serialVersionUID = 6132853628325824727L; // 请求Id private String requestId; // 远程调用接口名称 private String interfaceName; //远程调用方法名称 private String methodName; // 参数类型 private Class<?>[] parameterTypes; // 参数值 private Object[] parameters; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>interfaceName</tt>. * * @return property value of interfaceName */ public String getInterfaceName() { return interfaceName; } /** * Setter method for property <tt>interfaceName</tt>. * * @param interfaceName value to be assigned to property interfaceName */ public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } /** * Getter method for property <tt>methodName</tt>. * * @return property value of methodName */ public String getMethodName() { return methodName; } /** * Setter method for property <tt>methodName</tt>. * * @param methodName value to be assigned to property methodName */ public void setMethodName(String methodName) { this.methodName = methodName; } /** * Getter method for property <tt>parameterTypes</tt>. * * @return property value of parameterTypes */ public Class<?>[] getParameterTypes() { return parameterTypes; } /** * Setter method for property <tt>parameterTypes</tt>. * * @param parameterTypes value to be assigned to property parameterTypes */ public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } /** * Getter method for property <tt>parameters</tt>. * * @return property value of parameters */ public Object[] getParameters() { return parameters; } /** * Setter method for property <tt>parameters</tt>. * * @param parameters value to be assigned to property parameters */ public void setParameters(Object[] parameters) { this.parameters = parameters; } /** * @see java.lang.Object#toString() */ @Override public String toString() { return "SrpcRequest [requestId=" + requestId + ", interfaceName=" + interfaceName + ", methodName=" + methodName + ", parameterTypes=" + Arrays.toString(parameterTypes) + ", parameters=" + Arrays.toString(parameters) + "]"; } }
/** * * @author zhangwei_PF * @version $Id: SrpcRequestSender.java, v 0.1 2015年8月20日 下午2:13:31 $ */ @Sharable public class SrpcRequestSender extends SimpleChannelInboundHandler<SrpcResponse> { //final CountDownLatch latch = new CountDownLatch(1); private BlockingQueue<SrpcResponse> responseHodler = new LinkedBlockingQueue<SrpcResponse>(1); // private SrpcResponse response; @Override public void channelRead0(ChannelHandlerContext ctx, SrpcResponse response) throws Exception { // this.response = response; // latch.countDown(); responseHodler.put(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } public SrpcResponse send(SrpcRequest request, String host, int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RpcEncoder(SrpcRequest.class)) .addLast(new RpcDecoder(SrpcResponse.class)) .addLast(SrpcRequestSender.this); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); Channel channel = future.channel(); channel.writeAndFlush(request).sync(); /** * * 使用闭锁实现等待 */ // latch.await(); SrpcResponse response = responseHodler.take(); System.out.println("send request is " + request + "receive response is " + response); channel.closeFuture(); return response; } finally { group.shutdownGracefully(); } } }
/** * * @author zhangwei_PF * @version $Id: RpcClientProxy.java, v 0.1 2015年8月19日 下午6:01:26 $ */ public class SrpcProxyFactory { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T create(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { SrpcRequest request = new SrpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(arguments); SrpcResponse response = new SrpcRequestSender().send(request, host, port); if (response == null || !StringUtils.equals(request.getRequestId(), response.getRequestId())) { return null; } if (response.getError() != null) { throw response.getError(); } return response.getResult(); } }); } }
相关推荐
netty的rpc协议的简单实现
基于Zookeeper+Netty+Protostuff实现的简单RPC框架源码,代码内有详细注释
NettyRpc 基于Netty,ZooKeeper和Spring的RPC框架中文详情: 特征: 简单的代码和框架 ZooKeeper的服务注册表/发现支持 高可用性,负载平衡和故障转移 支持不同的负载均衡策略 支持异步/同步调用 支持不同版本的...
使用netty作为底层的通讯工具实现了一个简单的rpc工具。
用Netty实现一个简单的RPC框架,基本上rpc主要的知识点都涉及到了,包括协议的定义,序列化反序列化,动态代理,Spring自动装配,Netty编解码器等。可以通过这个项目加强对Netty的学习掌握,也可以加深对RPC的理解。...
「喜欢自提」基于netty实现rpc。 网上找的开源项目,用心研究了一下,做了些调整,都写中文注释。 挺有意思的,如果你想研究开源项目,这或许是个不错的突破口
oh-netty-rpc-client : 基于netty实现的RPC client,使用JDK动态代理实现RPC client,隐藏了底层实现细节,使服务调用看起来像是本地调用(网络通讯、编解码、远程调用) oh-netty-rpc-protocol : 封装了RPC通讯之间...
EasyRpc EasyRpc是基于Netty,ZooKeeper和ProtoStuff开发的一个简单易用,便于学习的RPC框架。 1特性简单易用;注释完善,方便学习;低延迟,基于Netty 4;解决TCP粘包/拆包问题;支持非双向的同步/异步调用;基于...
guide 目前只实现了RPC框架最基本的功能,一些可优化点都在下面提到了,有兴趣的小伙伴可以自我完善。 通过这个简易的轮子,你可以学到RPC的替代原理和原理以及各种Java编码实践的运用。 你甚至可以把当做你的毕设/...
通过案例演示,相信小伙伴们对 Netty 的应用已经有了一个比较深刻的印象,本次内容,只是对 RPC 的基本实现原理做了一个简单的实现,感兴趣的小伙伴可以在本项 目的基础上继续完善 RPC 的其他细节。
基于Netty手写Dubbo,该资源包含 ...2、RPC远程调用实现。 3、netty服务调用,对象序列化和反序列化。 4、负载均衡的简单实现 详情见博客:https://blog.csdn.net/July_whj/article/details/89813536
包含RPC原理、NIO操作、netty简单的api、自定义RPC框架
实现的简单的RPC框架源码,基于Zookeeper+Netty+Protostuff开发
基于netty和zookeeper的RPC框架说明nzRpc是一个基于netty和zookeeper的RPC框架,使用netty作为底层socket通信框架。使用Zookeeper作为注册中心。服务提供者启动时会向服务注册中心注册相关信息消费者启动时,会获取...
可参考博客http://blog.csdn.net/u013177446/article/details/66473066 使用netty/反射/序列化反序列化等技术是一个一个简单的RPC框架
基于Netty实现远程通信; 服务端使用Guice进行各组件管理和整合; 基于kyro实现高效通用序列化协议; 基于zookeeper实现服务注册; 基于代理与反射使得接口简单易用; 客户端实现随机、一致性哈希、roundbin三种负载...
自学并实现的一个基于Netty的简单的rpc框架, 想不还不太成熟, 不过能运行了. 测试方式 运行com.tao.rpc.example.server.test.Server的main()方法,开启rpc服务器端. 运行com.tao.rpc.example.client.test包下的各种...
还实现了对应的自动扫描接口,并生产对应的代理类注入到对应的ioc容器中,并加上对应的自动DI操作,服务端对应的消息的分发模式,代码更加简单明白,网络部分,加上了对应的tcp粘包,json编解码
目前只实现了 RPC 框架最基本的功能,一些可优化点都在下面提到了,有兴趣的小伙伴可以自行完善。 通过这个简易的轮子,你可以学到 RPC 的底层原理和原理以及各种 Java 编码实践的运用。 你甚至可以把 当做你的毕设...
网络学习