Coder Social home page Coder Social logo

snailclimb / guide-rpc-framework Goto Github PK

View Code? Open in Web Editor NEW
3.7K 50.0 2.0K 4.26 MB

A custom RPC framework implemented by Netty+Kyro+Zookeeper.(一款基于 Netty+Kyro+Zookeeper 实现的自定义 RPC 框架-附详细实现过程和相关教程。)

Home Page: https://gitee.com/SnailClimb/guide-rpc-framework

License: Other

Java 99.86% Shell 0.14%

guide-rpc-framework's Issues

关于ProtostuffSerializer 的使用问题

https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-simple/src/main/java/github/javaguide/serialize/protostuff/ProtostuffSerializer.java

/**
 * Avoid re applying buffer space every time serialization
 */
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

@Override
public byte[] serialize(Object obj) {
    Class<?> clazz = obj.getClass();
    Schema schema = RuntimeSchema.getSchema(clazz);
    byte[] bytes;
    try {
        bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
    } finally {
        BUFFER.clear();
    }
    return bytes;
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = RuntimeSchema.getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错
2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好
下面是我修改的代码:

private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>();
private static <T>Schema<T> getSchema(Class<?> clazz) {
    if (schemaMap.containsKey(clazz)) {
        return (Schema<T>)schemaMap.get(clazz);
    }
    Schema schema = RuntimeSchema.getSchema(clazz);
    // 双重检查
    if (schema != null) {
        schemaMap.put(clazz, schema);
    }
    return schema;
}

@Override
public byte[] serialize(Object object) {
    Class<?> clazz = object.getClass();
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    try {
        Schema schema = getSchema(clazz);
        return ProtostuffIOUtil.toByteArray(object, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

关于NettyRpcClientHandler中NettyRpcClient使用单例工厂获取的问题,单例工厂中没有ExtensionLoader创建的client。

当前master中,NettyRpcClientHandler通过单例工厂获得NettyRpcClient,然后在userEventTriggered方法中调用Client的getChannel方法。
image
image
这种方式会不会造成:第一次获取NettyRpcClient对象的时候,一定会通过单例工厂重新构建一个新的NettyRpcClient对象。
如果改成,NettyRpcClientHandler通过构造函数NettyRpcClient传递方式,是否存在问题:
image

Error creating bean with name 'helloServiceImpl' defined in file

[main] WARN org.springframework.context.annotation.AnnotationConfigApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'helloServiceImpl' defined in file [/Users/liuyangos8888/20200808/guide-rpc-framework/example-server/target/classes/github/javaguide/serviceimpl/HelloServiceImpl.class]: Initialization of bean failed; nested exception is github.javaguide.exception.RpcException: KeeperErrorCode = Unimplemented for /my-rpc/github.javaguide.HelloServicetest1version1/127.0.0.1:9998
Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'helloServiceImpl' defined in file [/Users/liuyangos8888/20200808/guide-rpc-framework/example-server/target/classes/github/javaguide/serviceimpl/HelloServiceImpl.class]: Initialization of bean failed; nested exception is github.javaguide.exception.RpcException: KeeperErrorCode = Unimplemented for /my-rpc/github.javaguide.HelloServicetest1version1/127.0.0.1:9998
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:603)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:226)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:893)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.(AnnotationConfigApplicationContext.java:89)
at NettyServerMain.main(NettyServerMain.java:18)
Caused by: github.javaguide.exception.RpcException: KeeperErrorCode = Unimplemented for /my-rpc/github.javaguide.HelloServicetest1version1/127.0.0.1:9998
at github.javaguide.registry.zk.util.CuratorUtils.createPersistentNode(CuratorUtils.java:58)
at github.javaguide.registry.zk.ZkServiceRegistry.registerService(ZkServiceRegistry.java:23)
at github.javaguide.provider.ServiceProviderImpl.publishService(ServiceProviderImpl.java:73)
at github.javaguide.spring.SpringBeanPostProcessor.postProcessBeforeInitialization(SpringBeanPostProcessor.java:48)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:416)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1788)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:595)
... 10 more

关于加载gzip扩展的一个小错误

下载最新的代码,运行之后报错如下
image

发现rpc-framework-simple下面的MERA-INF中关于Compress的配置文件名称写错了,因为github.javaguide.compress.Compress接口是大写字母开头的,而配置文件名称是小写字母c所以导致找不到这个接口的SPI扩展类gzip

关于 SingletonFactory 获取单例的问题

https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java

