fluent-plugins-nursery / fluent-plugin-out-http
A generic fluentd output plugin for sending logs to an HTTP endpoint.
License: Other
Not sure if your plugin would be responsible for the action or not, but upon failing to post, it does not appear to retry the post (in case of intermittent network failure).
Referring to: fluent/fluentd#1879
I'd like to ask for similar functionality in out_http.
None (feature request).
<match *>
  @type http
  ...
  tls_client_cert_path /etc/ssl/endpoint1.cert # default: ''
  tls_client_private_key_path /etc/ssl/endpoint1.pem # default: ''
  tls_client_private_key_passphrase topsecret # default: '', secret: true
</match>
3.8.2
NAME="Alpine Linux"
ID=alpine
VERSION_ID=3.8.2
PRETTY_NAME="Alpine Linux v3.8"
HOME_URL="http://alpinelinux.org"
BUG_REPORT_URL="http://bugs.alpinelinux.org"
fluentd 1.4.0
1.1.7
For logging to services where authentication tokens are sent using the HTTP Headers, it would be really useful to be able to add headers in the config of this plugin.
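A minimal sketch of how this could look, assuming a hypothetical `custom_headers` hash parameter (the name and the merge step are illustrative, not the plugin's actual API):

```ruby
# Sketch only: merge a configured header hash into each outgoing request,
# the way the plugin's set_header step could. `custom_headers` is a
# hypothetical config value, not an existing plugin parameter.
require 'net/http'
require 'uri'

custom_headers = { 'Authorization' => 'Bearer xxxxxx' } # hypothetical config

uri = URI.parse('http://localhost:9200/logs')
req = Net::HTTP::Post.new(uri.request_uri)
custom_headers.each { |k, v| req[k] = v } # merge configured headers

puts req['Authorization']
```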
How can I do a GET with dynamic parameters? The plugin currently has only the following:
def create_request(tag, time, record)
  url = format_url(tag, time, record)
  uri = URI.parse(url)
  req = Net::HTTP.const_get(@http_method.to_s.capitalize).new(uri.request_uri)
  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  return req, uri
end
At present there is only the code above; there is no support for dynamic parameters or string interpolation. I do see the raw and text formats, but in the config the url is always fixed; raw and text only control the body content.
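One way dynamic GET parameters could work (a sketch of the idea, not current plugin behavior): build the query string from the event record before creating the request.

```ruby
# Sketch: turn an event record into URL query parameters.
# The base URL and record contents are illustrative.
require 'uri'

record = { 'user' => 'alice', 'status' => 'ok' } # example event record
base = 'http://localhost:9200/track'

uri = URI.parse(base)
uri.query = URI.encode_www_form(record) # ?user=alice&status=ok

puts uri.to_s # http://localhost:9200/track?user=alice&status=ok
```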
I am using this output plugin in my product. How can I make it retry when the HTTP API call fails?
Can you please help with that?
Hi Guys,
I want to use fluentd as a tracking server: nginx will redirect immediately, but append the request URLs to log files just before the redirection (using multiple access-log formats).
Fluentd will then act as a messaging queue, reading the logs, which are effectively formatted as lists of URLs. It seems to me that fluent-plugin-out-http will do the job.
This stack would replace a messaging queue, and be an initiative to use fluentd as a scalable messaging queue and tracking server rather than only as a log shipper. It could become a very scalable tracking server, as it requires only plain nginx at request time.
Well, that's the background, and my question is: will this be bullet-proof? What is the probability of losing a line of log when the agent shuts down or when the log rotates? I plan to have a single local fluentd for each nginx, so there is no need for a fancy HA topology to make it highly available, right?
I have asked several related questions on Stack Overflow.
Will be very happy to get tips from you guys.
Sorry for opening an issue for a question, but I didn't know who else to turn to :)
Hope to get your support.
It would be great if this plugin could interoperate with the http://docs.fluentd.org/v0.12/articles/in_http plugin. I know it seems like a bit of an odd request, but I have two fluentd nodes that connect over the internet, and one sits behind an nginx proxy that secures access via basic auth, hence I am using your plugin. If there were a way to define e.g. endpoint_url https://address.com/:tag or something, so that the URL is created dynamically, that would be great.
Currently - even when using buffering - events are sent out as individual POSTS/PUTS, etc. Would it be possible to combine events into single POSTS/PUTS? For example, when using JSON, you could put all events in a JSON array.
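For reference, the bulk_request option that appears elsewhere in this thread does this when combined with buffering (a sketch; the endpoint is illustrative, and with serializer json the batch is sent as application/x-ndjson):

```
<match **>
  @type http
  endpoint_url http://localhost:9200/bulk
  serializer json
  buffered true
  bulk_request true   # send each buffer chunk as one request
</match>
```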
I'm unable to detect that the out_http plugin was not able to send a message. We are receiving this error:
failed to POST https://xx.xx.xx.xx/ (422 Unprocessable Entity {"message":"The given data was invalid.","success":false,"status":422,"errors":{"data.status":["The data.status field is required."]}})
Would it be possible to forward this to the @error label, or to increase the counter in the Prometheus metric fluentd_output_status_num_errors?
I use a compress gzip buffer (built-in, no plugin) together with compress_request true on this HTTP output plugin.
Fluentd gunzips the buffer read from disk, and the data is then recompressed by this plugin.
# Upload configuration for Syslog events
<match syslog.events>
  @type http
  endpoint_url "https://<redacted>"
  http_method post
  # send compressed events
  compress_request true
  serializer json
  buffered true
  bulk_request true
  # specify recoverable/repeatable status codes
  recoverable_status_codes 404, 500, 502, 503, 504
  # every 5 minutes or every 10 MBs
  <buffer tag,time>
    @type file
    path /shared/logs_5/buffer/syslog
    timekey 5m
    timekey_wait 0m
    timekey_use_utc true
    chunk_limit_size 10MB
    compress gzip
    total_limit_size 50GB
    overflow_action drop_oldest_chunk
    retry_timeout 7d
    retry_max_interval 3600
  </buffer>
  <format>
    @type json
    add_newline true
  </format>
</match>
According to the Fluentd docs (https://docs.fluentd.org/configuration/buffer-section):
Fluentd will decompress these compressed chunks automatically before passing them to the output plugin (The exceptional case is when the output plugin can transfer data in compressed form. In this case, the data will be passed to the plugin as is).
Can we somehow let fluentd know that this output plugin can transfer data in compressed form and skip the decomp / re-comp?
The main reason we came to this is that fluentd sometimes fails when decompressing the gzip'ed buffer chunks, and then chokes on them with the same up-to-one-week retry logic we put in place for cases like network loss. We'd rather fluentd pass the bad chunks to this plugin, which would send them as-is to our endpoint in the cloud, where we have the processing power to recover or discard them without clogging the pipeline.
abbrev (default: 0.1.0)
async (1.31.0)
async-http (0.60.1)
async-io (1.34.3)
async-pool (0.4.0)
base64 (default: 0.1.1)
benchmark (default: 0.2.0)
bigdecimal (default: 3.1.1)
bson (4.15.0)
bundler (default: 2.3.26)
cgi (default: 0.3.6)
concurrent-ruby (1.2.2)
console (1.16.2)
cool.io (1.7.1)
csv (default: 3.2.5)
date (default: 3.2.2)
debug (1.6.3)
delegate (default: 0.2.0)
did_you_mean (default: 1.6.1)
digest (default: 3.1.0)
drb (default: 2.1.0)
english (default: 0.7.1)
erb (default: 2.2.3)
error_highlight (default: 0.3.0)
etc (default: 1.3.0)
fcntl (default: 1.0.1)
fiber-local (1.0.0)
fiddle (default: 1.1.0)
fileutils (default: 1.6.0)
find (default: 0.1.1)
fluent-config-regexp-type (1.0.0)
fluent-plugin-mongo (1.6.0)
fluent-plugin-multi-format-parser (1.0.0)
fluent-plugin-out-http (1.3.4)
fluent-plugin-prometheus (2.1.0)
fluent-plugin-rewrite-tag-filter (2.4.0)
fluentd (1.16.1)
forwardable (default: 1.3.2)
getoptlong (default: 0.1.1)
http_parser.rb (0.8.0)
io-console (default: 0.5.11)
io-nonblock (default: 0.1.0)
io-wait (default: 0.2.1)
ipaddr (default: 1.2.4)
irb (default: 1.4.1)
json (2.6.3, default: 2.6.1)
logger (default: 1.5.0)
matrix (0.4.2)
minitest (5.15.0)
mongo (2.18.3)
msgpack (1.7.0)
mutex_m (default: 0.1.1)
net-ftp (0.1.3)
net-http (default: 0.3.0)
net-imap (0.2.3)
net-pop (0.1.1)
net-protocol (default: 0.1.2)
net-smtp (0.3.1)
nio4r (2.5.9)
nkf (default: 0.1.1)
observer (default: 0.1.1)
oj (3.14.3)
open-uri (default: 0.2.0)
open3 (default: 0.1.1)
openssl (default: 3.0.1)
optparse (default: 0.2.0)
ostruct (default: 0.5.2)
pathname (default: 0.2.0)
power_assert (2.0.1)
pp (default: 0.3.0)
prettyprint (default: 0.1.1)
prime (0.1.2)
prometheus-client (4.2.2)
protocol-hpack (1.4.2)
protocol-http (0.24.1)
protocol-http1 (0.15.0)
protocol-http2 (0.15.1)
pstore (default: 0.1.1)
psych (default: 4.0.4)
racc (default: 1.6.0)
rake (13.0.6)
rbs (2.7.0)
rdoc (default: 6.4.0)
readline (default: 0.0.3)
readline-ext (default: 0.1.4)
reline (default: 0.3.1)
resolv (default: 0.2.1)
resolv-replace (default: 0.1.0)
rexml (3.2.5)
rinda (default: 0.1.1)
rss (0.2.9)
ruby2_keywords (default: 0.0.5)
securerandom (default: 0.2.0)
serverengine (2.3.2)
set (default: 1.0.2)
shellwords (default: 0.1.0)
sigdump (0.2.4)
singleton (default: 0.1.1)
stringio (default: 3.0.1)
strptime (0.2.5)
strscan (default: 3.0.1)
syslog (default: 0.1.0)
tempfile (default: 0.1.2)
test-unit (3.5.3)
time (default: 0.2.2)
timeout (default: 0.2.0)
timers (4.3.5)
tmpdir (default: 0.1.2)
traces (0.9.1)
tsort (default: 0.1.0)
typeprof (0.21.3)
tzinfo (2.0.6)
tzinfo-data (1.2023.3)
un (default: 0.2.0)
uri (default: 0.12.1)
weakref (default: 0.1.1)
webrick (1.8.1)
yajl-ruby (1.4.3)
yaml (default: 0.2.0)
zlib (default: 2.1.1)
Setting up endpoint_url as 127.0.0.1:3000 yields errors :(
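A likely cause, though this is an assumption about the error being seen: endpoint_url is parsed as a full URI, so it needs a scheme, not just host:port.

```
<match **>
  @type http
  endpoint_url http://127.0.0.1:3000/   # scheme required; a bare host:port is not a valid URI
</match>
```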
Hi, I want to send /var/log/containers/*.log to my remote HTTP server via this plugin, but it doesn't seem to work.
kubectl logs -f -n kube-system fluentd-es-v2.6.0-lp5fw
There are many warning logs as follows:
2019-08-20 06:50:55 +0000 [warn]: no patterns matched tag="kubelet"
2019-08-20 06:52:34 +0000 [warn]: no patterns matched tag="kubernetes.var.log.containers.nginx-ingress-controller-68b4679fb5-rmsh2_ingress-nginx_nginx-ingress-controller-66d4bac29eb19fd33242bcd177dcb57cffcae3e0b7cf5fc77709b74e3f09c19b.log"
And my http server didn't receive any data
...
Provide example config and message
My fluentd configuration information is as follows:
containers.input.conf: |-
  <source>
    @id fluentd-containers.log
    @type tail
    path /var/log/containers/*.log
    pos_file /var/log/es-containers.log.pos
    tag raw.kubernetes.*
    read_from_head true
    <parse>
      @type multi_format
      <pattern>
        format json
        time_key time
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </pattern>
      <pattern>
        format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
        time_format %Y-%m-%dT%H:%M:%S.%N%:z
      </pattern>
    </parse>
  </source>
output.conf: |-
  <match **>
    @type http
    endpoint_url http://10.2.208.184:9090/
    serializer json
    buffered true
  </match>
Expected behavior: my HTTP server receives the logs sent by fluentd.
...
Linux Debian 4.19.28.bsk.3-amd64
$ fluentd --version
fluentd 1.5.1
$ fluent-gem list
fluent-plugin-out-http (1.3.1)
When bulk_request is set to true, the "ndjson" lines are not actually JSON-encoded.
Config:
<store>
  @type http
  endpoint_url http://localhost:6666
  serializer json
  bulk_request true
</store>
Example POST request:
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Accept: */*
User-Agent: Ruby
Content-Type: application/x-ndjson
Host: localhost:6666
Content-Length: 162
{"message"=>"Oct 5 15:27:50 skunk fluentd[19365]: 2019-10-05 15:27:50 -0700 [warn]: #0 suppressed same stacktrace", "path"=>"/var/log/syslog", "host"=>"skunk"}
I expected the records to be JSON encoded, not Ruby hashes.
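The body shown above is Ruby's Hash#inspect output ("key"=>"value" pairs), not JSON. The difference, for illustration:

```ruby
# Hash#to_s (inspect) vs proper JSON encoding of the same record.
require 'json'

record = { 'message' => 'suppressed same stacktrace', 'host' => 'skunk' }

puts record.to_s          # Ruby hash notation with =>, as seen in the bug
puts JSON.generate(record) # valid JSON, as expected for ndjson lines
```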
...
Use the http output plugin with a bearer token.
The token should be printed masked as xxxxxx.
...
Hi,
It would be good to be able to set the endpoint URL dynamically from event data like this, not only from a predefined constant:
<store>
  type http
  endpoint_url ${record["url"]}
  serializer json
</store>
When do messages go into retry loop?
I have a POST operation done by the http-out plugin used in conjunction with buffering. Does it retry delivering the message on a 400 response?
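As far as I understand (treat this as an assumption, not a definitive answer), a 400 is a client error and is not retried by default; the recoverable_status_codes option that appears elsewhere in this thread lets you opt specific codes into the buffered retry loop:

```
<match **>
  @type http
  endpoint_url http://localhost:9200/
  buffered true
  recoverable_status_codes 500, 502, 503, 504   # add 400 only if you really want it retried
</match>
```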
I want every JSON record to be sent in a separate call, since our receiving application only accepts that kind of communication, so I want to use the two options that prevent sending multiple messages in one request.
In old fluentd I could use:
buffered false
bulk_request false
but in the latest td-agent it looks like this no longer works.
How can I fix it?
Thanks in advance
No special steps
Provide example config and message
<system>
  log_level debug
</system>
<source>
  @type http
  @id inputhttp
  port 24224
</source>
<filter am.log>
  @type array_spin
  key_name "alerts"
</filter>
<match am.log hb.log>
  @type route
  @id LABELS
  <route am.log>
    copy
    @label @TIGER
  </route>
  <route hb.log>
    copy
    @label @HB
  </route>
</match>
<label @TIGER>
  <filter am.log>
    @type record_transformer
    @id RECORD-TRANSFOR-NEWTIGER-AM
    enable_ruby true
    renew_record true
    auto_typecast true
    <record>
      app "oceancrew"
      client "oceancrew"
      data ${{"message" => record["annotations"]["description"].nil? ? "EMPTY" : record["annotations"]["description"], "environment" => record["labels"]["environment"].nil? ? "EMPTY" : record["labels"]["environment"], "event_name" => record["labels"]["alertname"].nil? ? "EMPTY" : record["labels"]["alertname"], "hostname" => record["labels"]["nodename"].nil? ? "EMPTY" : record["labels"]["nodename"], "event_id" => record["labels"]["id"].nil? ? "EMPTY" : record["labels"]["id"], "backlink" =>record["generatorURL"], "domain" => "no", "severity" => record["labels"]["severity"].nil? ? "EMPTY" : record["labels"]["severity"], "object_id" => record["labels"]["object_id"].nil? ? "EMPTY" : record["labels"]["object_id"], "started_at" => record["startsAt"]}}
    </record>
    keep_keys app,client,data
  </filter>
  <match am.log>
    @type copy
    <store>
      @type http
      @id HTTP-NEWTIGER-AM-AMS
      endpoint https://10.232.43.37/oceancrew
      http_method post
      tls_verify_mode none
      content_type application/json
      json_array false
      buffered: false
      bulk_request: false
    </store>
    <store>
      @type stdout
    </store>
    <store>
      @type http
      @id HTTP-NEWTIGER-AM-AMS2
      endpoint http://localhost:8888
      http_method post
      tls_verify_mode none
      content_type application/json
      json_array false
    </store>
  </match>
</label>
<label @HB>
  <filter hb.log>
    @type record_transformer
    @id RECORD-TRANSFOR-NEWHB
    enable_ruby true
    renew_record false
    auto_typecast true
  </filter>
  <filter hb.log>
    @type record_transformer
    @id RECORD-TRANSFOR-HB-AM-MESSAGE
    enable_ruby true
    renew_record false
    auto_typecast true
  </filter>
  <filter hb.log>
    @type record_transformer
    @id RECORD-TRANSFOR-HB-AM-MESSAGE2
    enable_ruby true
    renew_record true
    auto_typecast true
    # adds parameters to the hb message (record passed by fluentd input)
    <record>
      module "oceancrew"
      source_name "alertmanager_pdtmon_node"
      timeout "180"
    </record>
    keep_keys app,client,timeout
  </filter>
  <match hb.log>
    @type http
    @log_level debug
    @id HTTP-HB-AM-AMS
    endpoint https://10.232.43.37/oceancrew
    http_method post
    tls_verify_mode none
    content_type application/json
  </match>
</label>
<label @ERROR>
  <match **>
    @type stdout
  </match>
</label>
I expected to be able to set:
buffered false
bulk_request false
When I validate it with:
/opt/td-agent/bin/fluentd --dry-run -c /etc/td-agent/td-agent.conf
I get:
.....
2021-03-12 15:06:32 +0000 [warn]: parameter 'buffered:' in <store>
@type "http"
@id HTTP-NEWTIGER-AM-AMS
endpoint "https://10.232.43.37/oceancrew"
http_method post
tls_verify_mode none
content_type "application/json"
json_array false
buffered: false
bulk_request: false
</store> is not used.
2021-03-12 15:06:32 +0000 [warn]: parameter 'bulk_request:' in <store>
@type "http"
@id HTTP-NEWTIGER-AM-AMS
endpoint "https://10.232.43.37/oceancrew"
http_method post
tls_verify_mode none
content_type "application/json"
json_array false
buffered: false
bulk_request: false
</store> is not used.
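Two observations, both to be treated as assumptions: (1) fluentd config syntax is space-separated `name value`, so `buffered: false` defines an unknown parameter literally named `buffered:`, which matches the quoted warnings; (2) `endpoint` (rather than `endpoint_url`) is the parameter name of the out_http plugin built into td-agent 4, which has no `buffered`/`bulk_request` options, so the config may be resolving to that plugin rather than this gem. A corrected fragment targeting this gem might look like:

```
<store>
  @type http
  @id HTTP-NEWTIGER-AM-AMS
  endpoint_url https://10.232.43.37/oceancrew
  http_method post
  serializer json
  ssl_no_verify true   # assumed equivalent of tls_verify_mode none in this gem
  buffered false
  bulk_request false
</store>
```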
Bare Metal
td-agent-4.1.0-1.el7.x86_64
Red Hat Enterprise Linux Server release 7.9 (Maipo)
Hi,
I stumbled upon your lib and really like the concept. However, some information is missing from the HTTP request that is sent: the record is posted as JSON, but the tag and date/time of the entry are missing.
After a quick look at the source code, it seems that this info is simply discarded when forging the request. In my case, I would find it useful to have it. Thinking about the best way to handle this, I thought about adding it via HTTP headers. Does this seem acceptable to you?
X-Fluent-date: ENTRYDATETIME
X-Fluent-tag: my.fluent.tag
If ok, I'll try to submit a PR later today.
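A minimal sketch of the proposed change, assuming the create_request structure quoted earlier in this thread (the header names are the ones proposed above; the URL and time are illustrative):

```ruby
# Sketch: attach the fluentd tag and event time as request headers.
require 'net/http'
require 'uri'
require 'time'

tag  = 'my.fluent.tag'
time = Time.at(1_500_000_000).utc # example event time

uri = URI.parse('http://localhost:9200/logs')
req = Net::HTTP::Post.new(uri.request_uri)
req['X-Fluent-Tag']  = tag
req['X-Fluent-Date'] = time.iso8601 # e.g. 2017-07-14T02:40:00Z

puts req['X-Fluent-Tag']
```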
I have a FluentD docker image running your plugin, and a docker image running a Wildfly microservice listening on port 8081. I'm trying to forward from the FluentD plugin to my servlet, but the gem doesn't seem to resolve docker-compose network names properly. In my docker-compose file I have a service called 'logservice', configured with:
services:
logservice:
networks:
- lognet
Then my network is configured as:
networks:
lognet:
driver: bridge
Other services can contact each other using the service name, but the FluentD plugin doesn't seem to work that way. I've tried the following configurations:
<match application.ags.**>
  type http
  endpoint_url http://localhost.local:8081/logservice
  ...
<match application.ags.**>
  type http
  endpoint_url http://logservice.local:8081/logservice
  ...
<match application.ags.**>
  type http
  endpoint_url http://logservice:8081/logservice
  ...
I think (but am not sure) that I've fixed the name resolution part but the services are still not talking to each other. Forgot to add the actual console output for the configuration:
endpoint_url http://logservice:8081/logservice
I get the following output:
fluentd| 2017-04-19 13:16:46 +0000 [warn]: Net::HTTP.Put raises exception: Errno::ECONNREFUSED, 'Failed to open TCP connection to logservice:8081 (Connection refused - connect(2) for "logservice" port 8081)'
Any idea what might be going on?
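For what it's worth: ECONNREFUSED usually means the name did resolve and the connection reached the target, but nothing was listening on that port/interface, so the Wildfly service may be bound to 127.0.0.1 inside its container instead of 0.0.0.0. A compose sketch (assuming, per the description above, that both services must join lognet for name resolution to work):

```yaml
services:
  fluentd:
    networks:
      - lognet
  logservice:
    networks:
      - lognet
networks:
  lognet:
    driver: bridge
```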
Hi,
I am new to fluentd and am trying to figure out how to monitor a path and collect logs via fluentd, and it's not working. Could you please tell me what I am missing?
Below is the config in use; I don't get any errors when restarting the fluentd daemon. I am trying to monitor the aishtest.log file in the temp directory.
<source>
  @type tail
  path /tmp/aishtest.log
  pos_file /tmp/pos_file_secure.log.pos
  tag "*.*"
  read_from_head true
  <parse>
    @type none
  </parse>
</source>
<filter *.*>
  @type parser
  key_name log
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
  </parse>
  emit_invalid_record_to_error false
</filter>
<match *>
  @type stdout
  @type kafka2
  brokers broker1 broker2
  ssl_ca_cert abc
  ssl_client_cert "/etc/fluent/certs/##"
  ssl_client_cert_key "/etc/fluent/certs/#"
  ssl_ca_certs_from_system false
  ssl_verify_hostname false
  client_id xyz
  use_event_time true
  <format>
    @type json
  </format>
  compression_codec gzip
  max_send_retries 1024
  required_acks -1
  sasl_over_ssl false
</match>
Hello, and thank you — I use this plugin all the time and find it very convenient.
Around line 111 there is the following code:
unless res and res.is_a?(Net::HTTPSuccess)
If an exception or similar occurs in the HTTP connection handling just before it, res can be nil, and in that case the exception named in the title is raised at the line above.
This exception is not caught anywhere in the program, so once it occurs here, out_http stops processing and cannot recover until Fluentd itself is restarted.
For example, changing the unless to an if, as in:
if res and res.is_a?(Net::HTTPSuccess)
makes the exception named in the title no longer occur.
I wasn't sure what the intent behind this logging code was, so I filed this as an issue rather than a pull request.
Just reporting the above.