Skip to content

Commit bc3aca7

Browse files
committed
[connector/routing] Mark MutatesData: true
1 parent 39913af commit bc3aca7

File tree

9 files changed

+45
-7
lines changed

9 files changed

+45
-7
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'bug_fix'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: routingconnector
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: The connector splits the original payload so that it may be emitted in parts to each route.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37390]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

connector/routingconnector/logs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func newLogsConnector(
6161
}
6262

6363
func (c *logsConnector) Capabilities() consumer.Capabilities {
64-
return consumer.Capabilities{MutatesData: false}
64+
return consumer.Capabilities{MutatesData: true}
6565
}
6666

6767
func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {

connector/routingconnector/logs_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ func TestLogsConnectorCapabilities(t *testing.T) {
413413
)
414414

415415
require.NoError(t, err)
416-
assert.False(t, conn.Capabilities().MutatesData)
416+
assert.True(t, conn.Capabilities().MutatesData)
417417
}
418418

419419
func TestLogsConnectorDetailed(t *testing.T) {

connector/routingconnector/metrics.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func newMetricsConnector(
6262
}
6363

6464
func (c *metricsConnector) Capabilities() consumer.Capabilities {
65-
return consumer.Capabilities{MutatesData: false}
65+
return consumer.Capabilities{MutatesData: true}
6666
}
6767

6868
func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {

connector/routingconnector/metrics_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ func TestMetricsConnectorCapabilities(t *testing.T) {
435435
)
436436

437437
require.NoError(t, err)
438-
assert.False(t, conn.Capabilities().MutatesData)
438+
assert.True(t, conn.Capabilities().MutatesData)
439439
}
440440

441441
func TestMetricsConnectorDetailed(t *testing.T) {

connector/routingconnector/traces.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func newTracesConnector(
6161
}
6262

6363
func (*tracesConnector) Capabilities() consumer.Capabilities {
64-
return consumer.Capabilities{MutatesData: false}
64+
return consumer.Capabilities{MutatesData: true}
6565
}
6666

6767
func (c *tracesConnector) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {

connector/routingconnector/traces_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func TestTraceConnectorCapabilities(t *testing.T) {
393393
)
394394

395395
require.NoError(t, err)
396-
assert.False(t, conn.Capabilities().MutatesData)
396+
assert.True(t, conn.Capabilities().MutatesData)
397397
}
398398

399399
func TestTracesConnectorDetailed(t *testing.T) {

pkg/stanza/fileconsumer/file.go

+5
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,12 @@ func (m *Manager) startPoller(ctx context.Context) {
105105
case <-ctx.Done():
106106
return
107107
case <-globTicker.C:
108+
m.set.Logger.Debug("tick")
108109
}
109110

111+
m.set.Logger.Debug("poll start")
110112
m.poll(ctx)
113+
m.set.Logger.Debug("poll end")
111114
}
112115
}()
113116
}
@@ -248,11 +251,13 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
248251
zap.String("rotated_path", file.Name()))
249252
}
250253
}
254+
m.set.Logger.Debug("Found already open file", zap.Int64("offset", oldReader.Offset))
251255
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
252256
}
253257

254258
// Check for closed files for match
255259
if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
260+
m.set.Logger.Debug("Found previously closed file", zap.Int64("offset", oldMetadata.Offset))
256261
r, err := m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
257262
if err != nil {
258263
return nil, err

pkg/stanza/operator/parser/jsonarray/parser.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99

1010
"github.com/valyala/fastjson"
11+
"go.uber.org/zap"
1112

1213
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
1314
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
@@ -23,7 +24,12 @@ type parseFunc func(any) (any, error)
2324

2425
// Process will parse an entry for json array.
2526
func (p *Parser) Process(ctx context.Context, e *entry.Entry) error {
26-
return p.ParserOperator.ProcessWith(ctx, e, p.parse)
27+
err := p.ParserOperator.ProcessWith(ctx, e, p.parse)
28+
if err == nil {
29+
return nil
30+
}
31+
p.Logger().Error("json_array_parser", zap.Error(err))
32+
return err
2733
}
2834

2935
func generateParseToArrayFunc(pool *fastjson.ParserPool) parseFunc {

0 commit comments

Comments
 (0)