public static <T> T getInstance(Class<T> c) {
        String key = c.toString();
        Object instance = null;
        if (instance == null) {
            synchronized (SingletonFactory.class) {
                instance = OBJECT_MAP.get(key);
                if (instance == null) {
                    try {
                        instance = c.getDeclaredConstructor().newInstance();
                        OBJECT_MAP.put(key, instance);
                    } catch (IllegalAccessException | InstantiationException e) {
                        throw new RuntimeException(e.getMessage(), e);
                    } catch (NoSuchMethodException | InvocationTargetException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        return c.cast(instance);
    }

这一段感觉在模仿标准的 dcl 写法,但是

Object instance = null;
if (instance == null) {
synchronized (SingletonFactory.class) {

明显 instance 肯定为空,判断多余,且synchronized (SingletonFactory.class) { 使用 类锁,锁的力度很大,在多线程情况下并发度很低阻塞会很严重

github.javaguide.provider.impl.ZkServiceProviderImpl类中为什么有serviceMap,还需要registeredService这个set去保存接口名?

请问github.javaguide.provider.impl.ZkServiceProviderImpl类中为什么有serviceMap,还需要registeredService这个set去保存接口名?

/**
     * key: rpc service name(interface name + version + group)
     * value: service object
     */
    private final Map<String, Object> serviceMap;
    private final Set<String> registeredService;

    // 在get方法中:
    String rpcServiceName = rpcServiceConfig.getRpcServiceName();
    if (registeredService.contains(rpcServiceName)) {
        return;
    }
    registeredService.add(rpcServiceName);
    serviceMap.put(rpcServiceName, rpcServiceConfig.getService());

直接用serviceMap.containsKey(rpcServiceName)不行吗?对此有些疑惑 :-)

发现的小问题

image
这个字节数组输出流不需要关闭么?感觉应该写到try后面的括号里

发现的一些小问题

我个人很喜欢这个项目,因为它能帮助我更好的理解rpc的细节。前段时间我看了李林峰的 《Netty权威指南》,也自己实现过简单的rpc,但是却不如你的全面和细节。在阅读源码的过程中,我发现了一下小问题,也可能是我没有get到你的思路,请指教

  1. github.javaguide.factory.SingletonFactory#getInstance,这个单例只是实现了一次校验,所以它恐怕不能正确的运行。
  2. github.javaguide.serialize.kyro.KryoSerializer#kryoThreadLocal,因为Kryo是非线程安全的,所以用了ThreadLocal为每个线程创建一个实例。由于netty的序列化操作是在线程池中运行,所以没必要每次序列化完成后都做remove操作。
  3. 我看项目中实现了以注解的方式发布服务,主要原理是用了SpringBeanPostProcessor对实例做了后置处理,但是SpringBeanPostProcessor能注册到容器原因是,你的示例项目example-server中NettyServerMain类中含有@componentscan注解,扫描的包名与rpc-framework-simple项目相同而已,这显然是不合理的。
  4. 许多资源的创建没有加锁,比如github.javaguide.registry.zk.util.CuratorUtils#getZkClient。当然,这些可以靠spring完成。

个人的一些理解,如有不足,还望指教

SPI实现的一点小疑问

image
我看这个 EXTENSION_INSTANCES 只是用来实例化,那么能不能改成这样?
难道只是因为 putIfAbsent 是原子性的?在进入方法之前,不也用 synchronized 加锁了?

    private T createExtension(String name) {
        // 从加载类缓存中获取 class 对象
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw new RuntimeException("No such extension of name " + name);
        }
        T instance = null;
        try {
            instance = (T) clazz.newInstance();
        } catch (InstantiationException | IllegalAccessException e) {
            log.error(e.getMessage());
        }
        return instance;
    }

问个问题

guide 教程可以开源吗。。。我就问问。。

@RpcScan相关的一个bug

运行项目的时候发现,虽然只定义了一个RpcService服务,但是log显示rpcServiceScanner扫描到了两个Bean。
image
是NettyRpcServer这个Bean被rpcServiceScanner扫描到了。但是,NettyRpcServer我觉得应该被springBeanScanner扫描到。这里被扫描到,是因为RpcScan定义扫描的包恰巧是github.javaguide。所以建议将springBeanScanner的扫描范围由:
"github.javaguide.spring" -> "github.javaguide"

发现个小问题

image
我跑这个项目是中文路径下,发现读rpc.properties文件的值读不到,然后打断点发现在PropertiesFileUtil里getResource()读取中文路径直接转义了,这个地方是不是加个转码更好呢

zookeeper版本和curator版本不匹配导致的问题

Zookeeper已启动
其它配置都和教程一致

服务端端运行状态:
[main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established, initiating session, client: /127.0.0.1:59179, server: 127.0.0.1/127.0.0.1:2181
[main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x18318f179ba0006, negotiated timeout = 40000
[main-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: RECONNECTED

客户端运行状态:
[main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x18318f179ba0004, negotiated timeout = 40000
[main-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
[main] ERROR github.javaguide.registry.zk.util.CuratorUtils - get children nodes for path [/my-rpc/github.javaguide.HelloServicetest1version1] fail
Exception in thread "main" github.javaguide.exception.RpcException: 没有找到指定的服务:github.javaguide.HelloServicetest1version1

RpcRequestHandler优化

想问下在RpcRequestHandler里对获取到的Method加个缓存会不会有性能上的提升/会不会有什么副作用?反射调用的invoke性能到底如何呢?

一致性哈希负载均衡器中,可以使用服务名的哈希值作为选择服务端地址的 key 吗?

如题:一致性哈希负载均衡器中,可以使用服务名的哈希值作为选择服务端地址的 key 吗?

Dubbo 中使用调用参数进行散列作为 key,保证相同参数的请求总是发到同一提供者。

如果使用服务名称散列后的值做为 key 的话,会不会导致不同客户端对同服务的所有请求都落在一个服务器上?

请问自定义RPC协议的设计思路是如何?

codec包中的编码器解码器是根据这个自定义协议设计的,那这个自定义协议如果让我自己设计,思路应该是怎样的?字段、长度等方面的考量

指的是以下部分
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
+-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+-------+
| magic code |version | full length | messageType| codec|compress| RequestId |
+-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
| |
| body |
| |
| ... ... |
+-------------------------------------------------------------------------------------------------------+
这个部分 这里没有对齐请谅解

zookeeper 的 watch 机制导致的问题

guide 哥我在复习 zookeeper 的时候发现它的 watcher 机制会在触发一次后就删除掉监听器,也就是说现在只能对节点的一次改变进行监听,第二次就不生效了。
目前有两种解决方案:

  1. 在回调函数里再次注册一个新的监听器
  2. 删除本地缓存的服务表,这样下一次去读取服务列表的时候就会重新去 zookeeper 拉最新数据,类似 Redis 和 MySQL 做双写一致性时的操作
PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildCacheEvent) -> {
    // List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
    // SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
    // 删除缓存以便下次从 zookeeper 拉取最新数据
    SERVICE_ADDRESS_MAP.remove(rpcServiceName);
};

关于SingletonFactory的一些问题,是否写成这样会更好?

当前master的SingletonFactory代码是这样的:

public final class SingletonFactory {
    private static final Map<String, Object> OBJECT_MAP = new HashMap<>();

    private SingletonFactory() {
    }

    public static <T> T getInstance(Class<T> c) {
        String key = c.toString();
        Object instance;
        synchronized (SingletonFactory.class) {
            instance = OBJECT_MAP.get(key);
            if (instance == null) {
                try {
                    instance = c.getDeclaredConstructor().newInstance();
                    OBJECT_MAP.put(key, instance);
                } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        }
        return c.cast(instance);
    }
}

我很疑惑为什么所有单例的对象需要共享同一把锁?他们之间也互不冲突。

其次,代码中似乎没有体现出double check,目前master中代码的逻辑中,无论OBJECT_MAP中是否已经存在目标单例对象,都需要先与其他线程竞争锁,这一点是不是不太合理?

基于这些存在的问题, 是否改成下面的代码会更好?

public class SingletonFactory {
    private static final Map<String, Object> SINGLETON_MAP = new ConcurrentHashMap<>();
    private static final Map<Class<?>, Object> LOCK_MAP = new ConcurrentHashMap<>();

    private SingletonFactory() {
    }

    /**
     * Class must have no-arguments constructor.
     */
    public static <T> T getInstance(Class<T> clazz) {
        if (clazz == null) {
            throw new IllegalArgumentException();
        }
        String key = clazz.toString();
        Object instance = SINGLETON_MAP.get(key);
        if (instance == null) {
            synchronized (LOCK_MAP.computeIfAbsent(clazz, k -> new Object())) {
                instance = SINGLETON_MAP.get(key);
                if (instance == null) {
                    try {
                        instance = clazz.getDeclaredConstructor().newInstance();
                        SINGLETON_MAP.put(key, instance);
                    } catch (InstantiationException | InvocationTargetException | NoSuchMethodException
                            | IllegalAccessException e) {
                        throw new RuntimeException(e.getMessage(), e);
                    }
                }
            }
        }
        return clazz.cast(instance);
    }
}

一致性hash中,相同入参无法路由到同一个服务

return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters()));

Arrays.stream() 每次调用都会返回一个新对象,即使入参相同。如下图所示:
image

客户端调用,如下图所示:
image

是否可以直接将参数按顺序拼接呢,个人的一些思考,还望指教

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.