Coder Social home page Coder Social logo

Comments (5)

Snailclimb avatar Snailclimb commented on June 12, 2024

https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-simple/src/main/java/github/javaguide/serialize/protostuff/ProtostuffSerializer.java

/**
 * Avoid re applying buffer space every time serialization
 */
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

@Override
public byte[] serialize(Object obj) {
    Class<?> clazz = obj.getClass();
    Schema schema = RuntimeSchema.getSchema(clazz);
    byte[] bytes;
    try {
        bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
    } finally {
        BUFFER.clear();
    }
    return bytes;
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = RuntimeSchema.getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错
2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好
下面是我修改的代码:

private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>();
private static <T>Schema<T> getSchema(Class<?> clazz) {
    if (schemaMap.containsKey(clazz)) {
        return (Schema<T>)schemaMap.get(clazz);
    }
    Schema schema = RuntimeSchema.getSchema(clazz);
    // 双重检查
    if (schema != null) {
        schemaMap.put(clazz, schema);
    }
    return schema;
}

@Override
public byte[] serialize(Object object) {
    Class<?> clazz = object.getClass();
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    try {
        Schema schema = getSchema(clazz);
        return ProtostuffIOUtil.toByteArray(object, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

老哥 可以提交一个pr么?

from guide-rpc-framework.

ChoKhoOu avatar ChoKhoOu commented on June 12, 2024

https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-simple/src/main/java/github/javaguide/serialize/protostuff/ProtostuffSerializer.java

/**
 * Avoid re applying buffer space every time serialization
 */
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

@Override
public byte[] serialize(Object obj) {
    Class<?> clazz = obj.getClass();
    Schema schema = RuntimeSchema.getSchema(clazz);
    byte[] bytes;
    try {
        bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
    } finally {
        BUFFER.clear();
    }
    return bytes;
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = RuntimeSchema.getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错
2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好
下面是我修改的代码:

private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>();
private static <T>Schema<T> getSchema(Class<?> clazz) {
    if (schemaMap.containsKey(clazz)) {
        return (Schema<T>)schemaMap.get(clazz);
    }
    Schema schema = RuntimeSchema.getSchema(clazz);
    // 双重检查
    if (schema != null) {
        schemaMap.put(clazz, schema);
    }
    return schema;
}

@Override
public byte[] serialize(Object object) {
    Class<?> clazz = object.getClass();
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    try {
        Schema schema = getSchema(clazz);
        return ProtostuffIOUtil.toByteArray(object, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

老哥 可以提交一个pr么?

private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

guide哥,我想问下,这个buffer不会有线程安全问题么? 我看他内部好像没能保证线程安全

from guide-rpc-framework.

microhardsmith avatar microhardsmith commented on June 12, 2024

想到了同样的问题,这里肯定是有线程安全问题的

from guide-rpc-framework.

huangxiao1234 avatar huangxiao1234 commented on June 12, 2024

该问题我在压测负载均衡功能时也出现了

  1. 在多线程场景下,如果只有一个provider,那么所有并发请求都会交由同一个Channel进行处理,而Channel是天然线程安全的,一个Channel只会对应唯一一个线程进行执行,那么使用ProtostuffSerializer进行序列化时不会有线程安全问题
  2. 当有多个provider时,并发请求进来就会拿到多个Channel,从而多个Channel同时进行序列化,ProtostuffSerializer会出现线程安全的问题,如Buffer previously used and had not been reset.

对此,我的解决办法如下

public class ProtostuffSerializer implements Serializer {

    /**
     * Avoid re applying buffer space every time serialization
     *
     */
//    private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); 线程不安全
    private static final LinkedBuffer[] BUFFERS = new LinkedBuffer[]{LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE),LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE)};

    @Override
    public byte[] serialize(Object obj) {
        System.out.println(Thread.currentThread().getId());
        Class<?> clazz = obj.getClass();
        Schema schema = RuntimeSchema.getSchema(clazz);
        byte[] bytes;
        try {
            bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFERS[(int) (Thread.currentThread().getId()%2)]);
        } finally {
            BUFFERS[(int) (Thread.currentThread().getId()%2)].clear();
        }
        return bytes;
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        T obj = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
        return obj;
    }
}

from guide-rpc-framework.

zhaojiale1213 avatar zhaojiale1213 commented on June 12, 2024

双重检查一样会有线程安全问题,建议修改为computeIfAbsent()方法,会保证本次操作是线程安全的
1654051649(1)

from guide-rpc-framework.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.