Comments (9)

frsyuki commented on July 16, 2024

Fluentd's current code is as follows (pseudocode):

if current_buffer_chunk.size + new_data.size <= buffer_chunk_limit
  current_buffer_chunk << new_data
  return
## this check is disabled:
#elsif new_data.size > buffer_size_limit
#  error: received data too large
elsif multi_event_stream.size > buffer_chunk_limit
  error: queue size exceeds limit
end
current_buffer_chunk = create_new_chunk
current_buffer_chunk << new_data

Do you mean that the "new_data.size > buffer_size_limit" check is needed?
The check is disabled because it would throw away the whole of new_data even when each record in it is small.
Any ideas?
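
The trade-off described above can be reproduced with a small toy model. This is only a sketch, not Fluentd's code: ToyBuffer and its reject_oversized flag are hypothetical names, a single limit stands in for both limits named in the pseudocode, and the queue-size check is left out.

```ruby
# Toy model of the emit logic described above; not Fluentd's actual code.
class ToyBuffer
  attr_reader :chunks

  def initialize(chunk_limit, reject_oversized: false)
    @chunk_limit = chunk_limit
    @reject_oversized = reject_oversized # models the disabled check
    @chunks = [String.new]
  end

  def emit(new_data)
    current = @chunks.last
    if current.bytesize + new_data.bytesize <= @chunk_limit
      current << new_data
      return
    elsif @reject_oversized && new_data.bytesize > @chunk_limit
      raise ArgumentError, "received data too large"
    end
    # Otherwise start a new chunk, even if new_data alone exceeds the limit.
    @chunks << String.new
    @chunks.last << new_data
  end
end

# Ten small records (8 bytes each) emitted together as one 80-byte payload.
payload = Array.new(10) { |i| format("rec-%03d\n", i) }.join

# Check disabled: the payload is accepted, but the chunk outgrows the limit.
lenient = ToyBuffer.new(32)
lenient.emit(payload)
oversized = lenient.chunks.last.bytesize

# Check enabled: the payload is rejected, so all ten small records are lost.
strict = ToyBuffer.new(32, reject_oversized: true)
rejected = begin
  strict.emit(payload)
  false
rescue ArgumentError
  true
end
```

Neither outcome is satisfying, which is why the buffer alone cannot settle the question: it only sees the payload as a whole.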

from fluentd.

repeatedly commented on July 16, 2024

Any ideas?

Nothing yet, because input plugins don't take the buffer or the output into account, but a policy is needed.
We should clarify where the responsibility lies to avoid this issue.

oza commented on July 16, 2024

We sent a pull request to solve this problem: a user-defined filter that is applied per record.

The overview of this patch is:

  1. Load the user-defined script specified by the 'filter_script' option.
  2. Invoke the filter method in the user-defined script for each record.

The sample code is here: https://gist.github.com/1950170
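
For readers who cannot open the gist, the two steps can be sketched roughly as follows. This is an illustration only: the filter(tag, time, record) signature, the nil-means-drop convention, and the 20-byte threshold are assumptions, and the real interface in the pull request may differ.

```ruby
require 'tempfile'

# Hypothetical user script, written to a temp file so the sketch is runnable.
script = Tempfile.new(['filter', '.rb'])
script.write(<<~'RUBY')
  # Return the record to keep it, or nil to drop it (assumed convention).
  def filter(tag, time, record)
    return nil if record['message'].to_s.bytesize > 20
    record
  end
RUBY
script.close

# Step 1: load the script named by the (assumed) 'filter_script' option.
filter_script = script.path
user_filter = Object.new
user_filter.instance_eval(File.read(filter_script), filter_script)

records = [
  { 'message' => 'short line' },
  { 'message' => 'x' * 100 },
  { 'message' => 'another one' },
]
now = Time.now.to_i

# Step 2: invoke the filter once per record and keep what it returns.
kept = records.map { |r| user_filter.filter('app.log', now, r) }.compact
```

Evaluating the script against a dedicated object keeps the user's filter method out of the global namespace, which would also make supporting several filters easier.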

Note that this code is just a sample, so some tasks remain before it can be merged.
The to-do list is as follows:

  1. Add test cases.
  2. Multiple filter support.

Please tell me if you have any ideas or questions. Thank you!

frsyuki commented on July 16, 2024

I commented on your pull request at #42.
Filtering or similar features would be useful, but they belong to a different context from this problem.

frsyuki commented on July 16, 2024

To solve this problem, we need to understand the following points:

a) ObjectBufferedOutput (including out_forward) collects multiple chunks into one chunk for optimization. Thus the size of a chunk tends to be large.
b) Fluentd has a transaction mechanism that makes it possible to guarantee the atomicity of one call of emit. It's enabled by the BufferedOutput class, which is the base class of buffered output plugins.
c) Buffer plugins can't divide a chunk into multiple records because they don't know the boundaries of records.

Thus the current code can neither raise an exception nor throw away the chunk.
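
Point (c) can be illustrated with a small sketch. The two lambdas below are hypothetical stand-ins for the format methods of two different output plugins; they are not taken from any real plugin.

```ruby
require 'json'

# Hypothetical format methods for two different output plugins.
json_format = ->(_tag, _time, record) { record.to_json }
line_format = ->(tag, time, record) { "#{time}\t#{tag}\t#{record.to_json}\n" }

es = [[1, { 'msg' => 'a' }], [2, { 'msg' => 'b' }]]

# What the buffer receives in each case: one opaque string.
json_chunk = es.map { |time, record| json_format.call('app', time, record) }.join
line_chunk = es.map { |time, record| line_format.call('app', time, record) }.join

# A buffer that guessed "one record per line" would be right for one
# plugin and wrong for the other.
json_guess = json_chunk.lines.size
line_guess = line_chunk.lines.size
```

Both chunks contain two records, but only the plugin that produced a chunk knows how to find them again.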

I'm trying to implement the following solution:

  1. Divide buffer plugins' emit method into open->append->append->…->close
  2. Change the BufferedOutput#format_stream method as follows:
@buffer.open {|con|
  es.each {|time,record|
    begin
      con.append format(tag, time, record)
    rescue
      # logging
    end
  }
}
  3. ObjectBufferedOutput#emit falls back to the non-optimized routine that calls BufferedOutput#format_stream if the optimized routine raises BufferChunkLimitError.
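
The three steps can be sketched end to end as a toy model. ToyBuffer and ToyOutput are hypothetical classes, not Fluentd's; when exactly the optimized routine raises, and the rollover to a new chunk, are assumptions made to keep the sketch runnable.

```ruby
class BufferChunkLimitError < StandardError; end

# Toy buffer; a stand-in for a buffer plugin, not Fluentd's implementation.
class ToyBuffer
  Connection = Struct.new(:buffer) do
    def append(data)
      buffer.append(data)
    end
  end

  attr_reader :chunks

  def initialize(chunk_limit)
    @chunk_limit = chunk_limit
    @chunks = [String.new]
    @lock = Mutex.new
  end

  # Optimized path: the whole payload is written as one unit.
  def emit(data)
    @lock.synchronize { append(data) }
  end

  # Per-record path: open -> append -> append -> ... -> close.
  def open
    @lock.synchronize { yield Connection.new(self) }
  end

  def append(data)
    # Assumption: only data that can never fit in a chunk is an error.
    raise BufferChunkLimitError, "data too large" if data.bytesize > @chunk_limit
    # Assumption: data that does not fit the current chunk starts a new one.
    @chunks << String.new if @chunks.last.bytesize + data.bytesize > @chunk_limit
    @chunks.last << data
  end
end

class ToyOutput
  attr_reader :dropped

  def initialize(buffer)
    @buffer = buffer
    @dropped = []
  end

  def format(tag, time, record)
    "#{time}\t#{tag}\t#{record}\n"
  end

  def emit(tag, es)
    # Optimized routine: format everything and hand it over in one call.
    @buffer.emit(es.map { |time, record| format(tag, time, record) }.join)
  rescue BufferChunkLimitError
    # Step 3: fall back to the per-record routine.
    format_stream(tag, es)
  end

  def format_stream(tag, es)
    @buffer.open do |con|
      es.each do |time, record|
        begin
          con.append(format(tag, time, record))
        rescue BufferChunkLimitError
          @dropped << record # stands in for logging
        end
      end
    end
  end
end

buffer = ToyBuffer.new(32)
out = ToyOutput.new(buffer)
out.emit('app', [[1, 'ok'], [2, 'x' * 64], [3, 'fine']])
```

In this run the combined payload is too large for the optimized path, so the fallback appends record by record: only the 64-byte record is dropped and the two small records are kept.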

oza commented on July 16, 2024

@frsyuki Thank you for dealing with this difficult problem.

This solution seems to work well when the chunk limit is exceeded.
Could I check whether my understanding is correct?

Divide buffer plugins' emit method into open->append->append->…->close
Change the BufferedOutput#format_stream method as follows:

You mean, you'll change the current chunk format (|chunk header|chunk body|) into a new chunk format like the following:
|chunk header|record header|user's-formatted record|record header|user's-formatted record|record header|user's-formatted record|, right?

ObjectBufferedOutput#emit falls back to the non-optimized routine that calls BufferedOutput#format_stream if the optimized routine raises BufferChunkLimitError.

"The non-optimized routine" means "the routine which processes per-record (not per-chunk)", doesn't it?

frsyuki commented on July 16, 2024

you'll change the current chunk format

No. First, even the current chunk format doesn't specify any boundaries, headers, or bodies. BufferedOutput plugins define the format.

See the following pseudocode for Buffer#open:

class Connection < Struct.new(:key, :buffer)
  def append(data)
    # Struct members are accessor methods, not instance variables
    buffer.append(key, data)
  end
end

def open(key, &block)
  synchronize do  # this mutex is expensive
    yield Connection.new(key, self)
  end
end

def append(key, data)
  chunk = get_current_top_chunk

  # buffer plugins can check buffer_chunk_limit per record
  if chunk.size + data.size > @buffer_chunk_limit
    # raise error
  end

  ...
end

"The non-optimized routine" means "the routine which processes per-record (not per-chunk)", doesn't it?

(Almost) yes. The non-optimized routine appends data to the buffer record by record, regardless of the atomic unit of one emit.

(Note that "per-chunk" is not an exact expression because the word "chunk" is used in a different context. Don't confuse the buffer chunk used in buffer plugins with the unit of emit used by input plugins.)

oza commented on July 16, 2024

@frsyuki Okay, I understand what you're going to do. I believe it will work well :-)
I can help if you need it, and I'll follow the changes if you create a branch for this problem.

repeatedly commented on July 16, 2024

Temporarily closed. We may need to redesign the core.
