Skip to content

Commit ce8e230

Browse files
portertechchengchuanpeng
authored andcommitted
[processor/tailsampling] Improved "not sampled" decision cache usage (open-telemetry#37189)
Currently, a "not sampled" decision for a trace is only cached when late spans arrive for it. Whereas a "sampled" decision for a trace is immediately cached. This pull request immediately caches "not sampled" decisions, late spans (or reprocessed spans) for "not sampled" traces now take fewer resources to process. Currently, traces are stored in memory until `num_traces` is reached, then the oldest is deleted to make room. This is a clever mechanism to limit the size* of the internal map, however, trace span counts and their contents likely vary considerably. This pull request doesn't touch this mechanism, but it does delete traces from the internal map after they are released and are added to a decision cache. --------- Signed-off-by: Sean Porter <[email protected]>
1 parent 1873a70 commit ce8e230

File tree

3 files changed

+51
-10
lines changed

3 files changed

+51
-10
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: tailsamplingprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Improved not sampled decision cache usage and deleting traces from the internal map when they are in a decision cache.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37189]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

processor/tailsamplingprocessor/processor.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,11 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
350350
trace.ReceivedBatches = ptrace.NewTraces()
351351
trace.Unlock()
352352

353-
if decision == sampling.Sampled {
353+
switch decision {
354+
case sampling.Sampled:
354355
tsp.releaseSampledTrace(context.Background(), id, allSpans)
356+
case sampling.NotSampled:
357+
tsp.releaseNotSampledTrace(id)
355358
}
356359
}
357360

@@ -517,7 +520,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
517520
appendToTraces(traceTd, resourceSpans, spans)
518521
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
519522
case sampling.NotSampled:
520-
tsp.nonSampledIDCache.Put(id, true)
523+
tsp.releaseNotSampledTrace(id)
521524
default:
522525
tsp.logger.Warn("Encountered unexpected sampling decision",
523526
zap.Int("decision", int(finalDecision)))
@@ -565,16 +568,30 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio
565568
tsp.telemetry.ProcessorTailSamplingSamplingTraceRemovalAge.Record(tsp.ctx, int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))
566569
}
567570

568-
// releaseSampledTrace sends the trace data to the next consumer.
569-
// It additionally adds the trace ID to the cache of sampled trace IDs.
570-
// It does not (yet) delete the spans from the internal map.
571+
// releaseSampledTrace sends the trace data to the next consumer. It
572+
// additionally adds the trace ID to the cache of sampled trace IDs. If the
573+
// trace ID is cached, it deletes the spans from the internal map.
571574
func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, id pcommon.TraceID, td ptrace.Traces) {
572575
tsp.sampledIDCache.Put(id, true)
573576
if err := tsp.nextConsumer.ConsumeTraces(ctx, td); err != nil {
574577
tsp.logger.Warn(
575578
"Error sending spans to destination",
576579
zap.Error(err))
577580
}
581+
_, ok := tsp.sampledIDCache.Get(id)
582+
if ok {
583+
tsp.dropTrace(id, time.Now())
584+
}
585+
}
586+
587+
// releaseNotSampledTrace adds the trace ID to the cache of not sampled trace
588+
// IDs. If the trace ID is cached, it deletes the spans from the internal map.
589+
func (tsp *tailSamplingSpanProcessor) releaseNotSampledTrace(id pcommon.TraceID) {
590+
tsp.nonSampledIDCache.Put(id, true)
591+
_, ok := tsp.nonSampledIDCache.Get(id)
592+
if ok {
593+
tsp.dropTrace(id, time.Now())
594+
}
578595
}
579596

580597
func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) {

processor/tailsamplingprocessor/processor_decisions_test.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package tailsamplingprocessor
66
import (
77
"context"
88
"testing"
9-
"time"
109

1110
"github.com/stretchr/testify/require"
1211
"go.opentelemetry.io/collector/component/componenttest"
@@ -386,8 +385,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
386385
// The final decision SHOULD be Sampled.
387386
require.EqualValues(t, 1, nextConsumer.SpanCount())
388387

389-
// Drop the trace to force cache to make decision
390-
tsp.dropTrace(traceID, time.Now())
388+
// The trace should have been dropped after its id was added to the decision cache
391389
_, ok := tsp.idToTrace.Load(traceID)
392390
require.False(t, ok)
393391

@@ -461,8 +459,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
461459
// The final decision SHOULD be NOT Sampled.
462460
require.EqualValues(t, 0, nextConsumer.SpanCount())
463461

464-
// Drop the trace to force cache to make decision
465-
tsp.dropTrace(traceID, time.Now())
462+
// The trace should have been dropped after its id was added to the decision cache
466463
_, ok := tsp.idToTrace.Load(traceID)
467464
require.False(t, ok)
468465

0 commit comments

Comments
 (0)