@@ -840,7 +840,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
840
840
}
841
841
842
842
for _ , n := range pipeline .processors {
843
- require .True (t , n .Component .(* testcomponents.ExampleProcessor ).Started ())
843
+ require .True (t , n .( * processorNode ). Component .(* testcomponents.ExampleProcessor ).Started ())
844
844
}
845
845
846
846
for _ , n := range pipeline .receivers {
@@ -929,7 +929,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
929
929
}
930
930
931
931
for _ , n := range pipeline .processors {
932
- require .True (t , n .Component .(* testcomponents.ExampleProcessor ).Stopped ())
932
+ require .True (t , n .( * processorNode ). Component .(* testcomponents.ExampleProcessor ).Stopped ())
933
933
}
934
934
935
935
for _ , n := range pipeline .exporters {
@@ -1010,6 +1010,170 @@ func TestConnectorPipelinesGraph(t *testing.T) {
1010
1010
}
1011
1011
}
1012
1012
1013
+ func TestInstances (t * testing.T ) {
1014
+ tests := []struct {
1015
+ name string
1016
+ pipelineConfigs pipelines.Config
1017
+ expectInstances map [component.ID ]int
1018
+ }{
1019
+ {
1020
+ name : "one_pipeline_each_signal" ,
1021
+ pipelineConfigs : pipelines.Config {
1022
+ pipeline .NewID (pipeline .SignalTraces ): {
1023
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1024
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1025
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1026
+ },
1027
+ pipeline .NewID (pipeline .SignalMetrics ): {
1028
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1029
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1030
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1031
+ },
1032
+ pipeline .NewID (pipeline .SignalLogs ): {
1033
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1034
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1035
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1036
+ },
1037
+ pipeline .NewID (xpipeline .SignalProfiles ): {
1038
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1039
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1040
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1041
+ },
1042
+ },
1043
+ expectInstances : map [component.ID ]int {
1044
+ component .MustNewID ("examplereceiver" ): 4 , // one per signal
1045
+ component .MustNewID ("exampleprocessor" ): 4 , // one per pipeline
1046
+ component .MustNewID ("exampleexporter" ): 4 , // one per signal
1047
+ },
1048
+ },
1049
+ {
1050
+ name : "shared_by_signals" ,
1051
+ pipelineConfigs : pipelines.Config {
1052
+ pipeline .NewIDWithName (pipeline .SignalTraces , "1" ): {
1053
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1054
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1055
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1056
+ },
1057
+ pipeline .NewIDWithName (pipeline .SignalTraces , "2" ): {
1058
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1059
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1060
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1061
+ },
1062
+ pipeline .NewIDWithName (pipeline .SignalMetrics , "1" ): {
1063
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1064
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1065
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1066
+ },
1067
+ pipeline .NewIDWithName (pipeline .SignalMetrics , "2" ): {
1068
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1069
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1070
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1071
+ },
1072
+ pipeline .NewIDWithName (pipeline .SignalLogs , "1" ): {
1073
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1074
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1075
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1076
+ },
1077
+ pipeline .NewIDWithName (pipeline .SignalLogs , "2" ): {
1078
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1079
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1080
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1081
+ },
1082
+ pipeline .NewIDWithName (xpipeline .SignalProfiles , "1" ): {
1083
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1084
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1085
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1086
+ },
1087
+ pipeline .NewIDWithName (xpipeline .SignalProfiles , "2" ): {
1088
+ Receivers : []component.ID {component .MustNewID ("examplereceiver" )},
1089
+ Processors : []component.ID {component .MustNewID ("exampleprocessor" )},
1090
+ Exporters : []component.ID {component .MustNewID ("exampleexporter" )},
1091
+ },
1092
+ },
1093
+ expectInstances : map [component.ID ]int {
1094
+ component .MustNewID ("examplereceiver" ): 4 , // one per signal
1095
+ component .MustNewID ("exampleprocessor" ): 8 , // one per pipeline
1096
+ component .MustNewID ("exampleexporter" ): 4 , // one per signal
1097
+ },
1098
+ },
1099
+ }
1100
+
1101
+ for _ , tt := range tests {
1102
+ t .Run (tt .name , func (t * testing.T ) {
1103
+ set := Settings {
1104
+ Telemetry : componenttest .NewNopTelemetrySettings (),
1105
+ BuildInfo : component .NewDefaultBuildInfo (),
1106
+ ReceiverBuilder : builders .NewReceiver (
1107
+ map [component.ID ]component.Config {
1108
+ component .MustNewID ("examplereceiver" ): testcomponents .ExampleReceiverFactory .CreateDefaultConfig (),
1109
+ },
1110
+ map [component.Type ]receiver.Factory {
1111
+ testcomponents .ExampleReceiverFactory .Type (): testcomponents .ExampleReceiverFactory ,
1112
+ },
1113
+ ),
1114
+ ProcessorBuilder : builders .NewProcessor (
1115
+ map [component.ID ]component.Config {
1116
+ component .MustNewID ("exampleprocessor" ): testcomponents .ExampleProcessorFactory .CreateDefaultConfig (),
1117
+ },
1118
+ map [component.Type ]processor.Factory {
1119
+ testcomponents .ExampleProcessorFactory .Type (): testcomponents .ExampleProcessorFactory ,
1120
+ },
1121
+ ),
1122
+ ExporterBuilder : builders .NewExporter (
1123
+ map [component.ID ]component.Config {
1124
+ component .MustNewID ("exampleexporter" ): testcomponents .ExampleExporterFactory .CreateDefaultConfig (),
1125
+ },
1126
+ map [component.Type ]exporter.Factory {
1127
+ testcomponents .ExampleExporterFactory .Type (): testcomponents .ExampleExporterFactory ,
1128
+ },
1129
+ ),
1130
+ ConnectorBuilder : builders .NewConnector (map [component.ID ]component.Config {}, map [component.Type ]connector.Factory {}),
1131
+ PipelineConfigs : tt .pipelineConfigs ,
1132
+ }
1133
+
1134
+ pg , err := Build (context .Background (), set )
1135
+ require .NoError (t , err )
1136
+
1137
+ require .Equal (t , len (set .PipelineConfigs ), len (pg .pipelines ))
1138
+
1139
+ // For each component id, build a map of the instances of that component.
1140
+ // Use graph.Node.ID() as the key to determine uniqueness of instances.
1141
+ componentInstances := map [component.ID ]map [int64 ]struct {}{}
1142
+ for _ , pipeline := range pg .pipelines {
1143
+ for _ , n := range pipeline .receivers {
1144
+ r := n .(* receiverNode )
1145
+ if _ , ok := componentInstances [r .componentID ]; ! ok {
1146
+ componentInstances [r .componentID ] = map [int64 ]struct {}{}
1147
+ }
1148
+ componentInstances [r .componentID ][n .ID ()] = struct {}{}
1149
+ }
1150
+ for _ , n := range pipeline .processors {
1151
+ p := n .(* processorNode )
1152
+ if _ , ok := componentInstances [p .componentID ]; ! ok {
1153
+ componentInstances [p .componentID ] = map [int64 ]struct {}{}
1154
+ }
1155
+ componentInstances [p .componentID ][n .ID ()] = struct {}{}
1156
+ }
1157
+ for _ , n := range pipeline .exporters {
1158
+ e := n .(* exporterNode )
1159
+ if _ , ok := componentInstances [e .componentID ]; ! ok {
1160
+ componentInstances [e .componentID ] = map [int64 ]struct {}{}
1161
+ }
1162
+ componentInstances [e .componentID ][n .ID ()] = struct {}{}
1163
+ }
1164
+ }
1165
+
1166
+ var totalExpected int
1167
+ for id , instances := range componentInstances {
1168
+ totalExpected += tt .expectInstances [id ]
1169
+ require .Len (t , instances , tt .expectInstances [id ], id .String ())
1170
+ }
1171
+ totalExpected += len (tt .pipelineConfigs ) * 2 // one fanout & one capabilities node per pipeline
1172
+ require .Equal (t , totalExpected , pg .componentGraph .Nodes ().Len ())
1173
+ })
1174
+ }
1175
+ }
1176
+
1013
1177
func TestConnectorRouter (t * testing.T ) {
1014
1178
rcvrID := component .MustNewID ("examplereceiver" )
1015
1179
routeTracesID := component .MustNewIDWithName ("examplerouter" , "traces" )
0 commit comments