Skip to content

feat: extend spec class for config migrations #538

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: pnilan/feat/implement-validators
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3803,6 +3803,33 @@ definitions:
title: Advanced Auth
description: Advanced specification for configuring the authentication flow.
"$ref": "#/definitions/AuthFlow"
config_normalization_rules:
title: Config Normalization Rules
type: object
additional_properties: false
properties:
config_migrations:
title: Config Migrations
description: The config will be migrated according to these transformations and updated within the platform for subsequent syncs.
type: array
items:
anyOf:
- "$ref": '#/definitions/ConfigTransformations/RemapField'
transformations:
title: Transformations
description: The list of transformations that will be applied on the incoming config at the start of a sync.
type: array
items:
anyOf:
- "$ref": '#/definitions/ConfigTransformations/RemapField'
validations:
title: Validations
description: The list of validations that will be performed on the incoming config before starting a sync
type: array
items:
anyOf:
- "$ref": "#/definitions/DpathValidator"
- "$ref": "#/definitions/PredicateValidator"
SubstreamPartitionRouter:
title: Substream Partition Router
description: Partition router that is used to retrieve records that have been partitioned according to records from the specified parent streams. An example of a parent stream is automobile brands and the substream would be the various car models associated with each branch.
Expand Down Expand Up @@ -4164,6 +4191,159 @@ definitions:
description: The GraphQL query to be executed
default: {}
additionalProperties: true
DpathValidator:
title: Dpath Validator
description: Validator that extracts the value located at a given field path.
type: object
required:
- type
- field_path
- validation_strategy
properties:
type:
type: string
enum: [ DpathValidator ]
field_path:
title: Field Path
description: List of potentially nested fields describing the full path of the field to validate. Use "*" to validate all values from an array.
type: array
items:
type: string
interpolation_context:
- config
examples:
- [ "data" ]
- [ "data", "records" ]
- [ "data", "{{ parameters.name }}" ]
- [ "data", "*", "record" ]
validation_strategy:
title: Validation Stragey
description: The condition that the specified config value will be evaluated against
anyOf:
- "$ref": "#/definitions/ValidateAdheresToSchema"
PredicateValidator:
title: Predicate Validator
description: Validator that applies a validation strategy to a specified value.
type: object
required:
- type
- value
- validation_strategy
properties:
type:
type: string
enum: [PredicateValidator]
value:
title: Value
description: The value to be validated. Can be a literal value or interpolated from configuration.
type:
- string
- number
- object
- array
- boolean
- "null"
interpolation_context:
- config
examples:
- "test-value"
- "{{ config['api_version'] }}"
- "{{ config['tenant_id'] }}"
- 123
validation_strategy:
title: Validation Strategy
description: The validation strategy to apply to the value.
anyOf:
- "$ref": "#/definitions/ValidateAdheresToSchema"
ValidateAdheresToSchema:
title: Validate Adheres To Schema
description: Validates that a user-provided schema adheres to a specified JSON schema.
type: object
required:
- type
- schema
properties:
type:
type: string
enum: [ValidateAdheresToSchema]
schema:
title: JSON Schema
description: The JSON schema used for validation.
type:
- string
- object
interpolation_context:
- config
examples:
- "{{ config['report_validation_schema'] }}"
- |
'{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Person",
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The person's name"
},
"age": {
"type": "integer",
"minimum": 0,
"description": "The person's age"
}
},
"required": ["name", "age"]
}'
- $schema: "http://json-schema.org/draft-07/schema#"
title: Person
type: object
properties:
name:
type: string
description: "The person's name"
age:
type: integer
minimum: 0
description: "The person's age"
required:
- name
- age
ConfigTransformations:
RemapField:
title: Remap Field
description: Transformation that remaps a field's value to another value based on a static map.
type: object
required:
- type
- map
- field_path
properties:
type:
type: string
enum: [RemapField]
map:
title: Value Mapping
description: A mapping of original values to new values. When a field value matches a key in this map, it will be replaced with the corresponding value.
type:
- object
- string
additionalProperties: true
examples:
- pending: "in_progress"
done: "completed"
cancelled: "terminated"
- "{{ config['status_mapping'] }}"
field_path:
title: Field Path
description: The path to the field whose value should be remapped. Specified as a list of path components to navigate through nested objects.
type: array
items:
type: string
examples:
- ["status"]
- ["data", "status"]
- ["data", "{{ parameters.name }}", "status"]
- ["data", "*", "status"]
interpolation:
variables:
- title: config
Expand Down
157 changes: 136 additions & 21 deletions airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1523,6 +1521,62 @@ class Config:
query: Dict[str, Any] = Field(..., description="The GraphQL query to be executed")


