
otel-kafka-poc

Recently at my "day job", I have been exploring OpenTelemetry as part of our observability pipeline. I have studied the OpenTelemetry Collector in some detail and have been impressed with its capabilities and ease of setup, though I would be remiss if I didn't mention that the project is still very much in its infancy.

OpenTelemetry Collector

The OpenTelemetry Collector offers a vendor-agnostic implementation of how to receive, process and export telemetry data. It removes the need to run, operate, and maintain multiple agents/collectors.

(Source: OpenTelemetry Architecture documentation)

Without diving too deep into the architecture, the collector is composed of three main components:

  • Receivers: scrape telemetry data from a source at regular intervals
  • Processors: transform the data as needed
  • Exporters: send the data to a destination

Building a custom OpenTelemetry Collector

For this POC, I wanted to build a custom collector that scrapes virtual memory metrics and exports them to Kafka. Let's see how we can leverage the OpenTelemetry Collector to build this pipeline.

Step 0: Install the prerequisites
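
The walkthrough assumes a working Go toolchain plus two OpenTelemetry tools: mdatagen, which generates component boilerplate from metadata.yaml, and builder, which assembles a collector binary. One way to install them (the module paths are an assumption and have moved between collector releases; for older versions, mdatagen may need to be built from the cmd/mdatagen directory of the opentelemetry-collector repository):

$ go install go.opentelemetry.io/collector/cmd/builder@latest
$ go install go.opentelemetry.io/collector/cmd/mdatagen@latest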

Step 1: Define the receiver

The first component we need to build is the receiver, which we are calling vmreceiver for obvious reasons. Any otel (OpenTelemetry) component is defined using a metadata.yaml file, which declares the metrics (or logs or traces) the component will collect and their dimensions (attributes).

Let's create a metadata.yaml file for our receiver at receivers/vmreceiver/:

type: vmstats

status:
  class: receiver
  stability:
    alpha: [metrics]
  distributions: [contrib]

attributes:
  hostname:
    description: Hostname of the machine
    type: string
  ...

metrics:
  swapped:
    enabled: true
    description: Amount of virtual memory used
    unit: bytes
    gauge:
      value_type: int
    attributes:
      - hostname
  free:
    enabled: true
    description: Amount of idle memory
    unit: bytes
    gauge:
      value_type: int
    attributes:
      - hostname
  ...

The entire schema is defined in metadata-schema.yaml in the opentelemetry-collector repository.

Now that we have defined the receiver metadata, we can generate the boilerplate code for the receiver using the otel tool mdatagen, which emits an internal/metadata package containing the MetricsBuilder used below:

$ cd receivers/vmreceiver
$ mdatagen metadata.yaml

Now we are ready to write our receiver code. We will use vmstat to scrape the virtual memory metrics. Let's start by defining the config, which holds the tuning parameters for the receiver:

// config.go

type Config struct {
	Delay int `mapstructure:",omitempty"` // Delay is the delay between consecutive `vmstat` calls.
	Count int `mapstructure:",omitempty"` // Count is the number of `vmstat` calls to make.

	metadata.MetricsBuilderConfig           `mapstructure:",squash"` // MetricsBuilderConfig to enable/disable specific metrics (default: all enabled)
	scraperhelper.ScraperControllerSettings `mapstructure:",squash"` // ScraperControllerSettings to configure scraping interval (default: scrape every second)
}

The last two fields are embedded structs that we can use to configure the metrics and the scraping interval.
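
Later, the factory will need to hand out a default instance of this Config. Here is a minimal sketch of such a constructor, assuming the v0.92-era scraperhelper API; the default Delay and Count values are illustrative assumptions, not taken from the repo:

// config.go (continued)

func createDefaultConfig() component.Config {
	return &Config{
		Delay:                     1, // seconds between consecutive vmstat samples (assumed default)
		Count:                     2, // number of samples per scrape (assumed default)
		MetricsBuilderConfig:      metadata.DefaultMetricsBuilderConfig(),
		ScraperControllerSettings: scraperhelper.NewDefaultScraperControllerSettings(metadata.Type),
	}
}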

Let's now write a simple vmstat wrapper that will scrape the metrics.

// stat.go

type vmStat struct {
    Swapped         int64
    Free            int64
}

type vmStatReader struct {
    delay int
    count int
    
    logger *zap.Logger
}

func (r *vmStatReader) Read() (*vmStat, error) {
    cmd := exec.Command("vmstat", fmt.Sprintf("%d", r.delay), fmt.Sprintf("%d", r.count))
    out, err := cmd.Output()
    if err != nil {
        r.logger.Error("failed to execute vmstat", zap.Error(err))
        return nil, err
    }
    return r.parse(out)
}

func (r *vmStatReader) parse(out []byte) (*vmStat, error) {
	// parse the output of vmstat
}
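
The parsing step is elided above. For completeness, here is a hypothetical implementation, assuming the standard procps vmstat layout in which swpd and free are the third and fourth columns of each sample line:

// stat.go (continued)

import (
	"fmt"
	"strconv"
	"strings"
)

func (r *vmStatReader) parse(out []byte) (*vmStat, error) {
	// vmstat prints two header lines followed by one line per sample;
	// the last line holds the most recent sample.
	lines := strings.Split(strings.TrimSpace(string(out)), "\n")
	if len(lines) < 3 {
		return nil, fmt.Errorf("unexpected vmstat output: %q", out)
	}
	fields := strings.Fields(lines[len(lines)-1])
	if len(fields) < 4 {
		return nil, fmt.Errorf("unexpected vmstat sample: %q", lines[len(lines)-1])
	}
	swapped, err := strconv.ParseInt(fields[2], 10, 64) // 3rd column: swpd
	if err != nil {
		return nil, err
	}
	free, err := strconv.ParseInt(fields[3], 10, 64) // 4th column: free
	if err != nil {
		return nil, err
	}
	// vmstat reports kilobytes by default; convert to bytes to match the
	// unit declared in metadata.yaml.
	return &vmStat{Swapped: swapped * 1024, Free: free * 1024}, nil
}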

Simple enough. Now let's plug our vmStatReader into the receiver.

// scraper.go

type scraper struct {
	logger         *zap.Logger              // Logger to log events
	metricsBuilder *metadata.MetricsBuilder // MetricsBuilder to build metrics
	reader         *vmStatReader            // vmStatReader to read vmstat output
}

func newScraper(cfg *Config, metricsBuilder *metadata.MetricsBuilder, logger *zap.Logger) *scraper {
	return &scraper{
		logger:         logger,
		metricsBuilder: metricsBuilder,
		reader:         newVmStatReader(cfg, logger),
	}
}

func (s *scraper) scrape(_ context.Context) (pmetric.Metrics, error) {
	vmStat, err := s.reader.Read()
	if err != nil {
		return pmetric.Metrics{}, err
	}
	attr := newAttributeReader(s.logger).getAttributes()
	s.recordVmStats(vmStat, attr)
	return s.metricsBuilder.Emit(), nil
}

func (s *scraper) recordVmStats(stat *vmStat, attr *attributes) {
	now := pcommon.NewTimestampFromTime(time.Now())

	s.metricsBuilder.RecordSwappedDataPoint(now, stat.Swapped, attr.host, attr.os, attr.arch, "memory")
	s.metricsBuilder.RecordFreeDataPoint(now, stat.Free, attr.host, attr.os, attr.arch, "memory")
}

Finally, we can define a Factory that serves as the entry point for the receiver.

// factory.go

func NewFactory() receiver.Factory {
	return receiver.NewFactory(
		metadata.Type,
		createDefaultConfig,
		receiver.WithMetrics(CreateVmStatReceiver, component.StabilityLevelDevelopment),
	)
}

func CreateVmStatReceiver(
	_ context.Context,
	settings receiver.CreateSettings,
	cfg component.Config,
	consumer consumer.Metrics,
) (receiver.Metrics, error) {
	logger := settings.Logger
	config, ok := cfg.(*Config)
	if !ok {
		em := "failed to cast to type Config"
		logger.Error(em)
		return nil, errors.New(em)
	}

	mb := metadata.NewMetricsBuilder(config.MetricsBuilderConfig, settings)

	ns := newScraper(config, mb, logger)
	scraper, err := scraperhelper.NewScraper(metadata.Type, ns.scrape)
	if err != nil {
		logger.Error("failed to create scraper", zap.Error(err))
		return nil, err
	}

	return scraperhelper.NewScraperControllerReceiver(
		&config.ScraperControllerSettings,
		settings,
		consumer,
		scraperhelper.AddScraper(scraper),
	)
}

Well, that's about it. Our receiver is ready to start scraping metrics.

Step 2: Define the configs

We need two configuration files for an OpenTelemetry Collector (obviously, the names are arbitrary):

  • builder-config.yaml: defines the components of the collector
  • otelcol.yaml: defines the configuration of each of the components

# builder-config.yaml
receivers:
  - gomod: github.com/mmynk/otel-kafka-poc/receivers/vmreceiver v0.0.1
    import: github.com/mmynk/otel-kafka-poc/receivers/vmreceiver
    name: 'vmreceiver'
    path: './receivers/vmreceiver'
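
For the builder to actually produce a binary, builder-config.yaml also needs a dist section. A minimal sketch, with the output path inferred from the build log in Step 3 (the name and other fields are assumptions):

# builder-config.yaml
dist:
  name: otelcol
  output_path: ./otelcol-dev
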
# otelcol.yaml

receivers:
  vmstats:
    collection_interval: 10s
    delay: 2
    count: 2

The delay and count keys map onto the Config struct we defined earlier: each scrape runs vmstat 2 2, which blocks for roughly two seconds while the second sample is taken, comfortably within the 10-second collection interval.

Before we write our own exporter, we can actually deploy our collector by using a pre-built exporter from the OpenTelemetry Collector Contrib repo. Let's add the Prometheus exporter to our pipeline.

# builder-config.yaml
exporters:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.92.0

# otelcol.yaml
exporters:
  prometheus:
    endpoint: 0.0.0.0:8889

service:
  pipelines:
    metrics:
      receivers: [vmstats]
      exporters: [prometheus]

Step 3: Deploy the collector

Let's fire up the collector and see if it works.

$ builder --config builder-config.yaml
...
2024-01-22T01:02:53.018Z	INFO	builder/main.go:121	Compiled	{"binary": "./otelcol-dev/otelcol"}

This will generate a binary otelcol-dev/otelcol in the current directory.

If everything went well, we should be able to run the binary.

$ ./otelcol-dev/otelcol --config otelcol.yaml
...
2024-01-22T01:04:20.061Z	info	service@v0.92.0/telemetry.go:159	Serving metrics	{"address": ":8888", "level": "Basic"}
...
2024-01-22T01:04:20.062Z	info	service@v0.92.0/service.go:177	Everything is ready. Begin running and processing data.

The collector's own telemetry is served at http://localhost:8888/metrics (the address from the log above), while our vmstat metrics are exposed by the Prometheus exporter at http://localhost:8889/metrics.
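
If the pipeline is healthy, a quick check against the exporter endpoint should show our gauges in Prometheus exposition format. Roughly what to expect; the exact metric names and labels depend on how the Prometheus exporter normalizes them, and the values here are illustrative:

$ curl -s http://localhost:8889/metrics
# HELP free Amount of idle memory
# TYPE free gauge
free{hostname="my-host"} 8.123456e+08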

Yay! We have successfully built a custom collector.

Step 4: Build a custom exporter

TODO
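
Until then, note that the Contrib repo already ships a kafkaexporter that can be wired in exactly like the Prometheus exporter above. A sketch, with the broker address and topic as assumptions:

# builder-config.yaml
exporters:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.92.0

# otelcol.yaml
exporters:
  kafka:
    brokers: [localhost:9092]
    topic: otlp_metrics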
