cloudflare / goflow

The high-scalability sFlow/NetFlow/IPFIX collector used internally at Cloudflare.

License: BSD 3-Clause "New" or "Revised" License

Topics: flow, netflow, ipfix, sflow, cisco, juniper, kafka, go

goflow's Introduction

GoFlow

This application is a NetFlow/IPFIX/sFlow collector in Go.

It gathers network information (IP addresses, interfaces, routers) from different flow protocols, serializes it in a protobuf format, and sends the messages to Kafka using the Sarama library.

Why

The diversity of devices and the volume of network samples at Cloudflare required a dedicated pipeline. We focused on building tools that could be easily monitored and maintained. The main goal is to have full visibility of the network while allowing other teams to develop on top of it.

Modularity

To enable load-balancing and optimizations, the GoFlow library has a decoder which converts the payload of a flow packet into a Go structure.

The producer functions (one per protocol) then convert those structures into a protobuf (pb/flow.pb) which contains the fields a network engineer is interested in. Flow packets usually contain multiple samples; the protobuf message acts as an abstraction of a single sample.

The transport provides different ways of processing the protobuf: sending it to Kafka or printing it to the console.

Finally, utils provides functions that are directly used by the CLI utilities. GoFlow is a wrapper around all of these functions and chains them to produce bytes into Kafka. There is also one CLI tool per protocol.

You can build your own collector using this base and replace parts (see the transport sketch after the list below):

  • Use different transport (eg: RabbitMQ instead of Kafka)
  • Convert to another format (eg: Cap'n Proto, Avro, instead of protobuf)
  • Decode different samples (eg: not only IP networks, add MPLS)
  • Different metrics system (eg: use expvar instead of Prometheus)
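
As a sketch of what replacing a part looks like, here is a minimal custom transport that logs flows instead of producing them to Kafka. It assumes goflow v3's utils.Transport interface (a single Publish method) and the FlowRoutine signature visible in the stack trace later in this document; check utils/ in the version you build against.

package main

import (
	"log"

	flowmessage "github.com/cloudflare/goflow/v3/pb"
	"github.com/cloudflare/goflow/v3/utils"
)

// LogTransport stands in for the Kafka transport: it prints each
// decoded flow message instead of producing it.
type LogTransport struct{}

func (t *LogTransport) Publish(msgs []*flowmessage.FlowMessage) {
	for _, msg := range msgs {
		log.Printf("flow: %v", msg)
	}
}

func main() {
	// Decode NetFlow v9/IPFIX on the default port with one worker.
	s := &utils.StateNetFlow{Transport: &LogTransport{}}
	log.Fatal(s.FlowRoutine(1, "", 2055, false))
}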

Protocol difference

The sampling protocols can be very different:

sFlow is a stateless protocol which sends the full header of a packet together with router information (interfaces, destination AS), while NetFlow/IPFIX relies on templates that describe the fields present in the records (eg: source IPv6).

The sampling rate in NetFlow/IPFIX is provided by Option Data Sets. This is why it can take a few minutes before packets can be decoded: the collector must first receive all the templates (Option Template and Data Template).

Both of these protocols bundle multiple samples (Data Set in NetFlow/IPFIX and Flow Sample in sFlow) in one packet.

The advantage of using an abstract network flow format, such as protobuf, is that it enables summing across protocols (eg: per ASN or per port, rather than per (ASN, router) and (port, router)).
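
As an illustration, once every protocol is decoded into the shared FlowMessage, a per-ASN byte counter needs no protocol-specific logic (a minimal sketch; SrcAS, Bytes and SamplingRate are fields of the protobuf described below, and flowmessage is the package generated from pb/flow.proto):

// bytesPerASN sums estimated traffic per source ASN, whatever the protocol.
func bytesPerASN(msgs []*flowmessage.FlowMessage) map[uint32]uint64 {
	perASN := make(map[uint32]uint64)
	for _, msg := range msgs {
		// Scale sampled bytes by the sampling rate to estimate real volume.
		perASN[msg.SrcAS] += msg.Bytes * msg.SamplingRate
	}
	return perASN
}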

Features

Collection:

  • NetFlow v5
  • IPFIX/NetFlow v9
    • Handles sampling rate provided by the Option Data Set
  • sFlow v5: RAW, IPv4, IPv6, Ethernet samples, Gateway data, router data, switch data

Production:

  • Convert to protobuf
  • Send to a Kafka producer
  • Print to the console

Monitoring:

  • Prometheus metrics
  • Time to decode
  • Sampling rates
  • Payload information
  • NetFlow Templates

Run

Download the latest release and just run the following command:

./goflow -h

Enable or disable a protocol using -nf=false or -sflow=false. Define the ports and addresses of the protocols using -nf.addr and -nf.port for NetFlow, and -sflow.addr and -sflow.port for sFlow.

Set the Kafka brokers or the brokers' SRV record using -kafka.brokers 127.0.0.1:9092,[::1]:9092 or -kafka.srv. Disable sending to Kafka with -kafka=false. You can hash the protobuf by key when sending it to Kafka.

You can collect NetFlow/IPFIX, NetFlow v5 and sFlow using the same collector or use the single-protocol collectors.

You can define the number of workers per protocol using -workers.
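
For example, a combined invocation using the flags above (2055 and 6343 are the default ports shown in the startup logs later in this document):

./goflow -nf.port 2055 -sflow.port 6343 \
    -kafka.brokers 127.0.0.1:9092,[::1]:9092 \
    -workers 2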

Docker

We also provide an all-in-one Docker container. To run it in debug mode without sending to Kafka:

$ sudo docker run --net=host -ti cloudflare/goflow:latest -kafka=false

Environment

For an example pipeline, check out flow-pipeline.

How is it used at Cloudflare

The samples flowing into Kafka are processed and special fields are inserted using other databases:

  • User plan
  • Country
  • ASN and BGP information

The extended protobuf has the same base as the one in this repo. Compatibility with other software is preserved when adding new fields (the unknown fields are, however, lost if a message is re-serialized).

Once the updated flows are back in Kafka, they are consumed by database inserters (ClickHouse, Amazon Redshift, Google BigTable...) to allow for static analysis. Other teams access the network data just like any other log (SQL query).

Output format

If you want to develop applications, build pb/flow.proto in the language of your choice:

Example in Go:

PROTOCPATH=$HOME/go/bin/ make proto

Example in Java:

export SRC_DIR="path/to/goflow-pb"
export DST_DIR="path/to/java/app/src/main/java"
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/flow.proto
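
As a consumption sketch in Go (the pb import path and the topic name are assumptions; use whatever goflow actually produces to in your setup):

package main

import (
	"log"
	"net"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"

	flowmessage "github.com/cloudflare/goflow/v3/pb"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// "flow-messages" is an assumed topic name.
	pc, err := consumer.ConsumePartition("flow-messages", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for kmsg := range pc.Messages() {
		var msg flowmessage.FlowMessage
		if err := proto.Unmarshal(kmsg.Value, &msg); err != nil {
			log.Printf("decode error: %v", err)
			continue
		}
		// Address fields are raw bytes; net.IP renders them readably.
		log.Printf("flow from %v: %d bytes", net.IP(msg.SamplerAddress), msg.Bytes)
	}
}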

The fields are listed in the following table, along with information on how they are populated from the original sources:

| Field | Description | NetFlow v5 | sFlow | NetFlow v9 | IPFIX |
|---|---|---|---|---|---|
| Type | Type of flow message | NETFLOW_V5 | SFLOW_5 | NETFLOW_V9 | IPFIX |
| TimeReceived | Timestamp of when the message was received | Included | Included | Included | Included |
| SequenceNum | Sequence number of the flow packet | Included | Included | Included | Included |
| SamplingRate | Sampling rate of the flow | Included | Included | Included | Included |
| FlowDirection | Direction of the flow | | | DIRECTION (61) | flowDirection (61) |
| SamplerAddress | Address of the device that generated the packet | IP source of packet | Agent IP | IP source of packet | IP source of packet |
| TimeFlowStart | Time the flow started | System uptime and first | =TimeReceived | System uptime and FIRST_SWITCHED (22) | flowStartXXX (150, 152, 154, 156) |
| TimeFlowEnd | Time the flow ended | System uptime and last | =TimeReceived | System uptime and LAST_SWITCHED (23) | flowEndXXX (151, 153, 155, 157) |
| Bytes | Number of bytes in flow | dOctets | Length of sample | IN_BYTES (1), OUT_BYTES (23) | octetDeltaCount (1), postOctetDeltaCount (23) |
| Packets | Number of packets in flow | dPkts | =1 | IN_PKTS (2), OUT_PKTS (24) | packetDeltaCount (2), postPacketDeltaCount (24) |
| SrcAddr | Source address (IP) | srcaddr (IPv4 only) | Included | Included | IPV4_SRC_ADDR (8), IPV6_SRC_ADDR (27) |
| DstAddr | Destination address (IP) | dstaddr (IPv4 only) | Included | Included | IPV4_DST_ADDR (12), IPV6_DST_ADDR (28) |
| Etype | Ethernet type (0x86dd for IPv6...) | IPv4 | Included | Included | Included |
| Proto | Protocol (UDP, TCP, ICMP...) | prot | Included | PROTOCOL (4) | protocolIdentifier (4) |
| SrcPort | Source port (when UDP/TCP/SCTP) | srcport | Included | L4_SRC_PORT (7) | sourceTransportPort (7) |
| DstPort | Destination port (when UDP/TCP/SCTP) | dstport | Included | L4_DST_PORT (11) | destinationTransportPort (11) |
| InIf | Input interface | input | Included | INPUT_SNMP (10) | ingressInterface (10) |
| OutIf | Output interface | output | Included | OUTPUT_SNMP (14) | egressInterface (14) |
| SrcMac | Source MAC address | | Included | IN_SRC_MAC (56) | sourceMacAddress (56) |
| DstMac | Destination MAC address | | Included | OUT_DST_MAC (57) | postDestinationMacAddress (57) |
| SrcVlan | Source VLAN ID | | From ExtendedSwitch | SRC_VLAN (58) | vlanId (58) |
| DstVlan | Destination VLAN ID | | From ExtendedSwitch | DST_VLAN (59) | postVlanId (59) |
| VlanId | 802.1Q VLAN ID | | Included | SRC_VLAN (58) | postVlanId (59) |
| IngressVrfID | VRF ID | | | | ingressVRFID (234) |
| EgressVrfID | VRF ID | | | | egressVRFID (235) |
| IPTos | IP Type of Service | tos | Included | SRC_TOS (5) | ipClassOfService (5) |
| ForwardingStatus | Forwarding status | | | FORWARDING_STATUS (89) | forwardingStatus (89) |
| IPTTL | IP Time to Live | | Included | IPTTL (52) | minimumTTL (52) |
| TCPFlags | TCP flags | tcp_flags | Included | TCP_FLAGS (6) | tcpControlBits (6) |
| IcmpType | ICMP type | | Included | ICMP_TYPE (32) | icmpTypeXXX (176, 178), icmpTypeCodeXXX (32, 139) |
| IcmpCode | ICMP code | | Included | ICMP_TYPE (32) | icmpCodeXXX (177, 179), icmpTypeCodeXXX (32, 139) |
| IPv6FlowLabel | IPv6 flow label | | Included | IPV6_FLOW_LABEL (31) | flowLabelIPv6 (31) |
| FragmentId | IP fragment ID | | Included | IPV4_IDENT (54) | fragmentIdentification (54) |
| FragmentOffset | IP fragment offset | | Included | FRAGMENT_OFFSET (88) | fragmentOffset (88) and fragmentFlags (197) |
| BiFlowDirection | BiFlow identification | | | | biflowDirection (239) |
| SrcAS | Source AS number | src_as | From ExtendedGateway | SRC_AS (16) | bgpSourceAsNumber (16) |
| DstAS | Destination AS number | dst_as | From ExtendedGateway | DST_AS (17) | bgpDestinationAsNumber (17) |
| NextHop | Nexthop address | nexthop | From ExtendedGateway | IPV4_NEXT_HOP (15), BGP_IPV4_NEXT_HOP (18), IPV6_NEXT_HOP (62), BGP_IPV6_NEXT_HOP (63) | ipNextHopIPv4Address (15), bgpNextHopIPv4Address (18), ipNextHopIPv6Address (62), bgpNextHopIPv6Address (63) |
| NextHopAS | Nexthop AS number | | From ExtendedGateway | | |
| SrcNet | Source address mask | src_mask | From ExtendedRouter | SRC_MASK (9), IPV6_SRC_MASK (29) | sourceIPv4PrefixLength (9), sourceIPv6PrefixLength (29) |
| DstNet | Destination address mask | dst_mask | From ExtendedRouter | DST_MASK (13), IPV6_DST_MASK (30) | destinationIPv4PrefixLength (13), destinationIPv6PrefixLength (30) |
| HasEncap | Indicates GRE encapsulation | | Included | | |
| xxxEncap fields | Same as the field, but inside the GRE tunnel | | Included | | |
| HasMPLS | Indicates the presence of an MPLS header | | Included | | |
| MPLSCount | Count of MPLS layers | | Included | | |
| MPLSxTTL | TTL of the MPLS label | | Included | | |
| MPLSxLabel | MPLS label | | Included | | |

If you are implementing flow processors to add more data to the protobuf, we suggest you use field IDs ≥ 1000.

Implementation notes

The pipeline at Cloudflare connects collectors with flow processors that add more information: given an IP address, they add the country, ASN, etc.

For aggregation, we use materialized tables in ClickHouse. Dictionaries help correlate flows with countries and ASNs. A few collectors can handle hundreds of thousands of samples.

We also successfully experimented with flow aggregation in Flink using a Keyed Session Window: it sums Bytes × SamplingRate and Packets × SamplingRate received during a 5-minute window, while allowing 2 more minutes for delayed flows before closing the session.

The BGP information provided by routers can be unreliable (if the router does not have a BGP full table, or the route is static). You can use a MaxMind prefix-to-ASN database to work around this issue.

License

Licensed under the BSD 3-Clause License.

goflow's People

Contributors

davmatjo, debugloop, joao-reis, kanocz, loganmc10, lspgn, mmlb, morrowc, mpursley, rkosegi, rzerda, shyam334-w1, simpod, superq, taktv6


goflow's Issues

Losing flows from many network devices with 1 collector

Hi Guys!
I want to send flows from many routers to a goflow collector and then push those flows to a Kafka topic. My question is as follows: do you have any recommendation for calculating the ratio between network devices (routers) and collectors?
Right now I have 3 routers targeting 1 goflow collector, and I have the impression that I'm losing packets. Do you have any advice/hints?
By the way, I'm running goflow through Docker... should I run multiple containers?
thanks for your time guys ;)

Question: Flow field support?

Hi

Please correct me if I'm wrong, but it doesn't appear I can export NetFlow fields that aren't available across all the NetFlow protocols. For example, in my case I'm dealing with IPFIX biflows and require fields specific to IPFIX, such as 'biflowDirection'. Is there a way to request these fields in the protobuf definition, or is this a feature I could request?

Thanks!

Missing Flow Duration or Flow End Time

case netflow.NFV9_FIELD_LAST_SWITCHED:
	var timeLastSwitched uint32
	DecodeUNumber(v, &timeLastSwitched)
	timeDiff := (uptime - timeLastSwitched) / 1000
	flowMessage.TimeFlow = uint64(baseTime - timeDiff)
}

The above case statement decodes the NFv9 LAST_SWITCHED field (flowEndSysUpTime, as IPFIX calls it) into TimeFlow. I am wondering why the FIRST_SWITCHED (flowStartSysUpTime) field is not being decoded. The switch statement at line 281 right below does the TimeFlow for IPFIX, but also only considers flowEnd* fields.

I was about to implement some way of having more timing information, but I am stuck with some basic issues:

  1. Renaming TimeFlow in protobuf would be bad.
  2. Adding TimeFlowEnd without having TimeFlowStart would be very confusing.
  3. Adding FlowDuration might be a solution, but it does not seem as convenient as having an absolute time.

I am leaning toward implementing option 3, but I thought I should gather some opinions first.

There might also be another issue implementing this: I am not sure in what order records arrive (line 138), and thus computing the FlowDuration might be difficult, depending on whether the flowMessage object has already been populated with FlowTime. As far as I understand right now (after some superficial reading), I cannot access other raw records by ID or name in a reliable way, and I would then be decoding that field twice anyway.
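
By analogy with the snippet quoted above, a FIRST_SWITCHED case might look like the following sketch (NFV9_FIELD_FIRST_SWITCHED mirrors the existing constant; TimeFlowStart is a hypothetical protobuf field, since adding it is exactly what is being discussed):

case netflow.NFV9_FIELD_FIRST_SWITCHED:
	var timeFirstSwitched uint32
	DecodeUNumber(v, &timeFirstSwitched)
	timeDiff := (uptime - timeFirstSwitched) / 1000
	flowMessage.TimeFlowStart = uint64(baseTime - timeDiff) // hypothetical field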

IP address unreadable after deserialization

I'm having a problem: after deserializing NetFlow v9 messages from Kafka I get an unreadable string.
For example
SrcIP: "dA\004\334"
DstIP: "^d\256\305"

If I extract the byte array from SrcIP, I get an array containing 4 numbers, for example [100, 65, 4, -36].

I am using protoc v3.8.0 to generate the Java class from the .proto file.
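
For reference, those values are raw big-endian address bytes rather than text: [100, 65, 4, -36] is 100.65.4.220 once the signed byte is read as unsigned (0xDC = 220). A minimal Go sketch of rendering them (Java's InetAddress.getByAddress(byte[]) serves the same purpose):

package main

import (
	"fmt"
	"net"
)

func main() {
	raw := []byte{100, 65, 4, 220}    // the "dA\004\334" bytes from above
	fmt.Println(net.IP(raw).String()) // prints 100.65.4.220
}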

Support UDP relay or reflector feature

I'd like goflow to parse the received packets and at the same time relay/reflect them to other agents. So I'm proposing to add relay.addr and relay.port options which, if not specified, would not change the current behavior.

Sending through Logstash shows corrupted data

Hi, I'm not sure if I'm misunderstanding how to properly use goflow.
I ran with the -kafka=false flag and it shows flows correctly:

Type:NETFLOW_V9 TimeReceived:1571325708 SequenceNum:82208684 SamplingRate:0 SamplerAddress:10.101.11.211 TimeFlowStart:1571325692 TimeFlowEnd:1571325692 Bytes:72 Packets:1 SrcAddr:2607:f8b0:4008:80f::200a DstAddr:2800:bf0:2a7:1122:14bd:7ee3:e8a8:c58d Etype:34525 Proto:6 SrcPort:443 DstPort:59056 SrcIf:216 DstIf:219 SrcMac:00:00:00:00:00:00 DstMac:00:00:00:00:00:00 SrcVlan:0 DstVlan:0 VlanId:0 IngressVrfID:1610612739 EgressVrfID:1610612736 IPTos:0 ForwardingStatus:64 IPTTL:0 TCPFlags:17 IcmpType:0 IcmpCode:0 IPv6FlowLabel:592069 FragmentId:0 FragmentOffset:0 BiFlowDirection:0 SrcAS:0 DstAS:0 NextHop:10.101.21.192 NextHopAS:0 SrcNet:48 DstNet:48

Then I enabled Kafka, using Logstash to send the data to Elasticsearch for further analysis. I don't filter the data inside Logstash, but stdout shows something like this:

{ "@timestamp" => 2019-10-17T15:19:50.121Z, "message" => "\b\u0003\u0010๏ฟฝ๏ฟฝ๏ฟฝ๏ฟฝ\u0005 ๏ฟฝ๏ฟฝ'(๏ฟฝ๏ฟฝ\u00052\u0004\u001F\rC\u0014:\u0004-๏ฟฝ4\nH๏ฟฝ\u0016P\u0002Z\u0004\ne\v๏ฟฝb\u0004\ne5๏ฟฝ๏ฟฝ\u0001\u0018๏ฟฝ\u0001\u0018๏ฟฝ\u0001๏ฟฝ\u0001๏ฟฝ\u0001๏ฟฝ\u0001๏ฟฝ\u0001\u0006๏ฟฝ\u0001๏ฟฝ\u0003๏ฟฝ\u0001๏ฟฝ๏ฟฝ\u0002๏ฟฝ\u0001@๏ฟฝ\u0001\u0010๏ฟฝ\u0001๏ฟฝ\u0010๏ฟฝ\u0002๎„ข๏ฟฝ\u0005๏ฟฝ\u0002๏ฟฝ๏ฟฝ๏ฟฝ๏ฟฝ\u0006๏ฟฝ\u0002๏ฟฝ๏ฟฝ๏ฟฝ๏ฟฝ\u0006", "@version" => "1" } { "@timestamp" => 2019-10-17T15:19:50.121Z, "message" => "\b\u0003\u0010๏ฟฝ๏ฟฝ๏ฟฝ๏ฟฝ\u0005 ๏ฟฝ๏ฟฝ'(๎„ข๏ฟฝ\u00052\u0004\u0011๏ฟฝ๏ฟฝ๏ฟฝ:\u0004๏ฟฝ๏ฟฝ1๏ฟฝH๏ฟฝ\vP\u0001Z\u0004\ne\v๏ฟฝb\u0004\u0000\u0000\u0000\u0000๏ฟฝ\u0001\u0018๏ฟฝ\u0001๏ฟฝ\u0001๏ฟฝ\u0001\u0006๏ฟฝ\u0001๏ฟฝ\u0003๏ฟฝ\u0001๏ฟฝ๏ฟฝ\u0002๏ฟฝ\u0001\u0002๏ฟฝ\u0001@๏ฟฝ\u0001\u0010๏ฟฝ\u0001๏ฟฝ\u0010๏ฟฝ\u0002๎„ข๏ฟฝ\u0005๏ฟฝ\u0002๏ฟฝ๏ฟฝ๏ฟฝ๏ฟฝ\u0006", "@version" => "1" }

I want to be able to send the data to Elasticsearch to create graphs, and to query ES for data related to traffic and outgoing interfaces.

I'm a bit of a noob when it comes to Go and Kafka, so I look forward to any suggestions in this thread.

Is the current flow.proto correct?

Great project, thank you! I am trying to use the protobuf output format with Kafka and then insert into ClickHouse. I keep getting this message from CH, leading me to think something may not be right with the flow.proto file:
Code: 444. DB::Exception: Received from localhost:9000. DB::Exception: Protobuf messages are corrupted or don't match the provided schema. Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.
If flow.proto is confirmed correct, I'll take this to the CH forum.
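
As the error message itself notes, ClickHouse's Protobuf input expects every message to be prefixed with its length as a varint. A minimal Go sketch of producing such a payload (uses the standard library's encoding/binary; the function name is illustrative):

// lengthDelimit prefixes a marshaled protobuf message with its varint length.
func lengthDelimit(msgBytes []byte) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, uint64(len(msgBytes)))
	return append(buf[:n], msgBytes...)
}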

Netflow v5 -- Unknown

Hi,
my goflow container (latest, 3.4.3) is receiving flow records with initial flags like this (I'm inspecting them via nfcapd on a VM):

Flow Record:
Flags = 0x00 NETFLOW v5, Unsampled
label =

However, goflow continuously returns this error:
Error from: NetFlow (0) duration: 12.92µs. Unknown NetFlow version 5 (only decodes v9 and v10/IPFIX)

Does goflow support NetFlow v5? Can you help me troubleshoot this?

Refactor, repackage, lint

For version 3.x.x:

  • Go fmt/lint/go mod
  • Split front-ends into three parsers (NetFlowV9/IPFIX, NetFlowV5, sFlow) in a cmd directory.
  • Error handling (especially when decoding)
  • Logging in decoder (passing log object to have more feedback)
  • Tests for decoding
  • Cleaning in protobuf (#22)
  • More Kafka options (sharding/keying)

Support for custom sampling rate

Please add support for a custom sampling rate for IPFIX/NetFlow v9/sFlow (as an argument to goflow). It's needed in cases where devices do not send Option Data Sets / Flow Samples for some reason, or where IPFIX/NetFlow v9/sFlow is load-balanced across multiple goflow anycast nodes (because general IPFIX data and Option Data Sets are sent from different source UDP ports, i.e. different 5-tuples).

Empty data with IPFIX

When I receive the Data Template, goflow seems to decode messages, but if I print the record I get an empty array of bytes [].

IP address fields in debug output are packed, and therefore are packed when retrieved by the Kafka consumer

I'm having a problem where the IP address values added to Elasticsearch via Logstash are packed. I spent quite a while trying to figure out why, and then realized that even the debug output of goflow has them packed. I think Logstash is doing exactly what it has been told to do, and the issue might be with goflow...

Debug output from ./goflow-v2.0.4-linux-x86_64 -loglevel debug -kafka=false

DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&<V" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  
DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&=\252" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  
DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&:\266" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  
DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&:\265" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  
DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&:\264" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  
DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&9\212" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  
DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&9\211" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  
DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&9\236" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  
DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&9\235" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  
DEBU[2278] Packet received: Type:NFV9 TimeRecvd:1531427723 SequenceNum:55500 TimeFlow:1531427711 SrcIP:"F&9\234" DstIP:"\n\n\0032" IPversion:IPv4 RouterAddr:"\n\n\000\002" NextHop:"@\017\220\001" SrcNet:32 DstNet:32 SrcIf:5 DstIf:6 Proto:1  

Incidentally, the issue is the same using v1.1.0 and compiling flow.proto from its source, but there the debug output does not contain enough information to show the same problem.

DEBU[0325] Message processed                             count_flowmessages=24 samplingRate=16834 seqnum=56009 source="10.10.0.2:65356" type=NetFlow/IPFIX version=9
DEBU[0327] Message processed                             count_flowmessages=21 samplingRate=16834 seqnum=56010 source="10.10.0.2:65356" type=NetFlow/IPFIX version=9
DEBU[0329] Message processed                             count_flowmessages=23 samplingRate=16834 seqnum=31730 source="10.10.0.2:65356" type=NetFlow/IPFIX version=9

Protobuf format question

Hi Guys.

I have a question about the protobuf format used with goflow. I am attempting to decode the messages but am getting an error from protobuf. When I look at the message itself, it appears to have non-protobuf data prepended to the value extracted from the protobuf message:

such as:

--snip
b'2021-02-11T09:56:10.870Z %{host} \x08\x03\x10\x
--snip

It appears to be a timestamp with an unresolved template field for the host.

Is there any preprocessing that needs to be done on the Kafka message before attempting to decode the protobuf?

I use the python confluent kafka library.

Thank you.

Support output formats other than protobuf over Kafka

Background/Use Case

InfluxData's Telegraf project received a request (influxdata/telegraf#8425) to integrate with goFlow for IPFIX/NetFlow support. Currently Telegraf does not support binary (ex: protobuf) formats.

Feature Request

Request for goFlow to add one of the following output formats to ease the integration with Telegraf.

  • InfluxDB Line Protocol
  • Carbon2
  • Graphite
  • JSON
  • Prometheus
  • SplunkMetric
  • Wavefront
  • ServiceNow Metrics

Also, I'm not sure whether goFlow is meant to be embedded in other programs, as there might not be a stable library API for other programs to use.

Idea/Feature: Extensible decoder

NetFlow and IPFIX support templates, which allow device-, device-class-, and vendor-specific fields.
These extended fields contain useful information, but it doesn't necessarily make sense to add them to goflow's default list of supported fields.

The feature request is to have an extensible decoder for NFv9 and IPFIX, such that users are able to customise the decoder to support additional fields without having to maintain a full fledged fork of goflow.

As for implementation, I thought it could be:

  • A configuration-driven system, which is good for simple mappings (record field -> pb value).
  • A plugin system: a decoder extension with access to the DataField and FlowMessage, which could enable complex transformations.
  • Other ideas.

What are your thoughts?

Add support for NetFlow v5

Adding support for the NetFlow v5 format was discussed here, but I wanted to put this feature request in its own issue.

Many routers still export in v5. Not requiring conversion from v5 prior to ingesting would allow a much more streamlined process.

If it's any help, there is a Verizon library that implements v5. I'm sure it's not something you can copy over, but having existing native Go code to look at likely makes understanding the v5 format easier.
https://github.com/VerizonDigital/vflow/tree/master/netflow/v5

Vlan id, no dst+src vlan

I was comparing flows received by this library and by nfcapd. It seems to me that VlanId carries the same content as would normally be in the srcVlan field. Why is goflow designed to have only VlanId and not both src and dst VLANs?

Performance impact question

Hi there folks, here's the picture: I'm receiving flows from different routers. For some reason I receive a "copy" of the flows sent to another collector (I'm just another exporter on IOS-XR), so I get data from ports that I don't really need, and I can't change the network configuration.

In order to save disk space, I decided to hardcode these ports into goflow (specifically the SendKafkaFlowMessage func), so when a flow comes from port "X" it is not sent to Kafka; the message is simply ignored.

It does the job, but I'm not quite sure whether I may still be overloading the server. In networking, discarding a packet before processing it does not represent a performance issue for a router; would that be the case here too? Perhaps the impact is negligible and I should not worry at all?

My NetFlow traffic is considerable and continuously growing, so I can't tell whether the server is overloaded due to traffic peaks or because I introduced a little more processing overhead.

I would appreciate some insight into this. Thank you.

Compiling FlowMessagePb proto

Hello,
I need to compile flow.proto into a jar to pass it to a protobuf converter that will read it.
I've managed to do it using protoc. My question is: since the protobuf converter expects to be provided with the compiled class name, should I provide com.cloudflare.net.flowagg.FlowMessagePb (with which I get the error "not a valid proto3 class") or com.cloudflare.net.flowagg.FlowMessagePb$FlowMessage (which seems to proceed further, but then I get the "Protocol message end-group tag did not match expected tag" error)?

Issue in NetFlow processor stop logic

I noticed an issue in the NetFlow decoder's stop logic which can result in:
fatal error: all goroutines are asleep - deadlock!

Below is the issue, described in the decoders/decoder.go code:

func (w Worker) Start() {
	go func() {
		//log.Debugf("Worker %v started", w.Id)
		for {
			// w.WorkerPool <- w.InMsg  // -- issue: commented out
			select {
			case <-w.Quit:
				break
			// case msg := <-w.InMsg:  // -- issue: commented out
			case w.WorkerPool <- w.InMsg: // -- fix added
				msg := <-w.InMsg // -- fix added
				timeTrackStart := time.Now()

Go playground for quick check-
https://play.golang.org/p/LnDFz8vT_4G

Thanks!

Memory leak

Hello!
I tried to use goflow to collect sFlow statistics and write them to Logstash (ELK stack) through a Kafka broker.

The pipeline is:
goflow -> kafka -> logstash-input-kafka (protobuf codec) -> elastic

I get accurate statistics, but I also see a fast memory leak in the goflow collector. My VM has 32 GB of RAM, which is enough for about 2 hours. The memory is freed after a goflow service restart.

Does anyone know what the reason could be?

Kafka Timestamp not set

Not sure whether this is a feature request or a bug.

NetFlow packets that were written into a Kafka topic don't have a Kafka timestamp assigned. I would expect the current system timestamp to be used.

If the timestamp is missing on purpose, is there a way to get the current timestamp used instead?
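
A hedged sketch of what setting it could look like with Sarama (these names are from the Sarama API, not from goflow; the broker only stores producer timestamps when the client's Kafka version is at least 0.10):

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_10_0_0 // required for message timestamps

msg := &sarama.ProducerMessage{
	Topic:     "flow-messages", // assumed topic name
	Value:     sarama.ByteEncoder(data),
	Timestamp: time.Now(), // explicit Kafka timestamp
}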

Data race in utils/netflow.go

When running goflow with the workers flag set to greater than 1, the application will sometimes crash due to a concurrent map read and write.

Running goflow with go build -race also highlights many data races in the same places.

I believe the issue is in utils/netflow.go DecodeFlow(), where templates and sampling can be written to when only an RLock has been acquired from the RWMutex.
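
A sketch of the pattern being described (lock and map names are assumed for illustration): reads may hold the read lock, but any template or sampling-rate update has to take the exclusive lock first.

s.samplinglock.RLock()
rate, ok := s.sampling[key]
s.samplinglock.RUnlock()
if !ok {
	rate = defaultRate // hypothetical fallback value
	s.samplinglock.Lock()
	s.sampling[key] = rate // writes require the exclusive lock
	s.samplinglock.Unlock()
}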

Netflow errors

I get the following errors when trying to run:

./goflow.go:66:2: undefined: initMetrics
./goflow.go:109:2: undefined: NetFlowTemplatesStats
./goflow.go:157:4: undefined: NetFlowErrors
./goflow.go:164:4: undefined: NetFlowErrors
./goflow.go:171:4: undefined: NetFlowErrors
./goflow.go:178:4: undefined: NetFlowErrors
./goflow.go:192:3: undefined: NetFlowStats
./goflow.go:202:5: undefined: NetFlowSetStatsSum
./goflow.go:210:5: undefined: NetFlowSetRecordsStatsSum
./goflow.go:219:5: undefined: NetFlowSetStatsSum
./goflow.go:219:5: too many errors

I am not sure why

V1.0 garble and V2.0 doesn't send result to Kafka.

  • I used two versions of goflow to decode sFlow data, and found the following problems:
./goflow-linux-x86_64 -kafka.out.brokers xxxxxxxa.com.cn:9092 -kafka.out.topic vflow -loglevel debug -sworkers 10


./goflow-v2.0.1-linux-x86_64 -kafka.out.brokers xxxxx.com.cn:9092 -kafka.out.topic vflow -loglevel debug -sworkers 10
  • The first one sends data to Kafka, but it's all garbled.

  • The second one decodes correctly, but doesn't send data to Kafka.

Not receiving sflow even though UDP requests are incoming

Running 2.0.4

./goflow -v
GoFlow v2.0.4

When started, no flows are being displayed:

./goflow -loglevel=debug  -kafka=false -netflow=false -sport=9992
INFO[0000] Starting GoFlow                               NetFlow=false sFlow=true
DEBU[0000] Starting workers                              Name=sFlow
INFO[0000] Listening on UDP :9992                        Type=sFlow

However, when I run tcpdump -i enp175s0f1 udp and port 9992 I can see that sFlow data is coming from that device over UDP.

Am I doing something wrong?

Add extra flag to send JSON on the Kafka transport

Protobuf has support for marshaling messages as JSON: google.golang.org/protobuf/encoding/protojson

For some popular tools like Logstash (that are more text-oriented), having JSON would be a better message format option (see logstash-plugins/logstash-codec-protobuf#15). At the very least it would make it a lot easier to share data between (legacy) systems.

Basically the impact would be minimal: in the Kafka transport, the protobuf message is marshaled as JSON instead of wire format, based on a flag. Fields of type bytes are then converted to base64 strings, which is safer for some systems.
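
A minimal sketch of the flag-controlled marshaling described above (protojson is from google.golang.org/protobuf/encoding/protojson; the flag name is hypothetical):

var out []byte
var err error
if *jsonOutput { // hypothetical CLI flag
	out, err = protojson.Marshal(msg) // bytes fields become base64 strings
} else {
	out, err = proto.Marshal(msg) // current wire-format behavior
}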

I've implemented a working version for this: gjelu@e2a0c56

Some extra required changes:

  • I am using protojson from the protobuf APIv2, so I need to convert the v1 message to a v2 message.
  • for this I need to upgrade to github.com/golang/[email protected]

I can do a PR if this is something that could be accepted. Or do you have any extra comments on this?

DstIf/SrcIf vs InIf/OutIf naming

Hi, what is the reason for naming the fields DstIf and SrcIf? So far I have seen InIf and OutIf used for the same thing. Thanks!

Flink code example?

Hi,

Is the Flink code used to process the result published somewhere? Probably not to use as-is (our big data guys want to use Spark, not Flink) but to get some ideas. The RIPE presentations are awfully nice (thank you so much for them), but actual code examples would be better.

How much data can goflow handle ?

Hello, are there any benchmarks on how much data a single pod of goflow can handle?
Would it be reasonable to load-balance data to different goflow pods, provided every source network device sends data through a load balancer that routes it to the same goflow pod (so as not to mix up flows from different sources across different goflow pods)?

Avro vs Protobuf

Hi, I wonder whether there was a reason for choosing the protobuf message format over Avro. In the Kafka ecosystem, Avro is AFAIK used more (supported by Schema Registry etc.). Would Avro eventually make sense for goflow?

Thanks!

Support for new IPFIX features

Raising this ticket to request support for features new to IPFIX as compared to NetFlow v9:

  1. Support for variable-length information elements

    ====== snip from RFC 7011: Variable-Length Information Element ======

The IPFIX Template mechanism is optimized for fixed-length
Information Elements [RFC7012]. Where an Information Element has a
variable length, the following mechanism MUST be used to carry the
length information for both the IANA-assigned and enterprise-specific
Information Elements.

In the Template Set, the Information Element Field Length is recorded
as 65535. This reserved length value notifies the Collecting Process
that the length value of the Information Element will be carried in
the Information Element content itself.

In most cases, the length of the Information Element will be less
than 255 octets. The following length-encoding mechanism optimizes
the overhead of carrying the Information Element length in this more
common case. The length is carried in the octet before the
Information Element, as shown in Figure R.

  0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 | Length (< 255)|          Information Element                  |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                      ... continuing as needed                 |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

        Figure R: Variable-Length Information Element (IE)
                       (Length < 255 Octets)

The length may also be encoded into 3 octets before the Information
Element, allowing the length of the Information Element to be greater
than or equal to 255 octets. In this case, the first octet of the
Length field MUST be 255, and the length is carried in the second and
third octets, as shown in Figure S.

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|      255      |      Length (0 to 65535)      |       IE      |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                      ... continuing as needed                 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

        Figure S: Variable-Length Information Element (IE)
                    (Length 0 to 65535 Octets)

====================================

  2. Support for enterprise-specific information elements

    ==== snip from RFC 7011 =====
    3.2. Field Specifier Format

    Vendors need the ability to define proprietary Information Elements,
    because, for example, they are delivering a pre-standards product, or
    the Information Element is in some way commercially sensitive. This
    section describes the Field Specifier format for both IANA-registered
    Information Elements [IANA-IPFIX] and enterprise-specific Information
    Elements.

The Field Specifier format is shown in Figure G.

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |E|  Information Element ident. |        Field Length           |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                      Enterprise Number                        |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

                 Figure G: Field Specifier Format

Where:

E

  Enterprise bit.  This is the first bit of the Field Specifier.  If
  this bit is zero, the Information Element identifier identifies an
  Information Element in [IANA-IPFIX], and the four-octet Enterprise
  Number field MUST NOT be present.  If this bit is one, the
  Information Element identifier identifies an enterprise-specific
  Information Element, and the Enterprise Number field MUST be
  present.

Information Element identifier

  A numeric value that represents the Information Element.  Refer to
  [IANA-IPFIX].

Field Length

  The length of the corresponding encoded Information Element, in
  octets.  Refer to [IANA-IPFIX].  The Field Length may be smaller
  than that listed in [IANA-IPFIX] if the reduced-size encoding is
  used (see Section 6.2).  The value 65535 is reserved for variable-
  length Information Elements (see Section 7).

Enterprise Number

  IANA enterprise number [IANA-PEN] of the authority defining the
  Information Element identifier in this Template Record.

========================

I saw that these features are not currently supported. Please let me know if there are any plans to support them.

Thanks!

NetFlow does not appear in debug mode

Hi,
I'm trying out your goflow-solution, but I still don't get it to log NetFlow-Data.
When using sFlow it works just fine.
I did a tcpdump & wireshark to make sure that packets are arriving. And
it seems everything to be ok with the NetFlow-Agent from OpenvSwitch:

Frame 15: 162 bytes on wire (1296 bits), 162 bytes captured (1296 bits) on interface 0
Ethernet II, Src: PcsCompu_65:3b:b0 (08:00:27:65:3b:b0), Dst: PcsCompu_99:cc:11 (08:00:27:99:cc:11)
Internet Protocol Version 4, Src: 10.0.2.4, Dst: 10.0.2.15
User Datagram Protocol, Src Port: 44835, Dst Port: 2055
Cisco NetFlow/IPFIX
Version: 5
Count: 2
SysUptime: 2606.495000000 seconds
Timestamp: Oct 22, 2018 00:11:51.286745386 CEST
FlowSequence: 13
EngineType: Unknown (5)
EngineId: 5
00.. .... .... .... = SamplingMode: No sampling mode configured (0)
..00 0000 0000 0000 = SampleRate: 0
pdu 1/2
SrcAddr: 10.0.0.1
DstAddr: 10.0.0.2
NextHop: 0.0.0.0
InputInt: 1
OutputInt: 10
Packets: 5
Octets: 1004
[Duration: 0.015000000 seconds]
SrcPort: 40736
DstPort: 1234
Padding: 00
TCP Flags: 0x19
Protocol: TCP (6)
IP ToS: 0x00
SrcAS: 0
DstAS: 0
SrcMask: 0 (prefix: 0.0.0.0/32)
DstMask: 0 (prefix: 0.0.0.0/32)
Padding: 0000
pdu 2/2
SrcAddr: 10.0.0.2
DstAddr: 10.0.0.1
NextHop: 0.0.0.0
InputInt: 10
OutputInt: 1
Packets: 3
Octets: 206
[Duration: 0.008000000 seconds]
SrcPort: 1234
DstPort: 40736
Padding: 00
TCP Flags: 0x11
Protocol: TCP (6)
IP ToS: 0x00
SrcAS: 0
DstAS: 0
SrcMask: 0 (prefix: 0.0.0.0/32)
DstMask: 0 (prefix: 0.0.0.0/32)
Padding: 0000

I used your proposed Docker image.
Any idea why goflow does not process those NetFlow messages?

Kind Regards
Lucas

Changed RouterAddr on UDP Load Balancer

Hi guys,
I'm collecting data from one router with goflow and everything works well; now I'm trying to collect data from 500 routers.
My first question is:
Due to the network throughput, should I put a UDP load balancer in front of goflow, or can goflow handle this network traffic on its own?
And my second question:

               +------------+
               |            |
               |            |
               |            |  NGINX UDP Load Balancer
               |            |
               |            |
               +------+-----+
                      |
                      |
                      |
      +---------------+--------------+
      |                              |
+-----+-----+                 +------+------+
|           |                 |             |
|           |                 |             |
|           |  Collector 1    |             |  Collector N
|           |                 |             |
+-----------+                 +-------------+

I'm trying to achieve this architecture, and at first sight it works, but the problem with this arch is that I'm getting the load balancer IP as the RouterAddr, because goflow takes the UDP datagram's source address as the RouterAddr.
Now I'm trying to pass the original client IP to the goflow instances with this
(important parts here):
user root;
stream {
    upstream collectors {
        server x.x.x.x:xxxx;
        server x.x.x.x:xxxx;
    }
    server {
        listen 5000 udp;
        proxy_bind $remote_addr transparent;
        proxy_pass collectors;
    }
}

but it did not work... and even if this (UDP load balancing) could work, I'm not sure that I can get the correct RouterAddr from goflow :/

Any advice/help or another approach on this would be appreciated.

I'm using goflow with Docker, image ID 4f84dcd62e08.

Failed to start the container

While running the command docker run --net=host -ti cloudflare/goflow:latest -kafka=false:
INFO[0000] Starting GoFlow
INFO[0000] Listening on UDP :2056 Type=NetFlowLegacy
INFO[0000] Listening on UDP :6343 Type=sFlow
INFO[0000] Listening on UDP :2055 Type=NetFlow
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x7c93a6]

goroutine 11 [running]:
github.com/cloudflare/goflow/utils.UDPRoutine(0xa06284, 0x7, 0xc000051420, 0x1, 0x0, 0x0, 0x807, 0x0, 0xac2760, 0xc000064960, ...)
/build/utils/utils.go:164 +0xff6
github.com/cloudflare/goflow/utils.(*StateNetFlow).FlowRoutine(0xc000020e00, 0x1, 0x0, 0x0, 0x807, 0x0, 0x2, 0x0)
/build/utils/netflow.go:348 +0x15a
main.main.func2(0xc000020e00, 0xc0000268a0)
/build/cmd/goflow/goflow.go:129 +0x21b
created by main.main
/build/cmd/goflow/goflow.go:124 +0x3e3