class ValidateAdheresToSchema(BaseModel):
type: Literal["ValidateAdheresToSchema"]
schema_: Union[str, Dict[str, Any]] = Field(
...,
alias="schema",
description="The JSON schema used for validation.",
examples=[
"{{ config['report_validation_schema'] }}",
'\'{\n "$schema": "http://json-schema.org/draft-07/schema#",\n "title": "Person",\n "type": "object",\n "properties": {\n "name": {\n "type": "string",\n "description": "The person\'s name"\n },\n "age": {\n "type": "integer",\n "minimum": 0,\n "description": "The person\'s age"\n }\n },\n "required": ["name", "age"]\n}\'\n',
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Person",
"type": "object",
"properties": {
"name": {"type": "string", "description": "The person's name"},
"age": {
"type": "integer",
"minimum": 0,
"description": "The person's age",
},
},
"required": ["name", "age"],
},
],
title="JSON Schema",
)


class ConfigTransformations(BaseModel):
__root__: Any


class RemapField(BaseModel):
type: Literal["RemapField"]
map: Union[Dict[str, Any], str] = Field(
...,
description="A mapping of original values to new values. When a field value matches a key in this map, it will be replaced with the corresponding value.",
examples=[
{"pending": "in_progress", "done": "completed", "cancelled": "terminated"},
"{{ config['status_mapping'] }}",
],
title="Value Mapping",
)
field_path: List[str] = Field(
...,
description="The path to the field whose value should be remapped. Specified as a list of path components to navigate through nested objects.",
examples=[
["status"],
["data", "status"],
["data", "{{ parameters.name }}", "status"],
["data", "*", "status"],
],
title="Field Path",
)


class AddedFieldDefinition(BaseModel):
type: Literal["AddedFieldDefinition"]
path: List[str] = Field(
Expand Down Expand Up @@ -1905,29 +1959,49 @@ class GzipDecoder(BaseModel):
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder]


class Spec(BaseModel):
type: Literal["Spec"]
connection_specification: Dict[str, Any] = Field(
class RequestBodyGraphQL(BaseModel):
type: Literal["RequestBodyGraphQL"]
value: RequestBodyGraphQlQuery


class DpathValidator(BaseModel):
type: Literal["DpathValidator"]
field_path: List[str] = Field(
...,
description="A connection specification describing how a the connector can be configured.",
title="Connection Specification",
)
documentation_url: Optional[str] = Field(
None,
description="URL of the connector's documentation page.",
examples=["https://docs.airbyte.com/integrations/sources/dremio"],
title="Documentation URL",
description='List of potentially nested fields describing the full path of the field to validate. Use "*" to validate all values from an array.',
examples=[
["data"],
["data", "records"],
["data", "{{ parameters.name }}"],
["data", "*", "record"],
],
title="Field Path",
)
advanced_auth: Optional[AuthFlow] = Field(
None,
description="Advanced specification for configuring the authentication flow.",
title="Advanced Auth",
validation_strategy: ValidateAdheresToSchema = Field(
...,
description="The condition that the specified config value will be evaluated against",
title="Validation Stragey",
)


class RequestBodyGraphQL(BaseModel):
type: Literal["RequestBodyGraphQL"]
value: RequestBodyGraphQlQuery
class PredicateValidator(BaseModel):
type: Literal["PredicateValidator"]
value: Optional[Union[str, float, Dict[str, Any], List[Any], bool]] = Field(
...,
description="The value to be validated. Can be a literal value or interpolated from configuration.",
examples=[
"test-value",
"{{ config['api_version'] }}",
"{{ config['tenant_id'] }}",
123,
],
title="Value",
)
validation_strategy: ValidateAdheresToSchema = Field(
...,
description="The validation strategy to apply to the value.",
title="Validation Strategy",
)


class CompositeErrorHandler(BaseModel):
Expand Down Expand Up @@ -1985,6 +2059,47 @@ class Config:
)


class ConfigNormalizationRules(BaseModel):
config_migrations: Optional[List[RemapField]] = Field(
[],
description="The config will be migrated according to these transformations and updated within the platform for subsequent syncs.",
title="Config Migrations",
)
transformations: Optional[List[RemapField]] = Field(
[],
description="The list of transformations that will be applied on the incoming config at the start of a sync.",
title="Transformations",
)
validations: Optional[List[Union[DpathValidator, PredicateValidator]]] = Field(
[],
description="The list of validations that will be performed on the incoming config before starting a sync",
title="Validations",
)


class Spec(BaseModel):
type: Literal["Spec"]
connection_specification: Dict[str, Any] = Field(
...,
description="A connection specification describing how a the connector can be configured.",
title="Connection Specification",
)
documentation_url: Optional[str] = Field(
None,
description="URL of the connector's documentation page.",
examples=["https://docs.airbyte.com/integrations/sources/dremio"],
title="Documentation URL",
)
advanced_auth: Optional[AuthFlow] = Field(
None,
description="Advanced specification for configuring the authentication flow.",
title="Advanced Auth",
)
config_normalization_rules: Optional[ConfigNormalizationRules] = Field(
None, title="Config Normalization Rules"
)


class DeclarativeSource1(BaseModel):
class Config:
extra = Extra.forbid
Expand Down Expand Up @@ -2164,7 +2279,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
Expand Down
Loading
Loading