Comments (13)

mp911de commented on May 24, 2024

UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ? doesn't seem to use database-specific locks. Lock wait timeout exceeded; try restarting transaction indicates that another transaction is ongoing and that the current connection didn't get a lock to update the row.

Have you checked what transaction is holding the lock?

from spring-data-examples.

funky-eyes commented on May 24, 2024

UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ? doesn't seem to use database-specific locks. Lock wait timeout exceeded; try restarting transaction indicates that another transaction is ongoing and that the current connection didn't get a lock to update the row.

Have you checked what transaction is holding the lock?

The lock is held by distributedLockRepository.findByLockKey(distributedLockDO.getLockKey()), which means that its transaction does not extend to the UPDATE statement, or that it is not the same transaction as the one running the UPDATE.

funky-eyes commented on May 24, 2024

'select for update' and 'update' do not run in the same transaction, so the 'update' times out while waiting for an X lock on the database. How do I keep 'select for update' and 'update' in the same transaction? @mp911de

mp911de commented on May 24, 2024

Make sure that you have a single connection factory to obtain connections from. Any routing might cause interference.

funky-eyes commented on May 24, 2024

Make sure that you have a single connection factory to obtain connections from. Any routing might cause interference.

You can try this example; the transaction does not take effect.

funky-eyes commented on May 24, 2024

@ConditionalOnBean(DatabaseClient.class)
@Component
public class R2dbcDistributedLockerDAO implements DistributedLocker {
    private static final Logger LOGGER = LoggerFactory.getLogger(R2dbcDistributedLockerDAO.class);

    private final BeanCopier distributedLockDOToEntity = BeanCopier.create(DistributedLockDO.class, DistributedLock.class, false);

    @Resource
    private DistributedLockRepository distributedLockRepository;

    @Resource
    private TransactionalOperator operator;

    /**
     * Instantiates a new R2DBC distributed locker DAO.
     */
    public R2dbcDistributedLockerDAO() {
    }

    @Override
    public boolean acquireLock(DistributedLockDO distributedLockDO) {
        try {
            return Boolean.TRUE.equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())
                .publishOn(Schedulers.boundedElastic()).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return false;
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).block() != null;
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).block() != null;
                }).as(operator::transactional).block());
        } catch (R2dbcDataIntegrityViolationException e) {
            // another thread won the race to acquire the lock
            return false;
        }
    }
}

@SpringBootApplication(exclude = R2dbcAutoConfiguration.class)
public class ServerApplication {
    public static void main(String[] args) throws IOException {
        // run the spring-boot application
        SpringApplication.run(ServerApplication.class, args);
    }
}

@Configuration
@Import(R2dbcDataAutoConfiguration.class)
public class R2dbcAutoConfiguration {
}

@Configuration
@EnableConfigurationProperties(R2dbcProperties.class)
@AutoConfigureBefore(R2dbcAutoConfiguration.class)
public class R2dbcConfiguration extends AbstractDataSourceProvider {

    @Bean
    public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
        R2dbcDialect dialect = DialectResolver.getDialect(connectionFactory);
        return DatabaseClient.builder().connectionFactory(connectionFactory)
            .bindMarkers(dialect.getBindMarkersFactory()).build();
    }

    @Bean
    public ReactiveTransactionManager reactiveTransactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }

    @Bean
    public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
        R2dbcDialect dialect = DialectResolver.getDialect(databaseClient.getConnectionFactory());
        return new R2dbcEntityTemplate(databaseClient, dialect);
    }

    @Bean
    public ConnectionPool connectionFactory(R2dbcProperties r2dbcProperties) {
        String url = getUrl();
        ConnectionInfo connectionInfo = URLParser.parser(url);
        String[] dbPeer = connectionInfo.getDbPeer().split(":");
        String host = dbPeer[0];
        int port = Integer.parseInt(dbPeer[1]);
        ConnectionFactoryOptions.Builder options = ConnectionFactoryOptions.builder()
            .option(DRIVER, getDBType().name().toLowerCase()).option(HOST, host).option(USER, getUser())
            .option(PORT, port).option(PASSWORD, getPassword()).option(DATABASE, connectionInfo.getDbInstance())
            .option(CONNECT_TIMEOUT, Duration.ofMillis(getMaxWait()));
        String paramUrl = url.substring(url.indexOf("?") + 1);
        if (StringUtils.isNotBlank(paramUrl)) {
            String useSSL = "useSSL";
            if (paramUrl.contains(useSSL)) {
                String[] params = paramUrl.split("&");
                for (String param : params) {
                    if (param.contains(useSSL)) {
                        options.option(SSL, Boolean.parseBoolean(param.split("=")[1]));
                        break;
                    }
                }
            }
        }
        ConnectionFactory connectionFactory = ConnectionFactories.get(options.build());
        R2dbcProperties.Pool pool = r2dbcProperties.getPool();
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        ConnectionPoolConfiguration.Builder builder = ConnectionPoolConfiguration.builder(connectionFactory);
        map.from(Duration.ofMillis(getMaxWait())).to(builder::maxIdleTime);
        map.from(pool.getMaxAcquireTime()).to(builder::maxAcquireTime);
        map.from(pool.getMaxCreateConnectionTime()).to(builder::maxCreateConnectionTime);
        map.from(getMinConn()).to(builder::initialSize);
        map.from(getMaxConn()).to(builder::maxSize);
        map.from(pool.getValidationQuery()).whenHasText().to(builder::validationQuery);
        map.from(pool.getValidationDepth()).to(builder::validationDepth);
        return new ConnectionPool(builder.build());
    }

    @Bean
    public R2dbcMappingContext r2dbcMappingContext(ObjectProvider<NamingStrategy> namingStrategy,
        R2dbcCustomConversions r2dbcCustomConversions) {
        R2dbcMappingContext relationalMappingContext =
            new R2dbcMappingContext(namingStrategy.getIfAvailable(() -> NamingStrategy.INSTANCE));
        relationalMappingContext.setSimpleTypeHolder(r2dbcCustomConversions.getSimpleTypeHolder());
        return relationalMappingContext;
    }

    @Bean
    public MappingR2dbcConverter r2dbcConverter(R2dbcMappingContext mappingContext,
        R2dbcCustomConversions r2dbcCustomConversions) {
        return new MappingR2dbcConverter(mappingContext, r2dbcCustomConversions);
    }

    @Bean
    public R2dbcCustomConversions r2dbcCustomConversions(ConnectionFactory connectionFactory) {
        R2dbcDialect dialect = DialectResolver.getDialect(connectionFactory);
        List<Object> converters = new ArrayList<>(dialect.getConverters());
        converters.addAll(R2dbcCustomConversions.STORE_CONVERTERS);
        return new R2dbcCustomConversions(
            CustomConversions.StoreConversions.of(dialect.getSimpleTypeHolder(), converters), Collections.emptyList());
    }
}

apache/incubator-seata#4926
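
The acquisition rule inside the `map` callback above can be read as a single predicate: refuse only when another owner holds the lock and it has not yet expired. The following is a minimal sketch of that rule in plain Java. The names `LockRule` and `canAcquire` are hypothetical and do not appear in the original code, and `String.isBlank()` (Java 11+) stands in for `StringUtils.isNotBlank`.

```java
// Hypothetical helper, not part of the original DAO: isolates the decision
// from the database calls so it can be reasoned about on its own.
class LockRule {

    /**
     * Returns true when the requester may take or renew the lock.
     * Pass existingValue = null when no row exists for the lock key.
     */
    static boolean canAcquire(String existingValue, long existingExpireMillis,
                              String requestedValue, long nowMillis) {
        // A row with a null or blank value counts as "not held".
        boolean held = existingValue != null && !existingValue.isBlank();
        // Only a different owner blocks the requester; the same owner may renew.
        boolean heldByOther = held && !existingValue.equals(requestedValue);
        // An expired lock no longer blocks anyone.
        boolean stillValid = nowMillis < existingExpireMillis;
        return !(heldByOther && stillValid);
    }

    public static void main(String[] args) {
        long now = 1_000L;
        System.out.println(canAcquire(null, 0L, "node-a", now));         // no row yet
        System.out.println(canAcquire("node-b", 2_000L, "node-a", now)); // held by another owner, not expired
        System.out.println(canAcquire("node-b", 500L, "node-a", now));   // held by another owner, expired
        System.out.println(canAcquire("node-a", 2_000L, "node-a", now)); // same owner renews
    }
}
```

The predicate is only meaningful if the row it inspects is still locked when the write happens, which is why the read and the subsequent save have to run in the same database transaction.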

funky-eyes commented on May 24, 2024

I changed the code to the following and the transaction took effect. I will keep investigating tomorrow. Thanks for your help, @mp911de.

    @Transactional
    public Mono<Boolean> acquireLock(DistributedLockDO distributedLockDO) {
        try {
            return Mono.from(connectionFactory.create()).flatMap(connection -> Mono.from(connection.beginTransaction())
                .then(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return Mono.just(false);
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                }).flatMap(Mono::from)
                .delayUntil(bool -> bool ? connection.commitTransaction() : connection.rollbackTransaction())
                .doFinally(c -> connection.close()));
        } catch (R2dbcDataIntegrityViolationException e) {
            // another thread won the race to acquire the lock
            return Mono.just(false);
        }
    }

funky-eyes commented on May 24, 2024

Changing the above code to use TransactionalOperator makes the transaction ineffective again:

            return Boolean.TRUE
                .equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey()).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return Mono.just(false);
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                }).flatMap(Mono::from).as(operator::transactional).block());

Lock wait timeout exceeded; try restarting transaction
	at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ SQL "UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?" [DatabaseClient]
Original Stack Trace:
		at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:317)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:292)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:176)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)

funky-eyes commented on May 24, 2024

Both example transactions above still fail, except that they no longer output 'Lock wait timeout exceeded'.
Can you give me an example of 'select for update' combined with 'update/save'?
I need the 'for update' to hold the X lock on the database to ensure that my update is correct.
@mp911de

funky-eyes commented on May 24, 2024

Can anyone tell me what to do in this scenario?

funky-eyes commented on May 24, 2024

help

mp911de commented on May 24, 2024

return distributedLockRepository.save(distributedLock).block() does not participate within a transaction because there's no context propagation. I also wonder why you use R2DBC if your calling code is blocking. Either rewrite everything to return Mono/Flux or use JDBC. Never call .block as that is the source of your transaction context issues.
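
To illustrate the point about context propagation, here is a toy model in plain Java. It is not Reactor or Spring code: `Step`, `transactional`, `flatMap` and `block` are hypothetical stand-ins that only mimic the roles of a `Mono`, `operator::transactional`, `Mono.flatMap` and a nested `.block()` call. The sketch assumes that a chained step inherits the connection of the surrounding transaction, while a nested blocking call starts an independent execution that obtains its own connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Toy model only: none of these types are Reactor or Spring APIs.
class ContextPropagationSketch {

    // Hypothetical pool that hands out numbered connections.
    static final AtomicInteger NEXT_CONNECTION = new AtomicInteger(1);

    // A deferred step that runs on whatever connection it is given.
    interface Step<T> {
        T runOn(int connectionId);

        // Chain a dependent step on the same connection (the role flatMap plays).
        default <R> Step<R> flatMap(Function<T, Step<R>> next) {
            return conn -> next.apply(runOn(conn)).runOn(conn);
        }

        // Run in isolation on a fresh connection (the role a nested block() plays).
        default T block() {
            return runOn(NEXT_CONNECTION.getAndIncrement());
        }
    }

    // Run a whole chain on one connection (the role operator::transactional plays).
    static <T> T transactional(Step<T> chain) {
        return chain.runOn(NEXT_CONNECTION.getAndIncrement());
    }

    // Each step records which connection it ran on.
    static Step<String> selectForUpdate(List<Integer> used) {
        return conn -> { used.add(conn); return "row"; };
    }

    static Step<Boolean> update(List<Integer> used) {
        return conn -> { used.add(conn); return true; };
    }

    // Composed chain: SELECT and UPDATE share the transaction's connection.
    static List<Integer> composed() {
        List<Integer> used = new ArrayList<>();
        transactional(selectForUpdate(used).flatMap(row -> update(used)));
        return used;
    }

    // Nested block(): the UPDATE escapes to a different connection.
    static List<Integer> nestedBlock() {
        List<Integer> used = new ArrayList<>();
        Step<Boolean> chain = conn -> {
            selectForUpdate(used).runOn(conn);
            return update(used).block();
        };
        transactional(chain);
        return used;
    }

    public static void main(String[] args) {
        System.out.println("composed connections:     " + composed());
        System.out.println("nested block connections: " + nestedBlock());
    }
}
```

In the composed variant both statements report the same connection number. In the nested variant the update runs on a second connection, which matches the symptom in this thread: the UPDATE waits for a row lock that the SELECT ... FOR UPDATE on the first connection still holds.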

funky-eyes commented on May 24, 2024

return distributedLockRepository.save(distributedLock).block() does not participate within a transaction because there's no context propagation. I also wonder why you use R2DBC if your calling code is blocking. Either rewrite everything to return Mono/Flux or use JDBC. Never call .block as that is the source of your transaction context issues.

A transaction often has to act on query results, so how can you avoid dirty reads if the query and the write are not in the same transaction? Can you tell me how to run both a query and an insert in a single transaction in R2DBC?
