`
zhangwei_david
  • 浏览: 468506 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

实现RPC就是这么简单 之 Netty 实现

    博客分类:
  • Java
 
阅读更多

 

ServiceServer实现自动发现服务:

/**
 *
 * @author zhangwei
 * @version $Id: NettyServiceServer.java, v 0.1 2015年8月19日 下午2:08:37  $
 */
public class NettyServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {

    /**服务端口号**/
    private int              port = 12000;

    private RpcServerHandler rpcServerHandler;

    private void publishedService() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(5);
        EventLoopGroup workerGroup = new NioEventLoopGroup(5);
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline().addLast(new RpcDecoder(SrpcRequest.class))
                    .addLast(new RpcEncoder(SrpcResponse.class)).addLast(rpcServerHandler);
                }
            });

            //绑定主机+端口
            ChannelFuture future = serverBootstrap.bind("127.0.0.1", port).sync();

            // 等待服务监听端口关闭
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        publishedService();
    }

    /**
     * @see org.springframework.context.Lifecycle#start()
     */
    @Override
    public void start() {
    }

    /**
     * @see org.springframework.context.Lifecycle#stop()
     */
    @Override
    public void stop() {

    }

    /**
     * @see org.springframework.context.Lifecycle#isRunning()
     */
    @Override
    public boolean isRunning() {
        return false;
    }

    /**
     * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
        Map<String, Object> handlerMap = new HashMap<String, Object>();
        if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()
                        .getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
        System.out.println("自动注册的服务SRPC 服务有:" + handlerMap.keySet());
        rpcServerHandler = new RpcServerHandler(handlerMap);
    }
}

 注意:在该类上添加了@Sharable注解,如果没有改注解则该ChannelHandler不允许多次读写

Netty将不会再同步地调用ChannelHandler的方法了,除非ChannelHandler由@Shareable注解

/**
 *
 * @author zhangwei_PF
 * @version $Id: RpcServerHandler.java, v 0.1 2015年8月19日 下午2:17:34  $
 */
@Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<SrpcRequest> {

    private final Map<String, Object> handlerMap;

    public RpcServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    /**
     * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext, java.lang.Object)
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SrpcRequest srpcRequest)
            throws Exception {

        SrpcResponse response = new SrpcResponse();
        response.setRequestId(srpcRequest.getRequestId());
        try {
            response.setResult(handle(srpcRequest));
        } catch (Exception e) {
            response.setError(e);
            e.printStackTrace();
        }

        ctx.writeAndFlush(response);
    }

    /**
     *
     * @param srpcRequest
     * @return
     * @throws Exception
     */
    private Object handle(SrpcRequest request) throws Exception {
        Object service = handlerMap.get(request.getInterfaceName());

        Method method = service.getClass().getMethod(request.getMethodName(),
            request.getParameterTypes());

        return method.invoke(service, request.getParameters());

    }

}

 

/**
 *
 * @author zhangwei_david
 * @version $Id: RpcEncoder.java, v 0.1 2014年12月31日 下午8:55:25 zhangwei_david Exp $
 */
@SuppressWarnings("rawtypes")
public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serializer(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

 

/**
 *
 * @author zhangwei_david
 * @version $Id: RpcDecoder.java, v 0.1 2014年12月31日 下午8:53:16 zhangwei_david Exp $
 */
public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            ctx.close();
        }
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = SerializationUtil.deserializer(data, genericClass);
        out.add(obj);
    }

 

/**
 *
 * @author zhangwei_david
 * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $
 */
public class SrpcResponse implements Serializable {
    /**  */
    private static final long serialVersionUID = -5934073769679010930L;
    // 请求的Id
    private String            requestId;
    // 异常
    private Throwable         error;
    // 响应
    private Object            result;

    /**
     * Getter method for property <tt>requestId</tt>.
     *
     * @return property value of requestId
     */
    public String getRequestId() {
        return requestId;
    }

    /**
     * Setter method for property <tt>requestId</tt>.
     *
     * @param requestId value to be assigned to property requestId
     */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /**
     * Getter method for property <tt>error</tt>.
     *
     * @return property value of error
     */
    public Throwable getError() {
        return error;
    }

    /**
     * Setter method for property <tt>error</tt>.
     *
     * @param error value to be assigned to property error
     */
    public void setError(Throwable error) {
        this.error = error;
    }

    /**
     * Getter method for property <tt>result</tt>.
     *
     * @return property value of result
     */
    public Object getResult() {
        return result;
    }

    /**
     * Setter method for property <tt>result</tt>.
     *
     * @param result value to be assigned to property result
     */
    public void setResult(Object result) {
        this.result = result;
    }

    /**
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "SrpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result
                + "]";
    }

}

 

/**
 *
 * @author zhangwei_david
 * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $
 */
public class SrpcRequest implements Serializable {

    /**  */
    private static final long serialVersionUID = 6132853628325824727L;
    // 请求Id
    private String            requestId;
    // 远程调用接口名称
    private String            interfaceName;
    //远程调用方法名称
    private String            methodName;
    // 参数类型
    private Class<?>[]        parameterTypes;
    // 参数值
    private Object[]          parameters;

    /**
     * Getter method for property <tt>requestId</tt>.
     *
     * @return property value of requestId
     */
    public String getRequestId() {
        return requestId;
    }

    /**
     * Setter method for property <tt>requestId</tt>.
     *
     * @param requestId value to be assigned to property requestId
     */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /**
     * Getter method for property <tt>interfaceName</tt>.
     *
     * @return property value of interfaceName
     */
    public String getInterfaceName() {
        return interfaceName;
    }

    /**
     * Setter method for property <tt>interfaceName</tt>.
     *
     * @param interfaceName value to be assigned to property interfaceName
     */
    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    /**
     * Getter method for property <tt>methodName</tt>.
     *
     * @return property value of methodName
     */
    public String getMethodName() {
        return methodName;
    }

    /**
     * Setter method for property <tt>methodName</tt>.
     *
     * @param methodName value to be assigned to property methodName
     */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     * Getter method for property <tt>parameterTypes</tt>.
     *
     * @return property value of parameterTypes
     */
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    /**
     * Setter method for property <tt>parameterTypes</tt>.
     *
     * @param parameterTypes value to be assigned to property parameterTypes
     */
    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /**
     * Getter method for property <tt>parameters</tt>.
     *
     * @return property value of parameters
     */
    public Object[] getParameters() {
        return parameters;
    }

    /**
     * Setter method for property <tt>parameters</tt>.
     *
     * @param parameters value to be assigned to property parameters
     */
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    /**
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "SrpcRequest [requestId=" + requestId + ", interfaceName=" + interfaceName
                + ", methodName=" + methodName + ", parameterTypes="
                + Arrays.toString(parameterTypes) + ", parameters=" + Arrays.toString(parameters)
                + "]";
    }

}

 

/**
 *
 * @author zhangwei_PF
 * @version $Id: SrpcRequestSender.java, v 0.1 2015年8月20日 下午2:13:31  $
 */
@Sharable
public class SrpcRequestSender extends SimpleChannelInboundHandler<SrpcResponse> {

    //final CountDownLatch latch = new CountDownLatch(1);

    private BlockingQueue<SrpcResponse> responseHodler = new LinkedBlockingQueue<SrpcResponse>(1);

    //   private SrpcResponse                response;

    @Override
    public void channelRead0(ChannelHandlerContext ctx, SrpcResponse response) throws Exception {
        //  this.response = response;
        // latch.countDown();
        responseHodler.put(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    public SrpcResponse send(SrpcRequest request, String host, int port) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new RpcEncoder(SrpcRequest.class))
                            .addLast(new RpcDecoder(SrpcResponse.class))
                            .addLast(SrpcRequestSender.this);
                    }

                });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            Channel channel = future.channel();

            channel.writeAndFlush(request).sync();
            /**
             *
             * 使用闭锁实现等待
             */
            //  latch.await();
            SrpcResponse response = responseHodler.take();
            System.out.println("send request is " + request + "receive response is " + response);
            channel.closeFuture();
            return response;
        } finally {
            group.shutdownGracefully();
        }

    }

}

 

/**
 *
 * @author zhangwei_PF
 * @version $Id: RpcClientProxy.java, v 0.1 2015年8月19日 下午6:01:26  $
 */
public class SrpcProxyFactory {

    /**
     * 引用服务
     *
     * @param <T> 接口泛型
     * @param interfaceClass 接口类型
     * @param host 服务器主机名
     * @param port 服务器端口
     * @return 远程服务
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T create(final Class<T> interfaceClass, final String host, final int port)
                                                                                                throws Exception {
        if (interfaceClass == null || !interfaceClass.isInterface()) {
            throw new IllegalArgumentException("必须指定服务接口");
        }

        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("必须指定服务器的地址和端口号");
        }

        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                                                                                     throws Throwable {

                    SrpcRequest request = new SrpcRequest();
                    request.setRequestId(UUID.randomUUID().toString());
                    request.setInterfaceName(interfaceClass.getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameters(arguments);
                    SrpcResponse response = new SrpcRequestSender().send(request, host, port);

                    if (response == null
                        || !StringUtils.equals(request.getRequestId(), response.getRequestId())) {
                        return null;
                    }
                    if (response.getError() != null) {
                        throw response.getError();
                    }
                    return response.getResult();

                }
            });

    }
}

 

1
3
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics