Skip to content

Commit 1033b02

Browse files
committed
pass publishers interface instead of extensions
1 parent ad62aab commit 1033b02

File tree

9 files changed

+40
-24
lines changed

9 files changed

+40
-24
lines changed

pdata/publisher.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
type ComponentID string
1313

14-
// Publisher is used by components to publish data to the RemoteTap extension.
14+
// Publisher is used by components to publish data as it passes through the pipeline.
1515
type Publisher interface {
1616
// IsActive returns true when at least one connection is open for the given componentID.
1717
IsActive(ComponentID) bool

processor/internal/processor.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33

44
package internal // import "go.opentelemetry.io/collector/processor/internal"
55

6-
import "go.opentelemetry.io/collector/component"
6+
import (
7+
"go.opentelemetry.io/collector/component"
8+
"go.opentelemetry.io/collector/pdata"
9+
)
710

811
// Settings is passed to Create* functions in Factory.
912
type Settings struct {
@@ -15,6 +18,6 @@ type Settings struct {
1518
// BuildInfo can be used by components for informational purposes
1619
BuildInfo component.BuildInfo
1720

18-
// Extensions can be used by components to interact with extensions
19-
Extensions func() map[component.ID]component.Component
21+
// Publishers can be used by components to publish data after processing
22+
Publishers []pdata.Publisher
2023
}

processor/processorhelper/logs.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,17 @@ func NewLogsProcessor(
4242

4343
eventOptions := spanAttributes(set.ID)
4444
bs := fromOptions(options)
45-
remoteTapType, _ := component.NewType("remotetap")
46-
remoteTapId := component.NewID(remoteTapType)
4745
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
4846
span := trace.SpanFromContext(ctx)
4947
span.AddEvent("Start processing.", eventOptions)
5048
var err error
5149
ld, err = logsFunc(ctx, ld)
5250

53-
// Publish logs to the remotetap extension if active.
54-
if remotetap := set.Extensions()[remoteTapId]; remotetap != nil && remotetap.(pdata.Publisher).IsActive(pdata.ComponentID(set.ID.String())) {
55-
remotetap.(pdata.Publisher).PublishLogs(pdata.ComponentID(set.ID.String()), ld)
51+
// Publish data to active streams.
52+
for _, p := range set.Publishers {
53+
if p.IsActive(pdata.ComponentID(set.ID.String())) {
54+
p.PublishLogs(pdata.ComponentID(set.ID.String()), ld)
55+
}
5656
}
5757

5858
span.AddEvent("End processing.", eventOptions)

processor/processorhelper/metrics.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,17 @@ func NewMetricsProcessor(
4242

4343
eventOptions := spanAttributes(set.ID)
4444
bs := fromOptions(options)
45-
remoteTapType, _ := component.NewType("remotetap")
46-
remoteTapId := component.NewID(remoteTapType)
4745
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
4846
span := trace.SpanFromContext(ctx)
4947
span.AddEvent("Start processing.", eventOptions)
5048
var err error
5149
md, err = metricsFunc(ctx, md)
5250

53-
// Publish metrics to the remotetap extension if active.
54-
if remotetap := set.Extensions()[remoteTapId]; remotetap != nil && remotetap.(pdata.Publisher).IsActive(pdata.ComponentID(set.ID.String())) {
55-
remotetap.(pdata.Publisher).PublishMetrics(pdata.ComponentID(set.ID.String()), md)
51+
// Publish data to active streams.
52+
for _, p := range set.Publishers {
53+
if p.IsActive(pdata.ComponentID(set.ID.String())) {
54+
p.PublishMetrics(pdata.ComponentID(set.ID.String()), md)
55+
}
5656
}
5757

5858
span.AddEvent("End processing.", eventOptions)

processor/processorhelper/traces.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,17 @@ func NewTracesProcessor(
4242

4343
eventOptions := spanAttributes(set.ID)
4444
bs := fromOptions(options)
45-
remoteTapType, _ := component.NewType("remotetap")
46-
remoteTapId := component.NewID(remoteTapType)
4745
traceConsumer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
4846
span := trace.SpanFromContext(ctx)
4947
span.AddEvent("Start processing.", eventOptions)
5048
var err error
5149
td, err = tracesFunc(ctx, td)
5250

53-
// Publish traces to the remotetap extension if active.
54-
if remotetap := set.Extensions()[remoteTapId]; remotetap != nil && remotetap.(pdata.Publisher).IsActive(pdata.ComponentID(set.ID.String())) {
55-
remotetap.(pdata.Publisher).PublishTraces(pdata.ComponentID(set.ID.String()), td)
51+
// Publish data to active streams.
52+
for _, p := range set.Publishers {
53+
if p.IsActive(pdata.ComponentID(set.ID.String())) {
54+
p.PublishTraces(pdata.ComponentID(set.ID.String()), td)
55+
}
5656
}
5757

5858
span.AddEvent("End processing.", eventOptions)

service/internal/graph/graph.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"go.opentelemetry.io/collector/connector"
3030
"go.opentelemetry.io/collector/consumer"
3131
"go.opentelemetry.io/collector/internal/fanoutconsumer"
32+
"go.opentelemetry.io/collector/pdata"
3233
"go.opentelemetry.io/collector/service/internal/builders"
3334
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
3435
"go.opentelemetry.io/collector/service/internal/status"
@@ -50,7 +51,7 @@ type Settings struct {
5051

5152
ReportStatus status.ServiceStatusFunc
5253

53-
Extensions func() map[component.ID]component.Component
54+
Publishers []pdata.Publisher
5455
}
5556

5657
type Graph struct {
@@ -289,7 +290,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
289290
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
290291
case *processorNode:
291292
// nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters.
292-
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, set.Extensions, g.nextConsumers(n.ID())[0])
293+
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, set.Publishers, g.nextConsumers(n.ID())[0])
293294
case *exporterNode:
294295
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder)
295296
case *connectorNode:

service/internal/graph/host.go

+11
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.opentelemetry.io/collector/component/componentstatus"
1414
"go.opentelemetry.io/collector/extension"
1515
"go.opentelemetry.io/collector/featuregate"
16+
"go.opentelemetry.io/collector/pdata"
1617
"go.opentelemetry.io/collector/service/extensions"
1718
"go.opentelemetry.io/collector/service/internal/builders"
1819
"go.opentelemetry.io/collector/service/internal/status"
@@ -64,6 +65,16 @@ func (host *Host) GetExtensions() map[component.ID]component.Component {
6465
return host.ServiceExtensions.GetExtensions()
6566
}
6667

68+
func (host *Host) GetPublishers() []pdata.Publisher {
69+
publishers := make([]pdata.Publisher, 0)
70+
for _, v := range host.ServiceExtensions.GetExtensions() {
71+
if publisher, ok := v.(pdata.Publisher); ok {
72+
publishers = append(publishers, publisher)
73+
}
74+
}
75+
return publishers
76+
}
77+
6778
// Deprecated: [0.79.0] This function will be removed in the future.
6879
// Several components in the contrib repository use this function so it cannot be removed
6980
// before those cases are removed. In most cases, use of this function can be replaced by a

service/internal/graph/nodes.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/consumer"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/internal/fanoutconsumer"
17+
"go.opentelemetry.io/collector/pdata"
1718
"go.opentelemetry.io/collector/processor"
1819
"go.opentelemetry.io/collector/receiver"
1920
"go.opentelemetry.io/collector/service/internal/builders"
@@ -132,11 +133,11 @@ func (n *processorNode) buildComponent(ctx context.Context,
132133
tel component.TelemetrySettings,
133134
info component.BuildInfo,
134135
builder builders.Processor,
135-
extensions func() map[component.ID]component.Component,
136+
publishers []pdata.Publisher,
136137
next baseConsumer,
137138
) error {
138139
tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID)
139-
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info, Extensions: extensions}
140+
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info, Publishers: publishers}
140141
var err error
141142
switch n.pipelineID.Type() {
142143
case component.DataTypeTraces:

service/service.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ func (srv *Service) initGraph(ctx context.Context, cfg Config) error {
371371
ConnectorBuilder: srv.host.Connectors,
372372
PipelineConfigs: cfg.Pipelines,
373373
ReportStatus: srv.host.Reporter.ReportStatus,
374-
Extensions: srv.host.ServiceExtensions.GetExtensions,
374+
Publishers: srv.host.GetPublishers(),
375375
}); err != nil {
376376
return fmt.Errorf("failed to build pipelines: %w", err)
377377
}

0 commit comments

Comments
 (0)