Comments (9)
Fluentd's current code looks like this (pseudocode):

if current_buffer_chunk.size + new_data.size <= buffer_chunk_limit
  current_buffer_chunk << new_data
  return
## this check is disabled:
#elsif new_data.size > buffer_size_limit
#  raise "received data too large"
elsif multi_event_stream.size > buffer_chunk_limit
  raise "queue size exceeds limit"
end
# otherwise roll over to a fresh chunk
current_buffer_chunk = create_new_chunk
current_buffer_chunk << new_data
Do you mean that the "new_data.size > buffer_size_limit" check is needed?
The reason the check is disabled is that the whole new_data is thrown away even if each record is small.
Any ideas?
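For illustration only, a per-record check could look like the sketch below. This is not Fluentd's actual code: roll_chunk and log_oversized are hypothetical helpers, and msgpack serialization is an assumption.

# Hypothetical per-record sizing: drop only records that can never fit,
# instead of rejecting the whole new_data at once.
def append_records(records, chunk)
  records.each do |record|
    data = record.to_msgpack             # assumed serialization
    if data.bytesize > buffer_chunk_limit
      log_oversized(record)              # hypothetical: log and skip just this record
      next
    end
    # roll over to a fresh chunk when the current one would overflow
    chunk = roll_chunk(chunk) if chunk.size + data.bytesize > buffer_chunk_limit
    chunk << data
  end
end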
> Any ideas?

Nothing concrete, because inputs don't consider buffers or outputs; but a policy is needed. We should clarify where the responsibility lies to avoid this issue.
We sent a pull request to solve this problem: a user-defined filter per record.
The overview of this patch:
- Load a user-defined script specified by the option 'filter_script'.
- Invoke the filter method in the user-defined script per record (see the sketch after this comment).
The sample code is here: https://gist.github.com/1950170
Note that this code is just a sample, so some tasks remain before it can be merged.
The todo list is as follows:
- Add test cases.
- Support multiple filters.
Please tell me if you have any ideas or questions. Thank you!
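As a rough illustration, a user-defined filter script in the spirit of this patch might look like the following; the exact filter signature and the drop-on-nil convention are assumptions, not necessarily what the gist implements.

# Hypothetical contents of the file named by the 'filter_script' option.
# Assumed contract: called once per record; returning nil drops the record.
def filter(tag, time, record)
  return nil if record.to_msgpack.bytesize > 1024 * 1024  # skip oversized records
  record
end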
I commented on your pull request at #42.
Filtering or similar features will be useful, but that is a different context from this problem.
To solve this problem, we need to understand the following:
a) ObjectBufferedOutput (including out_forward) collects multiple chunks into one chunk for optimization. Thus chunks tend to be large.
b) Fluentd has a transaction mechanism that makes it possible to guarantee the atomicity of one call of emit. It's enabled by the BufferedOutput class, which is the base class of buffered output plugins.
c) Buffer plugins can't divide a chunk into multiple records because they don't know the record boundaries (see the sketch below). Thus the current code can neither raise an exception nor throw the chunk away.
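To make (c) concrete, here is roughly what the current BufferedOutput#format_stream does (a simplified sketch): every formatted record is concatenated into one opaque string, so by the time the buffer sees the data there are no record boundaries left to split on.

# Simplified shape of the current behavior: the buffer receives one
# opaque byte string per emit and cannot split it afterwards.
def format_stream(tag, es)
  out = ''
  es.each {|time, record|
    out << format(tag, time, record)  # plugin-defined wire format
  }
  out  # appended to a chunk as a single blob
end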
I'm trying to implement the following solution:
- Divide buffer plugins' emit method into open -> append -> append -> ... -> close.
- Change the BufferedOutput#format_stream method as follows:

@buffer.open {|con|
  es.each {|time, record|
    begin
      con.append format(tag, time, record)
    rescue
      # log the error and skip this record
    end
  }
}

- ObjectBufferedOutput#emit falls back to a non-optimized routine that calls BufferedOutput#format_stream if the optimized routine raised BufferChunkLimitError.
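A minimal sketch of what that fallback could look like; the optimized body is condensed, and to_msgpack_stream plus the exact rescue placement are assumptions.

# Hypothetical shape of ObjectBufferedOutput#emit with a fallback: try the
# optimized whole-stream append first, then retry record by record on overflow.
def emit(tag, es, chain)
  data = es.to_msgpack_stream    # optimized path: append the pre-serialized stream
  @buffer.emit(tag, data, chain)
rescue BufferChunkLimitError
  # non-optimized path: BufferedOutput#emit goes through format_stream,
  # appending (and size-checking) one record at a time
  super
end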
@frsyuki Thank you for dealing with this difficult problem.
This solution seems to work well when the chunk limit is exceeded.
Could I ask whether my understanding is correct?

> Divide buffer plugins' emit method into open -> append -> append -> ... -> close.
> Change the BufferedOutput#format_stream method as follows:

You mean you'll change the current chunk format (|chunk header|chunk body|) into a new chunk format like |chunk header|record header|user's-formatted record|record header|user's-formatted record|record header|user's-formatted record|, right?

> ObjectBufferedOutput#emit falls back to a non-optimized routine that calls BufferedOutput#format_stream if the optimized routine raised BufferChunkLimitError.

"The non-optimized routine" means "the routine which processes per record (not per chunk)", doesn't it?
> You'll change the current chunk format

No. First, even the current chunk format doesn't include any specification of boundaries, headers, or bodies; BufferedOutput plugins define the format.
See the following pseudocode for Buffer#open:

class Connection < Struct.new(:key, :buffer)
  def append(data)
    buffer.append(key, data)  # delegate to the owning buffer
  end
end

def open(key, &block)
  synchronize do  # the cost of this mutex is expensive
    yield Connection.new(key, self)
  end
end

def append(key, data)
  chunk = get_current_top_chunk
  # buffer plugins can check buffer_chunk_limit per record
  if chunk.size + data.size > @buffer_chunk_limit
    raise BufferChunkLimitError, "record exceeds buffer_chunk_limit"
  end
  ...
end
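From a plugin's point of view, the API sketched above might be used like this (a hypothetical usage example; the formatted_record_* values stand for plugin-formatted byte strings):

# Each append is checked against buffer_chunk_limit individually, so a
# single oversized record can fail without discarding its neighbors.
@buffer.open(key) {|con|
  con.append(formatted_record_a)
  con.append(formatted_record_b)
}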
"The non-optimized routine" means "the routine which processes per-record (not per-chunk)", doesn't it?
(Almost) yes. The non-optimized routine appends data into the buffer per-record regardless of the atomic unit of one emit
.
(Note that "per-chunk" is not an exact expression because the word "chunk" is used in the different context. Don't confuse the buffer chunk used in buffer plugins, with an unit of emit used by input plugins.)
@frsyuki Okay, I understand what you're going to do. I believe it will work well :-)
I can help if you need it, and I'll follow the changes if you create a branch for this problem.
Temporarily closed. We may need to redesign the core.