Comments (11)
it sounds like you don't want to mix IDs across multiple files
It's more like I don't want to mix IDs across multiple paths; one folder = one ID, more likely.
if the IDs are unique / message, then that will result in batches of size one
The IDs are sensor IDs, so multiple readings will happen within a couple of seconds.
from benthos.
@mihaitodor thank you for your help! I really appreciate the time you spent on my issue.
Maybe I should try using AWS S3 storage instead of GCS to see if that fixes my issue.
I will keep experimenting and see if I can find a way to fix it.
Hey @anthonycorbacho 👋 thank you for raising this issue! While I think setting timeout: 60s should be more than enough time for the Google APIs to delete a file, what might be happening is either that the write fails and we try to remove the file anyway, or that writes take too long and don't leave enough time in the same context to also do the deletion part (but I think the latter is unlikely).
Not sure what's the best way to address this, but I'd consider skipping the deletion if the write fails. It might also be worth either reducing the level of that "Failed to delete temporary file used for merging" log or introducing a flag which users can enable to silence this error.
Additionally, I think there's some opportunity for performance gains by passing more than one object in the ComposerFrom() call. The docs say that up to 32 are supported, so that could be a good future enhancement.
PS: PRs are welcome!
Hi @mihaitodor thank you for the answer.
I tested different settings and am still having issues. I cloned the repo and added more debug logging to see the folder where the merge failed. The funny thing is that a file that was supposed to contain a bunch of entries only has one.
I tried batching with multiple files instead, and the same issue happens:
gcp_cloud_storage:
  bucket: "abucket"
  path: benthos/dt=${!@dt}/id=${!@id}/file-${!timestamp_unix_nano()}.csv
  content_type: binary/octet-stream
  collision_mode: append
  timeout: 60s
  max_in_flight: 64
  batching:
    count: 100
Maybe my pipeline is causing the issue, because each event I get generally has a bunch of IDs in the message, e.g.:
{
  "ID1": {ts: xxx, value: xxx},
  "ID2": {ts: xxx, value: xxx},
  "ID3": {ts: xxx, value: xxx},
  "ID4": {ts: xxx, value: xxx},
}
and I am trying to explode the message into individual messages via unarchive with format: json_map, so that I can work with each one separately and then output them to GCP Cloud Storage.
I tried putting every message into a single file, but I got a rate limit error.
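For reference, that explode step can be sketched roughly like this (a minimal sketch; the mapping processor below is an assumption about how the id metadata used in the output path gets populated — with format: json_map, Benthos stores each top-level map key in the archive_key metadata field):

```yaml
pipeline:
  processors:
    # Expand the top-level JSON map into one message per key.
    # The key ("ID1", "ID2", ...) lands in the archive_key metadata field.
    - unarchive:
        format: json_map
    # Hypothetical mapping: copy the map key into the id metadata
    # referenced by the output path.
    - mapping: |
        meta id = @archive_key
```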
from benthos.
I'm not sure what's going on there, but if you use path: benthos/dt=${!@dt}/id=${!@id}/file-${!timestamp_unix_nano()}.csv, then that forces the output to create a new file for every write, ignoring the batching, because ${!timestamp_unix_nano()} will resolve to a unique path for each message. One way around it is to add an archive processor in the batching section with format: lines, and then you'll get a unique file per batch if the generated path is always different. If not, collision_mode: append should result in an append.
PS: When configuring batching with a count, you likely also want to add a period, since your input will not always fill the last batch and then Benthos will keep waiting and waiting for the batch to fill up before emitting it.
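Putting those two suggestions together, the batching section might look like this (a sketch; the count and period values are placeholders to tune for your workload):

```yaml
batching:
  count: 100  # emit a batch once 100 messages have accumulated...
  period: 1s  # ...or after 1s, whichever comes first
  processors:
    # Join the batch into a single newline-delimited payload so the
    # output writes one file per batch instead of one per message.
    - archive:
        format: lines
```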
@mihaitodor I followed your recommendation with the batching, and the good news is that I don't get the error anymore, so I think this helped.
But this brings a new issue. With the following GCP output batching settings:
batching:
  period: 1s
  processors:
    - archive:
        format: lines
each folder (dt=YYYY/MM/DD/id=XXXX) contains a mix of IDs, not the one I am targeting with the path.
So if the ID in the path is id=ID1, I am expecting to have only ID1 in the file, but it's a mix of all the IDs (ID1, ID2, etc.).
@anthonycorbacho That's great to hear! Regarding the path, it sounds like you don't want to mix IDs across multiple files, so just have a file for each ID value, if I'm understanding you correctly. If that's what you need, then you can break up the batch into smaller batches by using the group_by_value processor before the archive one to create batches for each individual ID. However, if the IDs are unique per message, then that will result in batches of size one, which is probably not what you want.
@mihaitodor I tried your last recommended approach and I'm still getting the context error, and now it looks like I am losing data:
gcp_cloud_storage:
  bucket: "abucket"
  path: benthos/dt=${!@dt}/file-${!@id}.csv
  content_type: application/octet-stream
  collision_mode: append
  timeout: 60s
  max_in_flight: 64
  batching:
    period: 10s
    processors:
      - group_by_value:
          value: ${! meta("id") }
      - archive:
          format: lines
I have a mix of
Failed to delete temporary file used for merging: context deadline exceeded
and
Failed to delete temporary file used for merging: context canceled
I have a feeling that max_in_flight: 64 is overwhelming the GCP API. I'd start by reducing that to 1 and then increase it gradually until performance plateaus.
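As a sketch, that tuning loop would start from something like this (values are placeholders to adjust):

```yaml
gcp_cloud_storage:
  # Start with a single in-flight upload; raise this step by step
  # (e.g. 1 -> 2 -> 4 -> 8) until throughput stops improving.
  max_in_flight: 1
```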
and now it looks like I am losing data
That is surprising. Benthos should keep retrying messages until the output reports success. Would be great to know if that is a reproducible issue somehow, since it should be fixed.
@mihaitodor I have tried with max_in_flight: 1 and I get the same error (Failed to delete temporary file used for merging: context canceled) after a while, and my files seem to not be updated anymore; it looks like it works once and then nothing.
I think it's easily reproducible: if you have a lot of messages (~9k every second) you can easily hit this issue. I don't have a complex use case; I just transform a map of JSON into multiple messages, change the formatting into CSV, and then try to save it into GCP Cloud Storage.
Understood. Unfortunately, I don't have the capacity to look into this in more detail right now, and no GCP access to run tests. I think the hints I left in my previous comment should point in the right direction if anyone is interested in submitting a PR.