
Comments (6)

Jeffail commented on July 2, 2024

@mohitmarwal when you say "previously", do you mean version 4.26.0? And is there an example you can provide where this behaviour shows up? For example, the following config behaves as I'd expect:

input:
  http_server: {}

output:
  http_client:
    url: example.com/not/going/to/work

Running curl http://localhost:4195/post -d "hello world" returns an error as expected.

@FerroEduardo this is a separate issue. Unfortunately, some components block traffic while they are failing in a connection loop, even when a DLQ is configured. There's an existing issue for this: #1210

from benthos.

FerroEduardo commented on July 2, 2024

I'm facing a similar problem with the elasticsearch output: I cannot handle the connection error, and the pipeline gets stuck in a loop. The config below never reaches the stdout output:

input:
  generate:
    mapping: 'root = {"hello": "world"}'
    interval: 1s
    count: 1

pipeline:
  processors:
    - log:
        level: INFO
        message: "processing event: ${!content()}"

output:
  broker:
    pattern: fan_out_sequential_fail_fast
    outputs:
      - elasticsearch:
          urls:
            - https://localhost:1234
          index: "my-index"
          id: ${!timestamp_unix()}
          max_retries: 1
          tls:
            enabled: true
            skip_cert_verify: true
          basic_auth:
            enabled: true
            username: elastic
            password: admin
          healthcheck: false
          sniff: false
        processors:
          - mapping: '{"message": "elasticsearch preprocessor", "timestamp": timestamp_unix()}'
      - stdout:
          codec: lines
        processors:
          - mapping: '{"message": "stdout preprocessor", "timestamp": timestamp_unix()}'
Output
INFO Running main config from specified file       @service=benthos benthos_version=unknown path=test-elastic.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
INFO Input type generate is now active             @service=benthos label="" path=root.input
INFO Output type elasticsearch is now active       @service=benthos label="" path=root.output.broker.outputs.0
INFO Launching a benthos instance, use CTRL+C to close  @service=benthos
INFO Output type stdout is now active              @service=benthos label="" path=root.output.broker.outputs.1
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: Post "https://localhost:1234/_bulk": dial tcp 127.0.0.1:1234: connect: connection refused  @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: no available connection: no Elasticsearch node available  @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: Post "https://localhost:1234/_bulk": dial tcp 127.0.0.1:1234: connect: connection refused  @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: no available connection: no Elasticsearch node available  @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0

As the problem seems related to the BatchOutput interface, other components, such as aws_s3, behave in the same way.


mohitmarwal commented on July 2, 2024

@Jeffail The issue occurred when I updated from versions 4.24.0 and 4.25.1.


mohitmarwal commented on July 2, 2024

@Jeffail here is the pseudocode where the issue occurs:
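package main

import (
	"context"
	"fmt"

	// Registers the standard components (such as http_server) so configs can use them.
	_ "github.com/benthosdev/benthos/v4/public/components/all"
	"github.com/benthosdev/benthos/v4/public/service"
)

// myBatchOutput is a minimal custom output implementing service.BatchOutput.
type myBatchOutput struct {
	count int // Just one simple field
}

// WriteBatch prints the contents of each message in the batch.
func (m *myBatchOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
	for _, msg := range batch {
		// Read the raw payload so the content is printed rather than a pointer.
		b, err := msg.AsBytes()
		if err != nil {
			return err
		}
		fmt.Println("Processing message:", string(b))
	}
	return nil
}

// Close is called when the output shuts down.
func (m *myBatchOutput) Close(ctx context.Context) error {
	fmt.Println("disconnected:")
	return nil
}

// Connect is called before any writes; there is nothing to set up in this example.
func (m *myBatchOutput) Connect(ctx context.Context) error {
	return nil
}

func main() {
	// Register the BatchOutput implementation under the name used in the config.
	err := service.RegisterBatchOutput("my_batch_output", service.NewConfigSpec(), newMyBatchOutput)
	if err != nil {
		panic(err)
	}
	service.RunCLI(context.Background())
}

// newMyBatchOutput is the constructor passed to service.RegisterBatchOutput.
func newMyBatchOutput(conf *service.ParsedConfig, mgr *service.Resources) (
	output service.BatchOutput,
	batchPolicy service.BatchPolicy,
	maxInFlight int,
	err error,
) {
	output = &myBatchOutput{
		count: 10, // Set a default value or configure from conf
	}
	// Assign a default max in flight
	maxInFlight = 10
	fmt.Println("newbatchoutputfunc:")
	return
}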

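input:
  http_server:
    address: ":80"
    path: "/test"
    allowed_verbs:
      - POST
      - GET

pipeline:
  processors: []

output:
  # Matches the name passed to service.RegisterBatchOutput above.
  my_batch_output: {}

# The original comment also listed a second output block; a config can
# only have one top-level output, so it is kept here as a comment:
# output:
#   stdout: {}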


mohitmarwal commented on July 2, 2024

@Jeffail in my setup even this doesn't return "hello world"; instead it gives me a request timeout error:

input:
  http_server:
    path: /test

output:
  http_client:
    url: https://httpbin.org/hidden-basic-auth/:user/:passwd

MINGW64 ~/Downloads/benthos_4.27.0_windows_amd64.tar/benthos_4.27.0_windows_amd64
$ curl -X POST http://localhost:4195/test -d "hello world"
Request timed out
