Unlimited services in the new APM plans! Monitor unlimited services with the new APM plans!

Kafka Producer and Consumer Instrumentation #

Since 2.1.4

Overview #

The Kafka Producer and Consumer instrumentation creates automatic Spans for all send operations on the Producer side, and provides a small API to create Spans for message processing on the consumer side.

In the example trace below, the producer.send Span is automatically created on the producer side, and the consumer.process Span is manually created on the consumer side, using the runWithConsumerSpan helper function.

Producer Spans #

A producer.send Span is automatically created for every producer record sent to Kafka, without any manual intervention. All producer Spans are tagged with:

  • component: With the value kafka.producer.
  • kafka.client-id: With the producer client id assigned by Kafka.
  • kafka.key: With the record key, if any.
  • kafka.topic: With the name of the destination topic for the record.
  • kafka.partition: With the partition number assigned to the record.

In addition to creating the producer Span, the instrumentation will also include a binary representation of the current context on the kctx (short for Kamon Context) header. The kctx header is required for context propagation and distributed tracing to work, so, don’t drop it!

Consumer Spans #

It is necessary to make a small code change on the consumer side if you want to trace how long it takes to process incoming records and get access to the context propagated from the consumer side. The helper functions for creating consumer Spans are part of the KafkaInstrumentation class.

All consumer spans created by the helper functions are tagged with:

  • component: With the value kafka.consumer.
  • kafka.client-id: With the consumer client id assigned by Kafka.
  • kafka.group-id: With the consumer group id, if any.
  • kafka.key: With the record key, if any.
  • kafka.topic: With the name of the destination topic for the record.
  • kafka.partition: With the partition number assigned to the record.
  • kafka.offset: With the consumed record’s offset.
  • kafka.poll-time: With the time it took for the poll operation that fetched the record to complete.
  • kafka.timestamp: With the timestamp associated with the consumer record.
  • kafka.timestamp-type: With the type of timestamp associated with the consumer record.

Continuing or Starting Traces #

Depending on your use case, you might want the consumer Spans to be part of the same trace as the producer Spans, or to start a new trace of their own and get a link to the producer trace. You can switch between these two behaviors using the continue-trace-on-consumer setting:

kamon.instrumentation.kafka {
  client.tracing {
    continue-trace-on-consumer = yes
  }
}

As rule of thumb, when your producer and consumer applications are part of a real time processing pipeline, you will want to keep the producer and consumer Spans in the same trace. Otherwise, letting the consumer create its own trace with a link to the producer trace is the way to go.

Creating Consumer Spans #

Using the runWithConsumerSpan Function

You can wrap your record processing logic with the KafkaInstrumentation.runWithConsumerSpan function to create a Span for each processed consumer record:

records.iterator().forEachRemaining(record => runWithConsumerSpan(record) {
  // Your record processing logic goes here...  
})

By default, the consumer Span is named consumer.process and is finished as soon as the provided code block finishes executing. There are different versions of the runWithConsumerSpan function that let you change the operation name and decide whether the Span should be finished after executing the processing logic or not.

Using the consumerSpan Function

If the runWithConsumerSpan function doesn’t match your execution model, you can use the consumerSpan function to create a new consumer Span with all the tags and info described above, but you will be in charge of managing that Span programmatically.

You probably will want to use the Kamon.runWithSpan function (or similar) when creating spans like this.

Accessing the Incoming Context

When you are only interested in getting access to the incoming context, you can use the extractContext function or import the KafkaInstrumentation.Syntax implicit class:

// Extracting the incoming Context from a consumer record:
val incomingContext = KafkaInstrumentation.extractContext(record)

// Or, importing KafkaInstrumentation.Syntax
val incomingContext = record.context

Manual Installation #

In case you are not using the Kamon Bundle, add the dependency below to your build.


libraryDependencies += "io.kamon" %% "kamon-kafka" % "2.2.3"



    <dependency>
      <groupId>io.kamon</groupId>
      <artifactId>kamon-kafka_2.13</artifactId>
      <version>2.2.3</version>
    </dependency>


implementation 'io.kamon:kamon-kafka_2.13:2.2.3'

You must start your application with the instrumentation agent for this module to work properly.