cirello-io / dynamolock
DynamoDB Lock Client for Go
Home Page: https://godoc.org/cirello.io/dynamolock/v2
License: Apache License 2.0
Hi,
I was trying to get the contents of a lock without acquiring it, but found that when I called
lock, err = client.Get(dynamoLockName)
I received a lock with no data.
I looked into `Get` and found that this branch was being executed: https://github.com/cirello-io/dynamolock/blob/master/client.go#L906, because the lock is not currently held. Therefore, an empty lock is returned, with no data.
I'm happy to make a PR to fix this by removing the `lockItem.isReleased` check, but wanted to confirm first.
refer to #157 (comment)
Additional attributes are lost when we try to acquire the lock again. Adding the following to client.go at line 359 fixes it:
if existingLock != nil {
	for k, v := range existingLock.additionalAttributes {
		item[k] = v
	}
}
As mentioned in #151, I want to start a discussion about the CI/CD solution for this project (since Travis has been made unusable). I know this repo is part of a bigger private monorepo, but I assume we can implement CI for just this repo, or you can port these changes over to your own files as well (please correct me if I'm wrong, though!)
Service | Pricing | Limitations |
---|---|---|
CircleCI | Free for OSS | 2,5k credits, 1 Job at a time, no macOS |
Semaphore | Free for OSS | 1300 minutes a month, 1 job at a time |
JFrog Pipelines | Free tier | 2k minutes a month, feels very enterprisey |
I think these are the viable options when it comes to SaaS. There are also Buildkite and DroneCI, which look quite powerful, but they require at least some part to be hosted on self-managed infrastructure.
All services have a YAML configuration and seem to support all our required features.
@ucirello WDYT?
Use case: I have short-lived processes that can't wait out a whole lease duration in one go and hope to get the lock, even with, say, 1-minute leases. I'd rather check whether the key is locked and, if it is, spin down the process (or let it move on), then check again after the lease duration to see whether the RVN has changed. This is basically what AcquireLock already does, but done manually so I can do other things in between.
It seems that, since the API doesn't expose the record version number and lease duration, there is no way to call AcquireLock with FailIfLocked and then manually check whether the lock still has the same version after the lease duration has passed. So if I want to wait for a lease to expire, the only choice is to block for the lease duration in an AcquireLock call. Is that all correct? If so, maybe there could be a way to combine Acquire and FailIfLocked with retrieving some data that could be used to actually acquire the lock once the lease duration has passed.
Again, I might be able to get a PR, but would like some guidance before coding.
Hi,
I believe it would be nice to have a `chan`-based mechanism for being notified of lock expiration. This would make some use cases slightly more usable by avoiding polling on `lock.IsExpired()`.
I'll try to cook up a small PR to get the ball rolling.
Hi, can you add support for saving `leaseDuration` as an int64 (`time.Duration().Milliseconds()`)? This would make it compatible with the Java lock client, which tries to fetch `leaseDuration` as an `AtomicLong` and currently fails with a `java.lang.NumberFormatException`.
Would it be possible to add an option to set a TTL if we want to enable it? These locks just end up hanging around forever; it would be nice if we could use DynamoDB TTL to clean them up.
I noticed in testing that the heartbeat can leak goroutines if checking locks is slow. In `heartbeat`, if the update to `c.lastHeartbeat` takes longer than `2*c.heartbeatPeriod`, `enforceHeartbeat` will create another goroutine that will exist until all locks are removed:
func (c *Client) enforceHeartbeat() {
	if c.heartbeatPeriod == 0 {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	lastHeartbeat := c.lastHeartbeat
	isHeartbeatDead := time.Since(lastHeartbeat) > 2*c.heartbeatPeriod
	if isHeartbeatDead {
		go c.heartbeat()
	}
}

func (c *Client) heartbeat() {
	c.logger.Println("starting heartbeats")
	tick := time.NewTicker(c.heartbeatPeriod)
	defer tick.Stop()
	for range tick.C {
		touchedAnyLock := false
		c.locks.Range(func(_ interface{}, value interface{}) bool {
			touchedAnyLock = true
			lockItem := value.(*Lock)
			if err := c.SendHeartbeat(value.(*Lock)); err != nil {
				c.logger.Println("error sending heartbeat to", lockItem.partitionKey, ":", err)
			}
			return true
		})
		if !touchedAnyLock {
			c.logger.Println("no locks in the client, stopping heartbeat")
			return
		}
		c.mu.Lock()
		c.lastHeartbeat = time.Now()
		c.mu.Unlock()
	}
}
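One possible way to avoid the leak is to track whether a heartbeat goroutine is alive with an explicit flag instead of inferring it from a timestamp. The sketch below is stdlib-only and uses a hypothetical `heartbeater` type; it is an illustration of the idea, not the library's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// heartbeater is a hypothetical stand-in for the client's heartbeat state.
type heartbeater struct {
	mu      sync.Mutex
	running bool          // true while a heartbeat goroutine is alive
	starts  int           // how many goroutines were started (for illustration)
	stop    chan struct{} // signals the loop to exit
	wg      sync.WaitGroup
}

func newHeartbeater() *heartbeater {
	return &heartbeater{stop: make(chan struct{})}
}

// enforce starts the heartbeat loop only when none is running, so a slow
// iteration can never cause a second goroutine to be spawned.
func (h *heartbeater) enforce() {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.running {
		return
	}
	h.running = true
	h.starts++
	h.wg.Add(1)
	go h.loop()
}

// loop stands in for the ticker-driven heartbeat; it clears the flag on exit.
func (h *heartbeater) loop() {
	defer h.wg.Done()
	<-h.stop
	h.mu.Lock()
	h.running = false
	h.mu.Unlock()
}

// stopLoop asks the running loop to exit and waits for it to finish.
func (h *heartbeater) stopLoop() {
	h.stop <- struct{}{}
	h.wg.Wait()
}

func main() {
	h := newHeartbeater()
	h.enforce()
	h.enforce()                                  // no-op: a loop is already running
	fmt.Println("goroutines started:", h.starts) // goroutines started: 1
	h.stopLoop()
	h.enforce()                                  // the previous loop exited, so a new one may start
	fmt.Println("goroutines started:", h.starts) // goroutines started: 2
	h.stopLoop()
}
```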
Maybe I'm just doing something wrong. I've created a small program, based on the example provided, that acquires a lock but does not close the client, because I want to control a program that runs for two seconds every 5 minutes or so but needs to block execution on other nodes for more than 5 minutes.
My plan was to create a lock with a lease of 6 minutes and check for the lock before starting the execution of the rest of the code.
But in my tests, unreleased locks are left there and never expire:
first execution:
[main] 14:35:38 Starting client
[main] 14:35:38 Acquiring lock
[dyna] 14:35:38 Call GetItem to see if the lock for lock = test exists in the table
[dyna] 14:35:38 starting heartbeats
[dyna] 14:35:39 Lock <nil>
[dyna] 14:35:39 Captured new Lock
[dyna] 14:35:39 Acquiring a new lock or an existing yet released lock on lock = test
[main] 14:35:39 lock content:
[main] 14:35:39 lock owner: test owner
[main] 14:35:39 expired? false
[main] 14:35:39 done
next execution, the lock is in the DynamoDB table but it is not released and the lease is for 1m
./main
[main] 14:35:46 Starting client
[main] 14:35:46 Acquiring lock
[dyna] 14:35:46 Call GetItem to see if the lock for lock = test exists in the table
[dyna] 14:35:46 starting heartbeats
[dyna] 14:35:46 Lock &{{0 0} 0xc000196d00 test [] test owner false false <nil> {13786376802911714670 247280565 0xcea6a0} ZGNHp7OU5dRRDsHGz05NbxQ6mNq2Rab4 60000000000 map[]}
[dyna] 14:35:46 Existing Lock Owner: test owner
[dyna] 14:35:46 New Lock Owner: test owner
[dyna] 14:35:46 Existing Lock isExpired: false
[dyna] 14:35:46 Existing Lock lookupTime: 2019-09-01 14:35:46.758971758 +0200 CEST m=+0.247280565
[main] 14:35:46 Error acquiring lock: Didn't acquire lock because it is locked and request is configured not to retry.
This was expected, but the next time I execute it the lock remains in the table, unreleased. This is normal, because there's no code running to release it. However, the lease was for 1m and this execution happens more than 10 minutes after the creation of the lock:
./main
[main] 14:46:51 Starting client
[main] 14:46:51 Acquiring lock
[dyna] 14:46:51 starting heartbeats
[dyna] 14:46:51 Call GetItem to see if the lock for lock = test exists in the table
[dyna] 14:46:52 Lock &{{0 0} 0xc000198820 test [] test owner false false <nil> {13786377517353580211 328045090 0xcea6a0} ZGNHp7OU5dRRDsHGz05NbxQ6mNq2Rab4 60000000000 map[]}
[dyna] 14:46:52 Existing Lock Owner: test owner
[dyna] 14:46:52 New Lock Owner: test owner
[dyna] 14:46:52 Existing Lock isExpired: false
[dyna] 14:46:52 Existing Lock lookupTime: 2019-09-01 14:46:52.088782515 +0200 CEST m=+0.328045090
[main] 14:46:52 Error acquiring lock: Didn't acquire lock because it is locked and request is configured not to retry.
It looks to me like the library treats any existing lock that has not been properly released as locked, regardless of the lease time.
But perhaps I'm missing something.
My code:
package main

import (
	"log"
	"os"
	"time"

	"cirello.io/dynamolock"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
	log.SetPrefix("[main] ")
	log.SetFlags(log.Ltime)
	svc := dynamodb.New(session.Must(session.NewSession(&aws.Config{
		Region: aws.String("eu-west-1"),
	})))
	log.Println("Starting client")
	c, err := dynamolock.New(svc,
		"Locks",
		dynamolock.WithLeaseDuration(1*time.Minute),
		dynamolock.WithLogger(log.New(os.Stdout, "[dyna] ", log.Ltime)),
		dynamolock.WithPartitionKeyName("lock"),
		dynamolock.WithOwnerName("test owner"),
	)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("Acquiring lock")
	lockedItem, err := c.AcquireLock("test",
		dynamolock.ReplaceData(),
		dynamolock.FailIfLocked(),
	)
	if err != nil {
		log.Fatal("Error acquiring lock: ", err)
	}
	log.Println("lock content:", string(lockedItem.Data()))
	log.Println("lock owner:", lockedItem.OwnerName())
	log.Println("expired? ", lockedItem.IsExpired())
	log.Println("done")
}
When load testing, stale additional attributes are saved instead of the updated ones when acquiring the lock, because a custom update can land in the window between the getItem and the putItem. Could acquiring the lock use an updateItem instead?
I was wondering what the difference between v1 and v2 is.
Does v2 support optimistic locking as described here:
aws/aws-sdk-go#409
I'm assuming v1 only supports pessimistic locking because that issue was closed.
I'd like to have my process abort cleanly (and swiftly) upon SIGINT, but `Client.AcquireLock()` blocks for up to the lease time if the lock is already acquired (or it's stale).
At first I thought I could use `FailIfLocked()` and implement my own retry loop, but then I got bitten by #44, because the stale lock detection is bypassed in that case.
I don't entirely understand the recommendation in #84, because even with `DisableHeartbeat()` a call to `AcquireLock()` will block until the full lease duration has elapsed.
Then I thought perhaps I could call `Client.Close()` to force the issue; however, that merely blocks waiting for the mutex held by `AcquireLock()` to be released.
The right solution would involve a new method like `AcquireLockWithContext()`, to which I can pass a context that is checked between each refresh. It would in turn call the `WithContext()` variants of the DynamoDB API, such as `DynamoDB.GetItemWithContext()` rather than `DynamoDB.GetItem()`, passing in the user-provided context so that things don't block unduly within the AWS SDK either. However, I'd be happy with a simple kludge to interrupt `AcquireLock()`, even if it meant waiting for the refresh period, which is a bit more tolerable than the entire lease timeout.
Any ideas?
Thanks!
Currently the v2 client prints everything using the same `Println` function, no matter whether the message is meant for debug, info, or error. This makes it hard to filter significant log lines in production.
WDYT about changing the `ContextLogger` a bit so that it can support Info, Debug, and Error levels?
Hi there!
I'm wondering why `GetWithContext` always returns a `ReadOnly` lock (it updates the RVN to "")?
The docs say that I should be able to release a lock after a get operation (if the client owns the lock, which mine does), but based on my tests that's not true. Maybe I missed something?
My use case:
I'd like to hide the implementation details of `DynamoLock` and create a function that, based on the key, will Get the lock and Release it.
I've used the sample code in the README file, and while running for the first time, I can see this error:
2020/11/09 19:06:16 ensuring table exists
2020/11/09 19:06:16 ResourceNotFoundException: Requested resource not found
exit status 1
Looking at the AWS Console, I can see that the DynamoDB table is still being created when that error comes up, and a subsequent run is successful. I haven't looked into the code; however, I think we need `CreateTable` to wait until the create operation is complete before returning.
Currently there is no way to check whether a lock is being held using the existing library. To confirm there is a lock holder without trying to acquire the lock, we need to monitor whether the record revision number has changed. Just checking whether the lock is released is not sufficient, since the leader could step down without setting the `isReleased` flag to true.
One way is to expose the record revision number in the `Get` API, as in #178, so that the user can periodically call `Get` and check whether the revision number has changed.
When a lock has both a heartbeat and a session monitor, the `lookupTime` variable can be both written and read by multiple threads, creating a race condition.
We are seeing a lot of lock release failures in our application. After turning on the logs and digging through the code, this problem seems to trace back to a bug in the heartbeat code. When all locks are returned and the heartbeat goroutine exits, if a lock is immediately acquired, the `lastHeartbeat` is too recent to start another heartbeat. Because `enforceHeartbeat()` is only called when a lock is acquired, that lock will potentially expire unless another lock is acquired that can start the heartbeat in time.
func (c *Client) enforceHeartbeat() {
	if c.heartbeatPeriod == 0 {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	lastHeartbeat := c.lastHeartbeat
	isHeartbeatDead := time.Since(lastHeartbeat) > 2*c.heartbeatPeriod
	if isHeartbeatDead {
		go c.heartbeat()
	}
}

func (c *Client) heartbeat() {
	c.logger.Println("starting heartbeats")
	tick := time.NewTicker(c.heartbeatPeriod)
	defer tick.Stop()
	for range tick.C {
		touchedAnyLock := false
		c.locks.Range(func(_ interface{}, value interface{}) bool {
			touchedAnyLock = true
			lockItem := value.(*Lock)
			if err := c.SendHeartbeat(value.(*Lock)); err != nil {
				c.logger.Println("error sending heartbeat to", lockItem.partitionKey, ":", err)
			}
			return true
		})
		if !touchedAnyLock {
			c.logger.Println("no locks in the client, stopping heartbeat")
			return
		}
		c.mu.Lock()
		c.lastHeartbeat = time.Now()
		c.mu.Unlock()
	}
}
This can possibly be fixed by resetting `lastHeartbeat` to the zero time before the heartbeat stops, so that it doesn't prevent another heartbeat thread from starting up.
Hi,
I've started using this nice project, and maybe I'm getting some things wrong, but things are breaking for me. I am using the Cobra library to build a CLI app, with dynamolock locking and unlocking a critical section. The workflow is as follows:
my-command lock //acquire the lock
my-command unlock //release the lock
I was following the given example, and the locking part (when I call `my-command lock`) is as follows:
sessionAWS := session.Must(session.NewSession(&aws.Config{
	Region: aws.String("eu-north-1"),
}))
sessionDynamo := dynamodb.New(sessionAWS)
client, err := dynamolock.New(
	sessionDynamo,
	"my-table",
	dynamolock.WithLeaseDuration(10*time.Second),
	dynamolock.WithHeartbeatPeriod(2*time.Second),
	dynamolock.WithOwnerName("abcd"),
)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
// Do not close the connection, as we need to keep the
// item locked. Close after successful unlocking.
// defer client.Close()
lock, err := client.AcquireLock(
	"my-key",
	dynamolock.FailIfLocked(),
)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
fmt.Printf("%s acquired the lock\n", lock.OwnerName())
And then, when I call `my-command unlock`, I want the lock to be released. The unlock part is as follows:
sessionAWS := session.Must(session.NewSession(&aws.Config{
	Region: aws.String("eu-north-1"),
}))
sessionDynamo := dynamodb.New(sessionAWS)
client, err := dynamolock.New(
	sessionDynamo,
	"my-table",
	dynamolock.WithLeaseDuration(10*time.Second),
	dynamolock.WithHeartbeatPeriod(2*time.Second),
	dynamolock.WithOwnerName("abcd"),
)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
lock, err := client.Get("my-key")
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
if lock.IsExpired() {
	fmt.Println("lock is expired")
	os.Exit(1)
}
ok, err := client.ReleaseLock(lock)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
if !ok {
	fmt.Println("lost lock before releasing")
	os.Exit(1)
}
err = client.Close()
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
fmt.Printf("%s released the lock\n", lock.OwnerName())
`lock.IsExpired()` always returns true. I was thinking that maybe this happens because a new client gets created each time, the new client does not own the lock, and so `IsExpired()` ultimately returns true. If so, do you have an idea for how to solve it?
What am I missing here?
Thank you
Hello!
I'm running into this error when acquiring a lock:
cannot store lock item: lock already acquired by other client: ConditionalCheckFailedException: The conditional request failed
This seems to happen when multiple clients are waiting for a lock and the read request succeeds for at least 2 of them, causing the conditional lock put request to be invoked at the same time. My proposal is that the client should continue to retry if encountering this error.
I appreciate your work on this project and considering this request.
I would like to improve the test workflow:
- `docker-compose.yml` with DynamoDB. This is for local development only.
- `Makefile` which brings up DynamoDB and runs the tests via `make test`.
Happy to discuss the details here and submit a PR :)
It feels a bit awkward that `New` does not require a `partitionKeyName`, while `NewWithSortKey` requires a `sortKeyName`. I propose we change the interfaces from
func New(dynamoDB DynamoDBClient, tableName string,
opts ...ClientOption) (*Client, error) { ... }
func NewWithSortKey(dynamoDB DynamoDBClient, tableName, sortKeyName string,
opts ...ClientOption) (*ClientWithSortKey, error) { ... }
to
func New(dynamoDB DynamoDBClient, tableName, partitionKeyName string,
opts ...ClientOption) (*Client, error) { ... }
func NewWithSortKey(dynamoDB DynamoDBClient, tableName, partitionKeyName, sortKeyName string,
opts ...ClientOption) (*ClientWithSortKey, error) { ... }
This way, the interface is more uniform, and we no longer need `WithPartitionKeyName`. To the end user, there wouldn't be much difference, as all they'd have to do is supply a value such as `"lock"`.
Hello, is there any plan for a v2 release with support for aws-sdk-go-v2? I'd be happy to help with contributions.
(Continued from #156 (comment))
It'd be quite helpful to have support for sort keys on locks. Our use case at Square is implementing a distributed load-balancing system, where locks are used to indicate that a shard is currently owned. To enable workers to pick up slack from peers that leave, we need a way to efficiently enumerate all locks that are currently a part of the sharded pool. Adding sortkeys would make this a lot easier (the partition key is the pool's name, whereas the sort key is a UUID for each worker).
As @zellyn mentioned, I've been working on a PR for this, which is fairly unintrusive:
- Without a `sortKey`, the client functions the way it currently does.
- If a `sortKeyName` is provided on Client creation, then it's required (via `WithSortKey`) whenever a lock is acquired.
Hi,
tag: 2.0.3
I've recently started using this client library and have noticed that some of the interfaces do not work as expected. Most importantly, the `WithSessionMonitor` callback never gets invoked, even when the client completely loses its connection to the underlying DynamoDB lock table. I simulated this within a K8s pod in which I used `iptables` to reject all traffic to and from the DynamoDB IP range, and the callback never gets invoked. An example of the code is:
c, err := dynamolock.New(dynamodb.NewFromConfig(cfg),
	"some-service.locks",
	dynamolock.WithLeaseDuration(10 * time.Second),
	dynamolock.WithHeartbeatPeriod(3 * time.Second),
	dynamolock.WithOwnerName(hostname),
)
if err != nil {
	return nil, errors.Wrap(err, "failed to create dynamolock client")
}
...
// Chan used to signal lock is close to expiration
done := make(chan struct{})
lock, err := c.AcquireLockWithContext(ctx, c.lockKey,
	dynamolock.WithDeleteLockOnRelease(),
	dynamolock.WithSessionMonitor(10*time.Second, func() {
		glog.Warning("session monitoring triggered alert for lock approaching expiration, will release and try to acquire again")
		// THIS CODE IS NEVER REACHED
		// Close the channel
		close(done)
	}),
)
if err != nil {
...
select {
case <-ctx.Done():
	return nil
case <-done:
	return errors.New("lock expiring")
}
Please let me know if there's anything else I can provide. Would love to know if I'm doing something wrong here or how we can get this issue resolved. I'll file another issue for the other issues I have reproduced. Thanks!
Hi @ucirello --
I just have a couple of questions about some features that are currently available in the Java version of the DynamoDB lock client developed by AWS Labs, but not yet available in this Go version:
Line 583, client.go:
func (c *Client) readFromDynamoDB(key string) (*dynamodb.GetItemOutput, error) {
	dynamoDBKey := map[string]*dynamodb.AttributeValue{
		c.partitionKeyName: {S: aws.String(key)},
	}
	return c.dynamoDB.GetItem(&dynamodb.GetItemInput{
		ConsistentRead: aws.Bool(true),
		TableName:      aws.String(c.tableName),
		Key:            dynamoDBKey,
	})
}
For tables that have both partition and sort key, this causes an error:
ValidationException: The provided key element does not match the schema
`client.Get(key)`? Something like calling `lock.IsReleased()` before attempting to call `client.AcquireLock(..)`? I'm not so sure that the `lock.IsExpired()` method fully covers this.
Thank you for your continued help and support of this package :)
It is possible for the `UpdateItem` call to return an error while actually succeeding on the server side. If this happens, the lock is immediately lost and no one holds it for one entire lease duration.
The order of operations that causes this is like so:
newRvn1
newRvn1
, and condition requiring oldRvn
oldRvn
newRvn2
oldRvn
newRvn1
For context: we were able to see this in our production environment in two different ways:
- `SendHeartbeatWithContext` with a timeout on the context. This can cause a race between the client giving up on the request and the request returning success.
We wanted to file an issue first to gather feedback, but we had an initial idea to solve this:
- `RvnLookback` option to the client, defaulting to 0.
- `Lock` struct for "attemptedRvns".
We are happy to take on this work and open a pull request if this seems reasonable.
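One reading of this idea is sketched below: the lock remembers the last few record version numbers it tried to write, and if the value found in the table matches one of them, the earlier update must have succeeded and the lock is still ours. The `rvnHistory` type and its methods are hypothetical names for illustration, and the exact semantics of `RvnLookback` are an assumption.

```go
package main

import "fmt"

// rvnHistory is a hypothetical record of recently attempted RVNs.
type rvnHistory struct {
	lookback  int      // how many attempted RVNs to remember; 0 disables the feature
	attempted []string // oldest first
}

// record remembers an RVN that was sent in an update, keeping at most
// lookback entries.
func (h *rvnHistory) record(rvn string) {
	if h.lookback <= 0 {
		return
	}
	h.attempted = append(h.attempted, rvn)
	if len(h.attempted) > h.lookback {
		h.attempted = h.attempted[len(h.attempted)-h.lookback:]
	}
}

// stillOurs reports whether the RVN currently stored in the table is one we
// attempted to write, meaning a "failed" update actually reached the server.
func (h *rvnHistory) stillOurs(storedRVN string) bool {
	for _, rvn := range h.attempted {
		if rvn == storedRVN {
			return true
		}
	}
	return false
}

func main() {
	h := &rvnHistory{lookback: 2}
	h.record("newRvn1") // update timed out on the client but succeeded on the server
	h.record("newRvn2") // retry fails its condition because the table holds newRvn1

	fmt.Println(h.stillOurs("newRvn1"))     // true: we wrote it, keep the lock
	fmt.Println(h.stillOurs("someoneElse")) // false: another client took over
}
```

With the default lookback of 0 nothing is remembered, so the behaviour would stay as it is today.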