
Unmarshal errors cause consumer group restarts and slow consumption #39909


Open
don-zur opened this issue May 6, 2025 · 3 comments
Labels: bug, needs triage, receiver/kafka

Comments


don-zur commented May 6, 2025

Component(s)

receiver/kafka

What happened?

Description

If a topic contains messages that do not match the receiver's configured encoding, an unmarshal error is returned, which causes the Kafka consumer groups to restart. As a result, the collector appears to be blocked: it is still moving through offsets, but very slowly, due to consumer group restart churn.
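
For illustration, here is a rough sketch of the sarama consume-claim pattern involved. This is not the actual kafkareceiver code; the handler type, logging, and marking calls are placeholders. The point is that returning the unmarshal error from ConsumeClaim ends the session and triggers a consumer group restart, while logging and continuing keeps the claim loop alive:

package main

import (
	"log"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// handler is a placeholder ConsumerGroupHandler, used only to illustrate the
// behavior described above; the real kafkareceiver handler is different.
type handler struct {
	unmarshaler *ptrace.ProtoUnmarshaler
}

func (h *handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		if _, err := h.unmarshaler.UnmarshalTraces(msg.Value); err != nil {
			// Returning err here ends the session, so sarama restarts the
			// consumer group and re-joins (the churn reported in this issue).
			// Logging and continuing instead keeps the claim loop alive:
			log.Printf("failed to unmarshal message: %v", err)
			sess.MarkMessage(msg, "") // assumes message_marking.on_error: true semantics
			continue
		}
		// ...hand the decoded traces to the next consumer in the pipeline...
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {} // stub so the sketch compiles on its own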

Steps to Reproduce

Create a topic and add messages in both otlp_proto and otlp_json encoding (a sketch of one way to seed such a topic follows). Start the collector and watch the log.
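
For reference, a minimal sketch of seeding a mixed topic with a sarama producer; the broker address and topic name are taken from the config below, and the payloads are placeholders. The otlp_json body only needs to be invalid protobuf from the receiver's point of view:

package main

import (
	"log"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true

	// Broker and topic match the receiver config in this issue.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9093"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// A valid otlp_proto message (an empty trace payload is enough).
	protoBody, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(ptrace.NewTraces())
	if err != nil {
		log.Fatal(err)
	}
	// An otlp_json message: valid JSON, but not valid protobuf, so a receiver
	// configured with encoding: otlp_proto fails to unmarshal it.
	jsonBody, err := (&ptrace.JSONMarshaler{}).MarshalTraces(ptrace.NewTraces())
	if err != nil {
		log.Fatal(err)
	}

	for _, body := range [][]byte{protoBody, jsonBody} {
		if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: "otel-mixed-traces",
			Value: sarama.ByteEncoder(body),
		}); err != nil {
			log.Fatal(err)
		}
	}
}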

Expected Result

Unmarshalling errors should be logged, and the consume loop should continue to the next message without restarting the Kafka consumer groups.

Actual Result

Unmarshalling errors cause the Kafka consumer groups to be restarted. As a result, the collector appears to be blocked: it is still moving through offsets, but it can take hours to get past incorrectly encoded messages.

Collector version

v0.120.1

Environment information

Environment

OS: macOS
go version go1.24.2 darwin/arm64

OpenTelemetry Collector configuration

receivers:
    kafka:
      brokers: "localhost:9093"
      topic: "otel-mixed-traces"
      encoding: "otlp_proto"
      group_id: "otel-mixed-traces-cg-foo"
      initial_offset: earliest

      message_marking:  
        after: true
        on_error: true

      auth:
        tls:
          ca_file: ./certs/ca_cert
          cert_file: ./certs/cert.crt
          key_file: ./certs/cert.key

processors:
  batch:
    timeout: 5s
    send_batch_size: 5000
  memory_limiter:
    check_interval: 1s
    limit_percentage: 95
    limit_mib: 24000
    spike_limit_percentage: 20


service:
  extensions: [ health_check ]
  telemetry:
    metrics:
      address: 0.0.0.0:8888
  pipelines:
    traces:
      receivers: [ kafka ]
      processors: [ memory_limiter, batch ]
      exporters: [ debug ]

exporters:
  debug:
    verbosity: basic

extensions:
  health_check:
  pprof:
  zpages:

Log output

2025-05-05T20:50:22.880-0600	error	kafkareceiver/kafka_receiver.go:580	failed to unmarshal message	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "error": "proto: illegal wireType 6"}
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver.(*tracesConsumerGroupHandler).ConsumeClaim
	/Users/[email protected]/projects/maple/opentelemetry-collector-contrib/receiver/kafkareceiver/kafka_receiver.go:580
github.com/IBM/sarama.(*consumerGroupSession).consume
	/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:952
github.com/IBM/sarama.newConsumerGroupSession.func2
	/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:877
2025-05-05T20:50:30.341-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 0}
2025-05-05T20:50:30.341-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 2}
2025-05-05T20:50:30.343-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 1}
2025-05-05T20:50:30.462-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 3}
2025-05-05T20:50:34.224-0600	error	kafkareceiver/kafka_receiver.go:580	failed to unmarshal message	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "error": "proto: illegal wireType 6"}
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver.(*tracesConsumerGroupHandler).ConsumeClaim
	/Users/[email protected]/projects/maple/opentelemetry-collector-contrib/receiver/kafkareceiver/kafka_receiver.go:580
github.com/IBM/sarama.(*consumerGroupSession).consume
	/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:952
github.com/IBM/sarama.newConsumerGroupSession.func2
	/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:877
2025-05-05T20:50:42.801-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 1}
2025-05-05T20:50:42.802-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 2}
2025-05-05T20:50:42.803-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 0}
2025-05-05T20:50:42.926-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 3}

Additional context

No response

don-zur added the bug and needs triage labels on May 6, 2025

github-actions bot commented May 6, 2025

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.


don-zur commented May 6, 2025

/label receiver/kafka -exporter/kafka


github-actions bot commented May 6, 2025

Pinging code owners for receiver/kafka: @pavolloffay @MovieStoreGuy @axw. See Adding Labels via Comments if you do not have permissions to add labels yourself. For example, comment '/label priority:p2 -needs-triaged' to set the priority and remove the needs-triaged label.
