@@ -5,78 +5,238 @@ package googlecloudpubsubexporter
5
5
6
6
import (
7
7
"context"
8
+ "fmt"
8
9
"testing"
9
- "time"
10
10
11
11
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
12
- "cloud.google.com/go/pubsub/pstest"
12
+ "github.com/google/uuid"
13
+ "github.com/googleapis/gax-go/v2"
13
14
"github.com/stretchr/testify/assert"
14
- "go.opentelemetry.io/collector/exporter/exporterhelper"
15
+ "github.com/stretchr/testify/require"
16
+ "go.opentelemetry.io/collector/component/componenttest"
15
17
"go.opentelemetry.io/collector/exporter/exportertest"
16
18
"go.opentelemetry.io/collector/pdata/plog"
17
19
"go.opentelemetry.io/collector/pdata/pmetric"
18
20
"go.opentelemetry.io/collector/pdata/ptrace"
19
21
)
20
22
21
- func TestName(t *testing.T) {
22
- exporter := &pubsubExporter{}
23
- assert.Equal(t, "googlecloudpubsub", exporter.Name())
24
- }
23
+ const (
24
+ defaultUUID = "00000000-0000-0000-0000-000000000000"
25
+ defaultProjectID = "my-project"
26
+ defaultTopic = "projects/my-project/topics/otlp"
27
+ )
25
28
26
- func TestExporterDefaultSettings(t *testing.T) {
27
- ctx := context.Background()
28
- // Start a fake server running locally.
29
- srv := pstest.NewServer()
30
- defer srv.Close()
31
- _, err := srv.GServer.CreateTopic(ctx, &pb.Topic{
32
- Name: "projects/my-project/topics/otlp",
29
+ func TestExporterNoData(t *testing.T) {
30
+ exporter, publisher := newTestExporter(t, func(config *Config) {
31
+ config.Watermark.Behavior = "earliest"
33
32
})
34
- assert.NoError(t, err)
35
33
36
- factory := NewFactory()
37
- cfg := factory.CreateDefaultConfig()
38
- exporterConfig := cfg.(*Config)
39
- exporterConfig.Endpoint = srv.Addr
40
- exporterConfig.Insecure = true
41
- exporterConfig.ProjectID = "my-project"
42
- exporterConfig.Topic = "projects/my-project/topics/otlp"
43
- exporterConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
44
- Timeout: 12 * time.Second,
45
- }
46
- exporter := ensureExporter(exportertest.NewNopSettings(), exporterConfig)
47
- assert.NoError(t, exporter.start(ctx, nil))
48
- assert.NoError(t, exporter.consumeTraces(ctx, ptrace.NewTraces()))
49
- assert.NoError(t, exporter.consumeMetrics(ctx, pmetric.NewMetrics()))
34
+ ctx := context.Background()
50
35
assert.NoError(t, exporter.consumeLogs(ctx, plog.NewLogs()))
51
- assert.NoError(t, exporter.shutdown(ctx))
36
+ assert.NoError(t, exporter.consumeMetrics(ctx, pmetric.NewMetrics()))
37
+ assert.NoError(t, exporter.consumeTraces(ctx, ptrace.NewTraces()))
38
+
39
+ assert.Zero(t, publisher.requests)
52
40
}
53
41
54
- func TestExporterCompression(t *testing.T) {
55
- ctx := context.Background()
56
- // Start a fake server running locally.
57
- srv := pstest.NewServer()
58
- defer srv.Close()
59
- _, err := srv.GServer.CreateTopic(ctx, &pb.Topic{
60
- Name: "projects/my-project/topics/otlp",
42
+ func TestExporterClientError(t *testing.T) {
43
+ cfg := NewFactory().CreateDefaultConfig().(*Config)
44
+ cfg.ProjectID = defaultProjectID
45
+ cfg.Topic = defaultTopic
46
+ require.NoError(t, cfg.Validate())
47
+
48
+ exporter := ensureExporter(exportertest.NewNopSettings(), cfg)
49
+ exporter.makeClient = func(context.Context, *Config, string) (publisherClient, error) {
50
+ return nil, fmt.Errorf("something went wrong")
51
+ }
52
+
53
+ require.Error(t, exporter.start(context.Background(), componenttest.NewNopHost()))
54
+ }
55
+
56
+ func TestExporterSimpleData(t *testing.T) {
57
+ t.Run("logs", func(t *testing.T) {
58
+ exporter, publisher := newTestExporter(t)
59
+
60
+ logs := plog.NewLogs()
61
+ logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message")
62
+
63
+ require.NoError(t, exporter.consumeLogs(context.Background(), logs))
64
+ require.Len(t, publisher.requests, 1)
65
+
66
+ request := publisher.requests[0]
67
+ assert.Equal(t, defaultTopic, request.Topic)
68
+ assert.Len(t, request.Messages, 1)
69
+
70
+ message := request.Messages[0]
71
+ assert.NotEmpty(t, message.Data)
72
+ assert.Subset(t, message.Attributes, map[string]string{
73
+ "ce-type": "org.opentelemetry.otlp.logs.v1",
74
+ "content-type": "application/protobuf",
75
+ })
76
+ })
77
+
78
+ t.Run("metrics", func(t *testing.T) {
79
+ exporter, publisher := newTestExporter(t)
80
+
81
+ metrics := pmetric.NewMetrics()
82
+ metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
83
+ metric.SetName("some.metric")
84
+ metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42)
85
+
86
+ require.NoError(t, exporter.consumeMetrics(context.Background(), metrics))
87
+ require.Len(t, publisher.requests, 1)
88
+
89
+ request := publisher.requests[0]
90
+ assert.Equal(t, defaultTopic, request.Topic)
91
+ assert.Len(t, request.Messages, 1)
92
+
93
+ message := request.Messages[0]
94
+ assert.NotEmpty(t, message.Data)
95
+ assert.Subset(t, message.Attributes, map[string]string{
96
+ "ce-type": "org.opentelemetry.otlp.metrics.v1",
97
+ "content-type": "application/protobuf",
98
+ })
61
99
})
62
- assert.NoError(t, err)
100
+
101
+ t.Run("traces", func(t *testing.T) {
102
+ exporter, publisher := newTestExporter(t)
103
+
104
+ traces := ptrace.NewTraces()
105
+ span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
106
+ span.SetName("some span")
107
+
108
+ require.NoError(t, exporter.consumeTraces(context.Background(), traces))
109
+ require.Len(t, publisher.requests, 1)
110
+
111
+ request := publisher.requests[0]
112
+ assert.Equal(t, defaultTopic, request.Topic)
113
+ assert.Len(t, request.Messages, 1)
114
+
115
+ message := request.Messages[0]
116
+ assert.NotEmpty(t, message.Data)
117
+ assert.Subset(t, message.Attributes, map[string]string{
118
+ "ce-type": "org.opentelemetry.otlp.traces.v1",
119
+ "content-type": "application/protobuf",
120
+ })
121
+ })
122
+ }
123
+
124
+ func TestExporterSimpleDataWithCompression(t *testing.T) {
125
+ withCompression := func(config *Config) {
126
+ config.Compression = "gzip"
127
+ }
128
+
129
+ t.Run("logs", func(t *testing.T) {
130
+ exporter, publisher := newTestExporter(t, withCompression)
131
+
132
+ logs := plog.NewLogs()
133
+ logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message")
134
+
135
+ require.NoError(t, exporter.consumeLogs(context.Background(), logs))
136
+ require.Len(t, publisher.requests, 1)
137
+
138
+ request := publisher.requests[0]
139
+ assert.Equal(t, defaultTopic, request.Topic)
140
+ assert.Len(t, request.Messages, 1)
141
+
142
+ message := request.Messages[0]
143
+ assert.NotEmpty(t, message.Data)
144
+ assert.Subset(t, message.Attributes, map[string]string{
145
+ "ce-id": "00000000-0000-0000-0000-000000000000",
146
+ "ce-source": "/opentelemetry/collector/googlecloudpubsub/latest",
147
+ "ce-specversion": "1.0",
148
+ "ce-type": "org.opentelemetry.otlp.logs.v1",
149
+ "content-type": "application/protobuf",
150
+ "content-encoding": "gzip",
151
+ })
152
+ })
153
+
154
+ t.Run("metrics", func(t *testing.T) {
155
+ exporter, publisher := newTestExporter(t, withCompression)
156
+
157
+ metrics := pmetric.NewMetrics()
158
+ metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
159
+ metric.SetName("some.metric")
160
+ metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42)
161
+
162
+ require.NoError(t, exporter.consumeMetrics(context.Background(), metrics))
163
+ require.Len(t, publisher.requests, 1)
164
+
165
+ request := publisher.requests[0]
166
+ assert.Equal(t, defaultTopic, request.Topic)
167
+ assert.Len(t, request.Messages, 1)
168
+
169
+ message := request.Messages[0]
170
+ assert.NotEmpty(t, message.Data)
171
+ assert.Subset(t, message.Attributes, map[string]string{
172
+ "ce-type": "org.opentelemetry.otlp.metrics.v1",
173
+ "content-type": "application/protobuf",
174
+ "content-encoding": "gzip",
175
+ })
176
+ })
177
+
178
+ t.Run("traces", func(t *testing.T) {
179
+ exporter, publisher := newTestExporter(t, withCompression)
180
+
181
+ traces := ptrace.NewTraces()
182
+ span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
183
+ span.SetName("some span")
184
+
185
+ require.NoError(t, exporter.consumeTraces(context.Background(), traces))
186
+ require.Len(t, publisher.requests, 1)
187
+
188
+ request := publisher.requests[0]
189
+ assert.Equal(t, defaultTopic, request.Topic)
190
+ assert.Len(t, request.Messages, 1)
191
+
192
+ message := request.Messages[0]
193
+ assert.NotEmpty(t, message.Data)
194
+ assert.Subset(t, message.Attributes, map[string]string{
195
+ "ce-type": "org.opentelemetry.otlp.traces.v1",
196
+ "content-type": "application/protobuf",
197
+ "content-encoding": "gzip",
198
+ })
199
+ })
200
+ }
201
+
202
+ // Helpers
203
+
204
+ func newTestExporter(t *testing.T, options ...func(*Config)) (*pubsubExporter, *mockPublisher) {
205
+ t.Helper()
63
206
64
207
factory := NewFactory()
65
- cfg := factory.CreateDefaultConfig()
66
- exporterConfig := cfg.(*Config)
67
- exporterConfig.Endpoint = srv.Addr
68
- exporterConfig.UserAgent = "test-user-agent"
69
- exporterConfig.Insecure = true
70
- exporterConfig.ProjectID = "my-project"
71
- exporterConfig.Topic = "projects/my-project/topics/otlp"
72
- exporterConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
73
- Timeout: 12 * time.Second,
208
+ cfg := factory.CreateDefaultConfig().(*Config)
209
+ cfg.ProjectID = defaultProjectID
210
+ cfg.Topic = defaultTopic
211
+ for _, option := range options {
212
+ option(cfg)
74
213
}
75
- exporterConfig.Compression = "gzip"
76
- exporter := ensureExporter(exportertest.NewNopSettings(), exporterConfig)
77
- assert.NoError(t, exporter.start(ctx, nil))
78
- assert.NoError(t, exporter.consumeTraces(ctx, ptrace.NewTraces()))
79
- assert.NoError(t, exporter.consumeMetrics(ctx, pmetric.NewMetrics()))
80
- assert.NoError(t, exporter.consumeLogs(ctx, plog.NewLogs()))
81
- assert.NoError(t, exporter.shutdown(ctx))
214
+ require.NoError(t, cfg.Validate())
215
+
216
+ exporter := ensureExporter(exportertest.NewNopSettings(), cfg)
217
+ publisher := &mockPublisher{}
218
+ exporter.makeClient = func(context.Context, *Config, string) (publisherClient, error) {
219
+ return publisher, nil
220
+ }
221
+ exporter.makeUUID = func() (uuid.UUID, error) {
222
+ return uuid.Parse(defaultUUID)
223
+ }
224
+
225
+ require.NoError(t, exporter.start(context.Background(), componenttest.NewNopHost()))
226
+ t.Cleanup(func() { assert.NoError(t, exporter.shutdown(context.Background())) })
227
+
228
+ return exporter, publisher
229
+ }
230
+
231
+ type mockPublisher struct {
232
+ requests []*pb.PublishRequest
233
+ }
234
+
235
+ func (m *mockPublisher) Publish(_ context.Context, request *pb.PublishRequest, _ ...gax.CallOption) (*pb.PublishResponse, error) {
236
+ m.requests = append(m.requests, request)
237
+ return &pb.PublishResponse{}, nil
238
+ }
239
+
240
+ func (m *mockPublisher) Close() error {
241
+ return nil
82
242
}
0 commit comments