Comments (13)
UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?
doesn't seem to use database-specific locks. Lock wait timeout exceeded; try restarting transaction
indicates that another transaction is ongoing and that the current connection didn't get a lock to update the row.
Have you checked what transaction is holding the lock?
from spring-data-examples.
distributedLockRepository.findByLockKey(distributedLockDO.getLockKey()) is what holds the lock. That means either its transaction does not extend to the UPDATE statement, or the UPDATE runs in a different transaction.
The 'select for update' and the 'update' do not run in the same transaction, so the 'update' times out while waiting for an exclusive (X) lock on the row. How do I keep the 'select for update' and the 'update' in the same transaction? @mp911de
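To make the failure mode described above concrete, here is a toy model in plain Java. It is not R2DBC or MySQL code: the class `RowLockSketch`, the integer connection ids, and the key `lock-a` are all made up for illustration, and a `false` return stands in for the "Lock wait timeout exceeded" error (a real database would wait first and then fail). The only point is that a row lock taken by one connection's transaction blocks an update issued on another connection, and not one issued on the same connection.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of exclusive row locks; hypothetical, not R2DBC or MySQL code. */
public class RowLockSketch {

    // Row key -> id of the connection whose open transaction holds the X lock.
    private final Map<String, Integer> lockOwners = new HashMap<>();

    /** Takes the X lock for connectionId; false stands in for a lock wait timeout. */
    private boolean lockRow(int connectionId, String key) {
        Integer owner = lockOwners.putIfAbsent(key, connectionId);
        return owner == null || owner == connectionId;
    }

    /** SELECT ... FOR UPDATE: locks the row for this connection's transaction. */
    boolean selectForUpdate(int connectionId, String key) {
        return lockRow(connectionId, key);
    }

    /** UPDATE: needs the same X lock on the row. */
    boolean update(int connectionId, String key) {
        return lockRow(connectionId, key);
    }

    /** COMMIT: releases every lock held by this connection. */
    void commit(int connectionId) {
        lockOwners.values().removeIf(owner -> owner == connectionId);
    }

    /** The select runs on connection 1, the update on connection 2. */
    static boolean updateOnOtherConnection() {
        RowLockSketch db = new RowLockSketch();
        db.selectForUpdate(1, "lock-a");
        return db.update(2, "lock-a");
    }

    /** The select and the update share connection 1. */
    static boolean updateOnSameConnection() {
        RowLockSketch db = new RowLockSketch();
        db.selectForUpdate(1, "lock-a");
        boolean updated = db.update(1, "lock-a");
        db.commit(1);
        return updated;
    }

    public static void main(String[] args) {
        System.out.println("other connection: " + updateOnOtherConnection()); // false
        System.out.println("same connection: " + updateOnSameConnection());   // true
    }
}
```

In the model, the update only succeeds when it runs on the connection that already owns the lock, which is why the select and the update have to share one transaction.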
Make sure that you have a single connection factory to obtain connections from. Any routing might cause interference.
You can try this example; the transaction does not take effect.
@ConditionalOnBean(DatabaseClient.class)
@Component
public class R2dbcDistributedLockerDAO implements DistributedLocker {
    private static final Logger LOGGER = LoggerFactory.getLogger(R2dbcDistributedLockerDAO.class);
    private final BeanCopier distributedLockDOToEntity = BeanCopier.create(DistributedLockDO.class, DistributedLock.class, false);
    @Resource
    private DistributedLockRepository distributedLockRepository;
    @Resource
    private TransactionalOperator operator;
    /**
     * Instantiates a new R2DBC distributed locker DAO.
     */
    public R2dbcDistributedLockerDAO() {
    }
    @Override
    public boolean acquireLock(DistributedLockDO distributedLockDO) {
        try {
            return Boolean.TRUE.equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())
                .publishOn(Schedulers.boundedElastic()).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return false;
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).block() != null;
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).block() != null;
                }).as(operator::transactional).block());
        } catch (R2dbcDataIntegrityViolationException e) {
            // another thread acquired the lock first
            return false;
        }
    }
}
@SpringBootApplication(exclude = R2dbcAutoConfiguration.class)
public class ServerApplication {
    public static void main(String[] args) throws IOException {
        // run the spring-boot application
        SpringApplication.run(ServerApplication.class, args);
    }
}
@Configuration
@Import(R2dbcDataAutoConfiguration.class)
public class R2dbcAutoConfiguration {
}
@Configuration
@EnableConfigurationProperties(R2dbcProperties.class)
@AutoConfigureBefore(R2dbcAutoConfiguration.class)
public class R2dbcConfiguration extends AbstractDataSourceProvider {
    @Bean
    public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
        R2dbcDialect dialect = DialectResolver.getDialect(connectionFactory);
        return DatabaseClient.builder().connectionFactory(connectionFactory)
            .bindMarkers(dialect.getBindMarkersFactory()).build();
    }
    @Bean
    public ReactiveTransactionManager reactiveTransactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
    @Bean
    public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
        R2dbcDialect dialect = DialectResolver.getDialect(databaseClient.getConnectionFactory());
        return new R2dbcEntityTemplate(databaseClient, dialect);
    }
    @Bean
    public ConnectionPool connectionFactory(R2dbcProperties r2dbcProperties) {
        String url = getUrl();
        ConnectionInfo connectionInfo = URLParser.parser(url);
        String[] dbPeer = connectionInfo.getDbPeer().split(":");
        String host = dbPeer[0];
        int port = Integer.parseInt(dbPeer[1]);
        ConnectionFactoryOptions.Builder options = ConnectionFactoryOptions.builder()
            .option(DRIVER, getDBType().name().toLowerCase()).option(HOST, host).option(USER, getUser())
            .option(PORT, port).option(PASSWORD, getPassword()).option(DATABASE, connectionInfo.getDbInstance())
            .option(CONNECT_TIMEOUT, Duration.ofMillis(getMaxWait()));
        String paramUrl = url.substring(url.indexOf("?") + 1);
        if (StringUtils.isNotBlank(paramUrl)) {
            String useSSL = "useSSL";
            if (paramUrl.contains(useSSL)) {
                String[] params = paramUrl.split("&");
                for (String param : params) {
                    if (param.contains(useSSL)) {
                        options.option(SSL, Boolean.parseBoolean(param.split("=")[1]));
                        break;
                    }
                }
            }
        }
        ConnectionFactory connectionFactory = ConnectionFactories.get(options.build());
        R2dbcProperties.Pool pool = r2dbcProperties.getPool();
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        ConnectionPoolConfiguration.Builder builder = ConnectionPoolConfiguration.builder(connectionFactory);
        map.from(Duration.ofMillis(getMaxWait())).to(builder::maxIdleTime);
        map.from(pool.getMaxAcquireTime()).to(builder::maxAcquireTime);
        map.from(pool.getMaxCreateConnectionTime()).to(builder::maxCreateConnectionTime);
        map.from(getMinConn()).to(builder::initialSize);
        map.from(getMaxConn()).to(builder::maxSize);
        map.from(pool.getValidationQuery()).whenHasText().to(builder::validationQuery);
        map.from(pool.getValidationDepth()).to(builder::validationDepth);
        return new ConnectionPool(builder.build());
    }
    @Bean
    public R2dbcMappingContext r2dbcMappingContext(ObjectProvider<NamingStrategy> namingStrategy,
        R2dbcCustomConversions r2dbcCustomConversions) {
        R2dbcMappingContext relationalMappingContext =
            new R2dbcMappingContext(namingStrategy.getIfAvailable(() -> NamingStrategy.INSTANCE));
        relationalMappingContext.setSimpleTypeHolder(r2dbcCustomConversions.getSimpleTypeHolder());
        return relationalMappingContext;
    }
    @Bean
    public MappingR2dbcConverter r2dbcConverter(R2dbcMappingContext mappingContext,
        R2dbcCustomConversions r2dbcCustomConversions) {
        return new MappingR2dbcConverter(mappingContext, r2dbcCustomConversions);
    }
    @Bean
    public R2dbcCustomConversions r2dbcCustomConversions(ConnectionFactory connectionFactory) {
        R2dbcDialect dialect = DialectResolver.getDialect(connectionFactory);
        List<Object> converters = new ArrayList<>(dialect.getConverters());
        converters.addAll(R2dbcCustomConversions.STORE_CONVERTERS);
        return new R2dbcCustomConversions(
            CustomConversions.StoreConversions.of(dialect.getSimpleTypeHolder(), converters), Collections.emptyList());
    }
}
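For readers following the acquireLock logic in the example above, the acquisition rule buried in the lambda can be isolated as a pure function. This is only a sketch: `LockDecisionSketch`, `Decision`, and `decide` are hypothetical names, and the row is passed as plain parameters instead of the `DistributedLock` entity. One thing worth noting is that a Reactor `Mono` never emits null, so a missing row arrives as an empty `Mono` and the `distributedLock != null` checks inside `map` cannot take the "insert a new row" path; the sketch makes that case an explicit input.

```java
/** Hypothetical, framework-free restatement of the lock acquisition rule. */
public class LockDecisionSketch {

    enum Decision { REJECT, UPDATE_EXISTING, INSERT_NEW }

    /**
     * Decides what to do with a lock request.
     *
     * @param rowExists         whether a row for the lock key was found
     * @param currentValue      lock_value stored in the row (ignored if no row)
     * @param currentExpireTime expiry timestamp stored in the row, in millis
     * @param requestedValue    lock_value of the caller asking for the lock
     * @param now               current time in millis
     */
    static Decision decide(boolean rowExists, String currentValue, long currentExpireTime,
                           String requestedValue, long now) {
        if (!rowExists) {
            return Decision.INSERT_NEW;
        }
        // Held by someone else: non-blank value, different owner, not yet expired.
        boolean heldByOther = currentValue != null
            && !currentValue.trim().isEmpty()
            && !currentValue.equals(requestedValue)
            && now < currentExpireTime;
        return heldByOther ? Decision.REJECT : Decision.UPDATE_EXISTING;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, "a", 2000L, "b", 1000L));  // REJECT
        System.out.println(decide(true, "a", 500L, "b", 1000L));   // UPDATE_EXISTING
        System.out.println(decide(false, null, 0L, "b", 1000L));   // INSERT_NEW
    }
}
```

Keeping the rule free of repository calls also makes it testable without a database; the reactive code then only has to map each decision to a save or to `false`.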
I changed the code to the following and the transaction took effect. I will keep observing and learning tomorrow. Thanks for your help, @mp911de.
@Transactional
public Mono<Boolean> acquireLock(DistributedLockDO distributedLockDO) {
    try {
        return Mono.from(connectionFactory.create()).flatMap(connection -> Mono.from(connection.beginTransaction())
            .then(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())).map(distributedLock -> {
                if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                    && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                    && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                    return Mono.just(false);
                }
                distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                if (distributedLock != null) {
                    if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                        distributedLock.setLockValue(distributedLockDO.getLockValue());
                    }
                    distributedLock.setNewLock(false);
                    return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                }
                distributedLock = new DistributedLock();
                distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                return distributedLockRepository.save(distributedLock).then(Mono.just(true));
            }).flatMap(Mono::from)
            .delayUntil(bool -> bool ? connection.commitTransaction() : connection.rollbackTransaction())
            .doFinally(c -> connection.close()));
    } catch (R2dbcDataIntegrityViolationException e) {
        // another thread acquired the lock first
        return Mono.just(false);
    }
}
Changing the above code to use TransactionalOperator makes the transaction ineffective again:
return Boolean.TRUE
    .equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey()).map(distributedLock -> {
        if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
            && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
            && System.currentTimeMillis() < distributedLock.getExpireTime()) {
            return Mono.just(false);
        }
        distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
        if (distributedLock != null) {
            if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                distributedLock.setLockValue(distributedLockDO.getLockValue());
            }
            distributedLock.setNewLock(false);
            return distributedLockRepository.save(distributedLock).then(Mono.just(true));
        }
        distributedLock = new DistributedLock();
        distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
        return distributedLockRepository.save(distributedLock).then(Mono.just(true));
    }).flatMap(Mono::from).as(operator::transactional).block());
Lock wait timeout exceeded; try restarting transaction
at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ SQL "UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?" [DatabaseClient]
Original Stack Trace:
at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:317)
at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:292)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:176)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
The transactions in both examples above still fail, except that they no longer output 'Lock wait timeout exceeded'.
Can you give me an example of 'select for update' combined with 'update/save'?
I need the 'for update' to hold the X lock on the row to ensure that my update is correct.
@mp911de
Can anyone tell me what to do in this scenario?
help
return distributedLockRepository.save(distributedLock).block() does not participate within a transaction because there's no context propagation. I also wonder why you use R2DBC if your calling code is blocking. Either rewrite everything to return Mono/Flux or use JDBC. Never call .block(), as that is the source of your transaction context issues.
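The context propagation point can be illustrated with a small stand-in written in plain Java. This is not Reactor or Spring code: `Op`, `statement`, and the `"tx"` key are invented for the sketch, and the map passed to `subscribe` only imitates the subscriber context in which a reactive transaction is carried. The sketch shows that a nested `block()` starts a fresh subscription with an empty context, while `flatMap` hands the outer context on to the inner operation.

```java
import java.util.Map;
import java.util.function.Function;

/** Toy model of subscriber-context propagation; hypothetical, not the Reactor API. */
public class ContextPropagationSketch {

    /** A deferred operation that only sees the context it is subscribed with. */
    interface Op<T> {
        T subscribe(Map<String, String> context);

        /** Imitates block(): a new, independent subscription with an empty context. */
        default T block() {
            return subscribe(Map.of());
        }

        default <R> Op<R> map(Function<T, R> mapper) {
            return context -> mapper.apply(subscribe(context));
        }

        /** The inner operation is subscribed with the same context as the outer one. */
        default <R> Op<R> flatMap(Function<T, Op<R>> mapper) {
            return context -> mapper.apply(subscribe(context)).subscribe(context);
        }

        /** Imitates a transactional operator: puts a transaction into the context. */
        default Op<T> transactional(String transactionId) {
            return ignored -> subscribe(Map.of("tx", transactionId));
        }
    }

    /** A statement that reports which transaction, if any, it ran in. */
    static Op<String> statement(String sql) {
        return context -> sql + " in " + context.getOrDefault("tx", "no transaction");
    }

    /** Mirrors save(...).block() inside map(...): the update loses the transaction. */
    static String blockingInsideMap() {
        return statement("SELECT ... FOR UPDATE")
            .map(row -> statement("UPDATE").block())
            .transactional("tx-1")
            .block();
    }

    /** Mirrors composing with flatMap: the update joins the outer transaction. */
    static String composedWithFlatMap() {
        return statement("SELECT ... FOR UPDATE")
            .flatMap(row -> statement("UPDATE"))
            .transactional("tx-1")
            .block();
    }

    public static void main(String[] args) {
        System.out.println(blockingInsideMap());   // UPDATE in no transaction
        System.out.println(composedWithFlatMap()); // UPDATE in tx-1
    }
}
```

Applied to the code in this thread, that suggests returning `Mono<Boolean>` from acquireLock, chaining the save with `flatMap`, and applying the transactional operator once to the whole chain, with no `block()` anywhere inside it.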
A transaction inherently may depend on query results, so how can you ensure the data is not read dirty if the query is not in the same transaction? Can you tell me how to run both the query and the insert in a single transaction in R2DBC?