Skip to content

Commit 0069503

Browse files
committed
Avoid unnecessary copy of the data in routing connector
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 8c74228 commit 0069503

File tree

6 files changed

+114
-143
lines changed

6 files changed

+114
-143
lines changed

connector/routingconnector/internal/plogutil/logs.go

+4-10
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ func MoveResourcesIf(from, to plog.Logs, f func(plog.ResourceLogs) bool) {
1414
if !f(rl) {
1515
return false
1616
}
17-
rl.CopyTo(to.ResourceLogs().AppendEmpty())
17+
rl.MoveTo(to.ResourceLogs().AppendEmpty())
1818
return true
1919
})
2020
}
@@ -25,12 +25,10 @@ func MoveResourcesIf(from, to plog.Logs, f func(plog.ResourceLogs) bool) {
2525
// Resources or Scopes are removed from the original if they become empty. All ordering is preserved.
2626
func MoveRecordsWithContextIf(from, to plog.Logs, f func(plog.ResourceLogs, plog.ScopeLogs, plog.LogRecord) bool) {
2727
rls := from.ResourceLogs()
28-
for i := 0; i < rls.Len(); i++ {
29-
rl := rls.At(i)
28+
rls.RemoveIf(func(rl plog.ResourceLogs) bool {
3029
sls := rl.ScopeLogs()
3130
var rlCopy *plog.ResourceLogs
32-
for j := 0; j < sls.Len(); j++ {
33-
sl := sls.At(j)
31+
sls.RemoveIf(func(sl plog.ScopeLogs) bool {
3432
lrs := sl.LogRecords()
3533
var slCopy *plog.ScopeLogs
3634
lrs.RemoveIf(func(lr plog.LogRecord) bool {
@@ -49,15 +47,11 @@ func MoveRecordsWithContextIf(from, to plog.Logs, f func(plog.ResourceLogs, plog
4947
sl.Scope().CopyTo(slCopy.Scope())
5048
slCopy.SetSchemaUrl(sl.SchemaUrl())
5149
}
52-
lr.CopyTo(slCopy.LogRecords().AppendEmpty())
50+
lr.MoveTo(slCopy.LogRecords().AppendEmpty())
5351
return true
5452
})
55-
}
56-
sls.RemoveIf(func(sl plog.ScopeLogs) bool {
5753
return sl.LogRecords().Len() == 0
5854
})
59-
}
60-
rls.RemoveIf(func(rl plog.ResourceLogs) bool {
6155
return rl.ScopeLogs().Len() == 0
6256
})
6357
}

connector/routingconnector/internal/plogutil/logs_test.go

+13
Original file line numberDiff line numberDiff line change
@@ -233,3 +233,16 @@ func TestMoveRecordsWithContextIf(t *testing.T) {
233233
})
234234
}
235235
}
236+
237+
func BenchmarkMoveResourcesIfLogs(b *testing.B) {
238+
b.ReportAllocs()
239+
for i := 0; i < b.N; i++ {
240+
from := plogutiltest.NewLogs("AB", "CD", "EF")
241+
to := plog.NewLogs()
242+
plogutil.MoveResourcesIf(from, to, func(rl plog.ResourceLogs) bool {
243+
return true
244+
})
245+
assert.Equal(b, 0, from.LogRecordCount())
246+
assert.Equal(b, 8, to.LogRecordCount())
247+
}
248+
}

connector/routingconnector/internal/pmetricutil/metrics.go

+58-114
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ func MoveResourcesIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics) b
1212
if !f(rs) {
1313
return false
1414
}
15-
rs.CopyTo(to.ResourceMetrics().AppendEmpty())
15+
rs.MoveTo(to.ResourceMetrics().AppendEmpty())
1616
return true
1717
})
1818
}
@@ -23,39 +23,27 @@ func MoveResourcesIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics) b
2323
// Resources or Scopes are removed from the original if they become empty. All ordering is preserved.
2424
func MoveMetricsWithContextIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics, pmetric.ScopeMetrics, pmetric.Metric) bool) {
2525
rms := from.ResourceMetrics()
26-
for i := 0; i < rms.Len(); i++ {
27-
rm := rms.At(i)
26+
rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool {
2827
sms := rm.ScopeMetrics()
2928
var rmCopy *pmetric.ResourceMetrics
30-
for j := 0; j < sms.Len(); j++ {
31-
sm := sms.At(j)
29+
sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool {
3230
ms := sm.Metrics()
3331
var smCopy *pmetric.ScopeMetrics
3432
ms.RemoveIf(func(m pmetric.Metric) bool {
3533
if !f(rm, sm, m) {
3634
return false
3735
}
3836
if rmCopy == nil {
39-
rmc := to.ResourceMetrics().AppendEmpty()
40-
rmCopy = &rmc
41-
rm.Resource().CopyTo(rmCopy.Resource())
42-
rmCopy.SetSchemaUrl(rm.SchemaUrl())
37+
rmCopy = copyResourceMetrics(rm, to.ResourceMetrics())
4338
}
4439
if smCopy == nil {
45-
smc := rmCopy.ScopeMetrics().AppendEmpty()
46-
smCopy = &smc
47-
sm.Scope().CopyTo(smCopy.Scope())
48-
smCopy.SetSchemaUrl(sm.SchemaUrl())
40+
smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics())
4941
}
50-
m.CopyTo(smCopy.Metrics().AppendEmpty())
42+
m.MoveTo(smCopy.Metrics().AppendEmpty())
5143
return true
5244
})
53-
}
54-
sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool {
5545
return sm.Metrics().Len() == 0
5646
})
57-
}
58-
rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool {
5947
return rm.ScopeMetrics().Len() == 0
6048
})
6149
}
@@ -66,16 +54,13 @@ func MoveMetricsWithContextIf(from, to pmetric.Metrics, f func(pmetric.ResourceM
6654
// Resources, Scopes, or Metrics are removed from the original if they become empty. All ordering is preserved.
6755
func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics, pmetric.ScopeMetrics, pmetric.Metric, any) bool) {
6856
rms := from.ResourceMetrics()
69-
for i := 0; i < rms.Len(); i++ {
70-
rm := rms.At(i)
57+
rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool {
7158
sms := rm.ScopeMetrics()
7259
var rmCopy *pmetric.ResourceMetrics
73-
for j := 0; j < sms.Len(); j++ {
74-
sm := sms.At(j)
60+
sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool {
7561
ms := sm.Metrics()
7662
var smCopy *pmetric.ScopeMetrics
77-
for k := 0; k < ms.Len(); k++ {
78-
m := ms.At(k)
63+
ms.RemoveIf(func(m pmetric.Metric) bool {
7964
var mCopy *pmetric.Metric
8065

8166
// TODO condense this code
@@ -87,168 +72,127 @@ func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.Resour
8772
return false
8873
}
8974
if rmCopy == nil {
90-
rmc := to.ResourceMetrics().AppendEmpty()
91-
rmCopy = &rmc
92-
rm.Resource().CopyTo(rmCopy.Resource())
93-
rmCopy.SetSchemaUrl(rm.SchemaUrl())
75+
rmCopy = copyResourceMetrics(rm, to.ResourceMetrics())
9476
}
9577
if smCopy == nil {
96-
smc := rmCopy.ScopeMetrics().AppendEmpty()
97-
smCopy = &smc
98-
sm.Scope().CopyTo(smCopy.Scope())
99-
smCopy.SetSchemaUrl(sm.SchemaUrl())
78+
smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics())
10079
}
10180
if mCopy == nil {
102-
mc := smCopy.Metrics().AppendEmpty()
103-
mCopy = &mc
104-
mCopy.SetName(m.Name())
105-
mCopy.SetDescription(m.Description())
106-
mCopy.SetUnit(m.Unit())
81+
mCopy = copyMetricDescription(m, smCopy.Metrics())
10782
mCopy.SetEmptyGauge()
10883
}
109-
dp.CopyTo(mCopy.Gauge().DataPoints().AppendEmpty())
84+
dp.MoveTo(mCopy.Gauge().DataPoints().AppendEmpty())
11085
return true
11186
})
87+
return dps.Len() == 0
11288
case pmetric.MetricTypeSum:
11389
dps := m.Sum().DataPoints()
11490
dps.RemoveIf(func(dp pmetric.NumberDataPoint) bool {
11591
if !f(rm, sm, m, dp) {
11692
return false
11793
}
11894
if rmCopy == nil {
119-
rmc := to.ResourceMetrics().AppendEmpty()
120-
rmCopy = &rmc
121-
rm.Resource().CopyTo(rmCopy.Resource())
122-
rmCopy.SetSchemaUrl(rm.SchemaUrl())
95+
rmCopy = copyResourceMetrics(rm, to.ResourceMetrics())
12396
}
12497
if smCopy == nil {
125-
smc := rmCopy.ScopeMetrics().AppendEmpty()
126-
smCopy = &smc
127-
sm.Scope().CopyTo(smCopy.Scope())
128-
smCopy.SetSchemaUrl(sm.SchemaUrl())
98+
smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics())
12999
}
130100
if mCopy == nil {
131-
mc := smCopy.Metrics().AppendEmpty()
132-
mCopy = &mc
133-
mCopy.SetName(m.Name())
134-
mCopy.SetDescription(m.Description())
135-
mCopy.SetUnit(m.Unit())
101+
mCopy = copyMetricDescription(m, smCopy.Metrics())
136102
mCopy.SetEmptySum()
137103
}
138-
dp.CopyTo(mCopy.Sum().DataPoints().AppendEmpty())
104+
dp.MoveTo(mCopy.Sum().DataPoints().AppendEmpty())
139105
return true
140106
})
107+
return dps.Len() == 0
141108
case pmetric.MetricTypeHistogram:
142109
dps := m.Histogram().DataPoints()
143110
dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
144111
if !f(rm, sm, m, dp) {
145112
return false
146113
}
147114
if rmCopy == nil {
148-
rmc := to.ResourceMetrics().AppendEmpty()
149-
rmCopy = &rmc
150-
rm.Resource().CopyTo(rmCopy.Resource())
151-
rmCopy.SetSchemaUrl(rm.SchemaUrl())
115+
rmCopy = copyResourceMetrics(rm, to.ResourceMetrics())
152116
}
153117
if smCopy == nil {
154-
smc := rmCopy.ScopeMetrics().AppendEmpty()
155-
smCopy = &smc
156-
sm.Scope().CopyTo(smCopy.Scope())
157-
smCopy.SetSchemaUrl(sm.SchemaUrl())
118+
smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics())
158119
}
159120
if mCopy == nil {
160-
mc := smCopy.Metrics().AppendEmpty()
161-
mCopy = &mc
162-
mCopy.SetName(m.Name())
163-
mCopy.SetDescription(m.Description())
164-
mCopy.SetUnit(m.Unit())
121+
mCopy = copyMetricDescription(m, smCopy.Metrics())
165122
mCopy.SetEmptyHistogram()
166123
}
167-
dp.CopyTo(mCopy.Histogram().DataPoints().AppendEmpty())
124+
dp.MoveTo(mCopy.Histogram().DataPoints().AppendEmpty())
168125
return true
169126
})
127+
return dps.Len() == 0
170128
case pmetric.MetricTypeExponentialHistogram:
171129
dps := m.ExponentialHistogram().DataPoints()
172130
dps.RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
173131
if !f(rm, sm, m, dp) {
174132
return false
175133
}
176134
if rmCopy == nil {
177-
rmc := to.ResourceMetrics().AppendEmpty()
178-
rmCopy = &rmc
179-
rm.Resource().CopyTo(rmCopy.Resource())
180-
rmCopy.SetSchemaUrl(rm.SchemaUrl())
135+
rmCopy = copyResourceMetrics(rm, to.ResourceMetrics())
181136
}
182137
if smCopy == nil {
183-
smc := rmCopy.ScopeMetrics().AppendEmpty()
184-
smCopy = &smc
185-
sm.Scope().CopyTo(smCopy.Scope())
186-
smCopy.SetSchemaUrl(sm.SchemaUrl())
138+
smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics())
187139
}
188140
if mCopy == nil {
189-
mc := smCopy.Metrics().AppendEmpty()
190-
mCopy = &mc
191-
mCopy.SetName(m.Name())
192-
mCopy.SetDescription(m.Description())
193-
mCopy.SetUnit(m.Unit())
141+
mCopy = copyMetricDescription(m, smCopy.Metrics())
194142
mCopy.SetEmptyExponentialHistogram()
195143
}
196-
dp.CopyTo(mCopy.ExponentialHistogram().DataPoints().AppendEmpty())
144+
dp.MoveTo(mCopy.ExponentialHistogram().DataPoints().AppendEmpty())
197145
return true
198146
})
147+
return dps.Len() == 0
199148
case pmetric.MetricTypeSummary:
200149
dps := m.Summary().DataPoints()
201150
dps.RemoveIf(func(dp pmetric.SummaryDataPoint) bool {
202151
if !f(rm, sm, m, dp) {
203152
return false
204153
}
205154
if rmCopy == nil {
206-
rmc := to.ResourceMetrics().AppendEmpty()
207-
rmCopy = &rmc
208-
rm.Resource().CopyTo(rmCopy.Resource())
209-
rmCopy.SetSchemaUrl(rm.SchemaUrl())
155+
rmCopy = copyResourceMetrics(rm, to.ResourceMetrics())
210156
}
211157
if smCopy == nil {
212-
smc := rmCopy.ScopeMetrics().AppendEmpty()
213-
smCopy = &smc
214-
sm.Scope().CopyTo(smCopy.Scope())
215-
smCopy.SetSchemaUrl(sm.SchemaUrl())
158+
smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics())
216159
}
217160
if mCopy == nil {
218-
mc := smCopy.Metrics().AppendEmpty()
219-
mCopy = &mc
220-
mCopy.SetName(m.Name())
221-
mCopy.SetDescription(m.Description())
222-
mCopy.SetUnit(m.Unit())
161+
mCopy = copyMetricDescription(m, smCopy.Metrics())
223162
mCopy.SetEmptySummary()
224163
}
225-
dp.CopyTo(mCopy.Summary().DataPoints().AppendEmpty())
164+
dp.MoveTo(mCopy.Summary().DataPoints().AppendEmpty())
226165
return true
227166
})
167+
return dps.Len() == 0
228168
}
229-
}
230-
ms.RemoveIf(func(m pmetric.Metric) bool {
231-
var numDPs int
232-
switch m.Type() {
233-
case pmetric.MetricTypeGauge:
234-
numDPs = m.Gauge().DataPoints().Len()
235-
case pmetric.MetricTypeSum:
236-
numDPs = m.Sum().DataPoints().Len()
237-
case pmetric.MetricTypeHistogram:
238-
numDPs = m.Histogram().DataPoints().Len()
239-
case pmetric.MetricTypeExponentialHistogram:
240-
numDPs = m.ExponentialHistogram().DataPoints().Len()
241-
case pmetric.MetricTypeSummary:
242-
numDPs = m.Summary().DataPoints().Len()
243-
}
244-
return numDPs == 0
169+
// Do not remove unknown type.
170+
return false
245171
})
246-
}
247-
sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool {
248172
return sm.Metrics().Len() == 0
249173
})
250-
}
251-
rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool {
252174
return rm.ScopeMetrics().Len() == 0
253175
})
254176
}
177+
178+
func copyResourceMetrics(from pmetric.ResourceMetrics, to pmetric.ResourceMetricsSlice) *pmetric.ResourceMetrics {
179+
rmc := to.AppendEmpty()
180+
from.Resource().CopyTo(rmc.Resource())
181+
rmc.SetSchemaUrl(from.SchemaUrl())
182+
return &rmc
183+
}
184+
185+
func copyScopeMetrics(from pmetric.ScopeMetrics, to pmetric.ScopeMetricsSlice) *pmetric.ScopeMetrics {
186+
smc := to.AppendEmpty()
187+
from.Scope().CopyTo(smc.Scope())
188+
smc.SetSchemaUrl(from.SchemaUrl())
189+
return &smc
190+
}
191+
192+
func copyMetricDescription(from pmetric.Metric, to pmetric.MetricSlice) *pmetric.Metric {
193+
mc := to.AppendEmpty()
194+
mc.SetName(from.Name())
195+
mc.SetDescription(from.Description())
196+
mc.SetUnit(from.Unit())
197+
return &mc
198+
}

connector/routingconnector/internal/pmetricutil/metrics_test.go

+13
Original file line numberDiff line numberDiff line change
@@ -1506,3 +1506,16 @@ func TestMoveDataPointsWithContextIf(t *testing.T) {
15061506
})
15071507
}
15081508
}
1509+
1510+
func BenchmarkMoveResourcesIfMetrics(b *testing.B) {
1511+
b.ReportAllocs()
1512+
for i := 0; i < b.N; i++ {
1513+
from := pmetricutiltest.NewGauges("AB", "CD", "EF", "GH")
1514+
to := pmetric.NewMetrics()
1515+
pmetricutil.MoveResourcesIf(from, to, func(rl pmetric.ResourceMetrics) bool {
1516+
return true
1517+
})
1518+
assert.Equal(b, 0, from.DataPointCount())
1519+
assert.Equal(b, 16, to.DataPointCount())
1520+
}
1521+
}

0 commit comments

Comments
 (0)