Skip to content

Commit 6a425b5

Browse files
committed
Implemented merged context with link
1 parent 2493b6e commit 6a425b5

File tree

4 files changed

+161
-4
lines changed

4 files changed

+161
-4
lines changed

.chloggen/merged_context.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Link batcher context to all batched request's span contexts.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12212, 8122]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

exporter/exporterhelper/internal/batcher/default_batcher.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
)
1818

1919
type batch struct {
20-
ctx context.Context
20+
ctx mergedContext
2121
req request.Request
2222
done multiDone
2323
}
@@ -86,7 +86,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
8686
// Do not flush the last item and add it to the current batch.
8787
reqList = reqList[:len(reqList)-1]
8888
qb.currentBatch = &batch{
89-
ctx: ctx,
89+
ctx: NewMergedContext(ctx),
9090
req: lastReq,
9191
done: multiDone{done},
9292
}
@@ -120,9 +120,10 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
120120
// - Last result may not have enough data to be flushed.
121121

122122
// Logic on how to deal with the current batch:
123-
// TODO: Deal with merging Context.
124123
qb.currentBatch.req = reqList[0]
125124
qb.currentBatch.done = append(qb.currentBatch.done, done)
125+
qb.currentBatch.ctx = qb.currentBatch.ctx.Merge(qb.currentBatch.ctx)
126+
126127
// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
127128
// cannot unlock and re-lock because we are not done processing all the responses.
128129
var firstBatch *batch
@@ -141,7 +142,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
141142
// Do not flush the last item and add it to the current batch.
142143
reqList = reqList[:len(reqList)-1]
143144
qb.currentBatch = &batch{
144-
ctx: ctx,
145+
ctx: NewMergedContext(ctx),
145146
req: lastReq,
146147
done: multiDone{done},
147148
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package batcher // import "go.opentelemetry.io/collector/exporter/internal/queue"
5+
import (
6+
"context"
7+
"time"
8+
9+
"go.opentelemetry.io/otel/trace"
10+
)
11+
12+
// mergedContext links the underlying context to all incoming span contexts.
13+
type mergedContext struct {
14+
deadline time.Time
15+
deadlineOk bool
16+
ctx context.Context
17+
}
18+
19+
func NewMergedContext(ctx context.Context) mergedContext {
20+
deadline, ok := ctx.Deadline()
21+
return mergedContext{
22+
deadline: deadline,
23+
deadlineOk: ok,
24+
ctx: ctx,
25+
}
26+
}
27+
28+
// Merge links the span from incoming context to the span from the first context.
29+
func (mc *mergedContext) Merge(other context.Context) mergedContext {
30+
deadline, deadlineOk := mc.Deadline()
31+
if otherDeadline, ok := other.Deadline(); ok {
32+
deadlineOk = true
33+
if deadline.Before(otherDeadline) {
34+
deadline = otherDeadline
35+
}
36+
}
37+
38+
link := trace.LinkFromContext(other)
39+
trace.SpanFromContext(mc.ctx).AddLink(link)
40+
return mergedContext{
41+
deadline: deadline,
42+
deadlineOk: deadlineOk,
43+
ctx: mc.ctx,
44+
}
45+
}
46+
47+
// Deadline returns the latest deadline of all context.
48+
func (mc mergedContext) Deadline() (time.Time, bool) {
49+
return mc.deadline, mc.deadlineOk
50+
}
51+
52+
func (mc mergedContext) Done() <-chan struct{} {
53+
return nil
54+
}
55+
56+
func (mc mergedContext) Err() error {
57+
return nil
58+
}
59+
60+
// Value delegates to the first context.
61+
func (mc mergedContext) Value(key any) any {
62+
return mc.ctx.Value(key)
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package batcher
5+
6+
import (
7+
"context"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/component/componenttest"
13+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
14+
"go.opentelemetry.io/otel/trace"
15+
)
16+
17+
func TestMergedContextDeadline(t *testing.T) {
18+
now := time.Now()
19+
ctx1 := context.Background()
20+
mergedContext := NewMergedContext(ctx1)
21+
22+
deadline, ok := mergedContext.Deadline()
23+
require.False(t, ok)
24+
25+
ctx2, cancel2 := context.WithDeadline(context.Background(), now.Add(200))
26+
defer cancel2()
27+
mergedContext = mergedContext.Merge(ctx2)
28+
29+
deadline, ok = mergedContext.Deadline()
30+
require.True(t, ok)
31+
require.Equal(t, now.Add(200), deadline)
32+
33+
ctx3, cancel3 := context.WithDeadline(context.Background(), now.Add(300))
34+
defer cancel3()
35+
ctx4, cancel4 := context.WithDeadline(context.Background(), now.Add(100))
36+
defer cancel4()
37+
mergedContext = mergedContext.Merge(ctx3)
38+
mergedContext = mergedContext.Merge(ctx4)
39+
40+
deadline, ok = mergedContext.Deadline()
41+
require.True(t, ok)
42+
require.Equal(t, now.Add(300), deadline)
43+
}
44+
45+
func TestMergedContextLink(t *testing.T) {
46+
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
47+
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")
48+
49+
ctx1 := context.WithValue(context.Background(), "key", "value")
50+
ctx2, span2 := tracer.Start(ctx1, "span2")
51+
defer span2.End()
52+
53+
mergedContext := NewMergedContext(ctx2)
54+
ctx3, span3 := tracer.Start(ctx1, "span3")
55+
defer span3.End()
56+
mergedContext = mergedContext.Merge(ctx3)
57+
ctx4, span4 := tracer.Start(ctx1, "span3")
58+
defer span4.End()
59+
mergedContext = mergedContext.Merge(ctx4)
60+
61+
span2.AddEvent("This is an event.")
62+
63+
spanFromMergedContext := trace.SpanFromContext(mergedContext)
64+
require.Equal(t, span2, spanFromMergedContext)
65+
66+
require.Equal(t, trace.SpanContextFromContext(ctx3), span2.(sdktrace.ReadOnlySpan).Links()[0].SpanContext)
67+
require.Equal(t, trace.SpanContextFromContext(ctx4), span2.(sdktrace.ReadOnlySpan).Links()[1].SpanContext)
68+
}

0 commit comments

Comments
 (0)