Skip to content

Commit 3f6082e

Browse files
committed
Implemented merged context with link
1 parent 2493b6e commit 3f6082e

File tree

3 files changed

+128
-4
lines changed

3 files changed

+128
-4
lines changed

exporter/exporterhelper/internal/batcher/default_batcher.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
)
1818

1919
type batch struct {
20-
ctx context.Context
20+
ctx mergedContext
2121
req request.Request
2222
done multiDone
2323
}
@@ -86,7 +86,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
8686
// Do not flush the last item and add it to the current batch.
8787
reqList = reqList[:len(reqList)-1]
8888
qb.currentBatch = &batch{
89-
ctx: ctx,
89+
ctx: NewMergedContext(ctx),
9090
req: lastReq,
9191
done: multiDone{done},
9292
}
@@ -120,9 +120,10 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
120120
// - Last result may not have enough data to be flushed.
121121

122122
// Logic on how to deal with the current batch:
123-
// TODO: Deal with merging Context.
124123
qb.currentBatch.req = reqList[0]
125124
qb.currentBatch.done = append(qb.currentBatch.done, done)
125+
qb.currentBatch.ctx = qb.currentBatch.ctx.Merge(qb.currentBatch.ctx)
126+
126127
// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
127128
// cannot unlock and re-lock because we are not done processing all the responses.
128129
var firstBatch *batch
@@ -141,7 +142,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
141142
// Do not flush the last item and add it to the current batch.
142143
reqList = reqList[:len(reqList)-1]
143144
qb.currentBatch = &batch{
144-
ctx: ctx,
145+
ctx: NewMergedContext(ctx),
145146
req: lastReq,
146147
done: multiDone{done},
147148
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package batcher // import "go.opentelemetry.io/collector/exporter/internal/queue"
5+
import (
6+
"context"
7+
"time"
8+
9+
"go.opentelemetry.io/otel/trace"
10+
)
11+
12+
type mergedContext struct {
13+
deadline time.Time
14+
deadlineOk bool
15+
ctx context.Context
16+
}
17+
18+
func NewMergedContext(ctx context.Context) mergedContext {
19+
deadline, ok := ctx.Deadline()
20+
return mergedContext{
21+
deadline: deadline,
22+
deadlineOk: ok,
23+
ctx: ctx,
24+
}
25+
}
26+
27+
// ContextWithSpan returns a copy of parent with span set as the current Span.
28+
func (mc *mergedContext) Merge(other context.Context) mergedContext {
29+
deadline, deadlineOk := mc.Deadline()
30+
if otherDeadline, ok := other.Deadline(); ok {
31+
deadlineOk = true
32+
if deadline.Before(otherDeadline) {
33+
deadline = otherDeadline
34+
}
35+
}
36+
trace.SpanFromContext(other).AddLink(trace.LinkFromContext(mc.ctx))
37+
return mergedContext{
38+
deadline: deadline,
39+
deadlineOk: deadlineOk,
40+
ctx: mc.ctx,
41+
}
42+
}
43+
44+
func (mc mergedContext) Deadline() (time.Time, bool) {
45+
return mc.deadline, mc.deadlineOk
46+
}
47+
48+
func (mc mergedContext) Done() <-chan struct{} {
49+
return nil
50+
}
51+
52+
func (mc mergedContext) Err() error {
53+
return nil
54+
}
55+
56+
func (mc mergedContext) Value(key any) any {
57+
return mc.ctx.Value(key)
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package batcher
5+
6+
import (
7+
"context"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/component/componenttest"
13+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
14+
"go.opentelemetry.io/otel/trace"
15+
)
16+
17+
func TestMergedContextDeadline(t *testing.T) {
18+
now := time.Now()
19+
ctx1 := context.Background()
20+
mergedContext := NewMergedContext(ctx1)
21+
22+
deadline, ok := mergedContext.Deadline()
23+
require.False(t, ok)
24+
25+
ctx2, cancel2 := context.WithDeadline(context.Background(), now.Add(200))
26+
defer cancel2()
27+
mergedContext = mergedContext.Merge(ctx2)
28+
29+
deadline, ok = mergedContext.Deadline()
30+
require.True(t, ok)
31+
require.Equal(t, now.Add(200), deadline)
32+
33+
ctx3, cancel3 := context.WithDeadline(context.Background(), now.Add(300))
34+
defer cancel3()
35+
ctx4, cancel4 := context.WithDeadline(context.Background(), now.Add(100))
36+
defer cancel4()
37+
mergedContext = mergedContext.Merge(ctx3)
38+
mergedContext = mergedContext.Merge(ctx4)
39+
40+
deadline, ok = mergedContext.Deadline()
41+
require.True(t, ok)
42+
require.Equal(t, now.Add(300), deadline)
43+
}
44+
45+
func TestMergedContextLink(t *testing.T) {
46+
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
47+
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")
48+
49+
ctx1 := context.WithValue(context.Background(), "key", "value")
50+
ctx2, span2 := tracer.Start(ctx1, "span2")
51+
52+
mergedContext := NewMergedContext(ctx2)
53+
ctx3, span3 := tracer.Start(ctx1, "span3")
54+
mergedContext = mergedContext.Merge(ctx3)
55+
ctx4, span4 := tracer.Start(ctx1, "span3")
56+
mergedContext = mergedContext.Merge(ctx4)
57+
58+
span2.AddEvent("This is an event.")
59+
60+
spanFromMergedContext := trace.SpanFromContext(mergedContext)
61+
require.Equal(t, span2, spanFromMergedContext)
62+
63+
require.Equal(t, trace.SpanContextFromContext(ctx2), span3.(sdktrace.ReadOnlySpan).Links()[0].SpanContext)
64+
require.Equal(t, trace.SpanContextFromContext(ctx2), span4.(sdktrace.ReadOnlySpan).Links()[0].SpanContext)
65+
}

0 commit comments

Comments
 (0)