Skip to content

feat(cdk): handle include_files from configured catalog #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a6658e6
connector builder: initial changes to pass file reference info to data
aldogonzalez8 Apr 17, 2025
66c1f7d
file-mode-api: refactor classes to independent files
aldogonzalez8 Apr 17, 2025
5aaa9fc
Auto-fix lint and format issues
Apr 17, 2025
3ff54b7
file-mode-api: fix imports
aldogonzalez8 Apr 17, 2025
c568be2
Auto-fix lint and format issues
Apr 17, 2025
52abace
file-api: fix mypy typing
aldogonzalez8 Apr 18, 2025
6136e2c
Auto-fix lint and format issues
Apr 18, 2025
0fbf122
file-api: minor changes to connector builder file uploader
aldogonzalez8 Apr 18, 2025
1e61e19
Update airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_…
aldogonzalez8 Apr 23, 2025
06e5581
Update unit_tests/sources/declarative/file/test_file_stream.py
aldogonzalez8 Apr 23, 2025
5d113ec
file-api: fix coderabbit messing
aldogonzalez8 Apr 23, 2025
e860e3f
file-mode-api: fix classes and names to fit our vocabulary.
aldogonzalez8 Apr 23, 2025
3ea1674
file-mode-api: run ruff format
aldogonzalez8 Apr 23, 2025
2035956
file-mode-api: run ruff check . --fix
aldogonzalez8 Apr 23, 2025
aa0a832
file-mode-api: Mock the test instead of messing with Classes
aldogonzalez8 Apr 23, 2025
c8e29d1
file-api: run ruff format .
aldogonzalez8 Apr 23, 2025
5df84a3
file-mode-api: initial changes to handle include_files selection from…
aldogonzalez8 Apr 26, 2025
8e54720
file-mode-api: ruff format .
aldogonzalez8 Apr 26, 2025
9933a52
file-mode-api: add more tests and make internal check of include file…
aldogonzalez8 Apr 26, 2025
13fac98
Merge branch 'main' into ac8/file-api/connector-builder-support-3
aldogonzalez8 Apr 26, 2025
35b293f
Merge branch 'ac8/file-api/connector-builder-support-3' into ac8/file…
aldogonzalez8 Apr 26, 2025
1f9522a
file-api: move catalog pass from streams() to ModelToComponentFactory…
aldogonzalez8 Apr 28, 2025
d1075e7
file-api: fix problem when catalog is None
aldogonzalez8 Apr 28, 2025
9b81b74
file-api: fix lint
aldogonzalez8 Apr 28, 2025
bc4c7d6
file-api: remove unused content_extractor
aldogonzalez8 Apr 28, 2025
be25429
Merge branch 'ac8/file-api/connector-builder-support-3' into ac8/file…
aldogonzalez8 Apr 28, 2025
5fb3f0f
merge from main
aldogonzalez8 Apr 28, 2025
a0a2ea6
file-api: remove duplicated test
aldogonzalez8 Apr 28, 2025
b2693d2
merge from main
aldogonzalez8 Apr 30, 2025
c914bbd
remove unnecessary default object
aldogonzalez8 Apr 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(
emit_connector_builder_messages=emit_connector_builder_messages,
disable_resumable_full_refresh=True,
connector_state_manager=self._connector_state_manager,
catalog=catalog,
)

super().__init__(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def __init__(
component_factory: Optional[ModelToComponentFactory] = None,
migrate_manifest: Optional[bool] = False,
normalize_manifest: Optional[bool] = False,
catalog: Optional[ConfiguredAirbyteCatalog] = None,
) -> None:
"""
Args:
Expand All @@ -124,6 +125,7 @@ def __init__(
else ModelToComponentFactory(
emit_connector_builder_messages,
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
catalog=catalog,
)
)
self._message_repository = self._constructor.get_message_repository()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from airbyte_cdk.connector_builder.models import (
LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.models import FailureType, Level
from airbyte_cdk.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, FailureType, Level
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
Expand Down Expand Up @@ -583,6 +583,7 @@ def __init__(
disable_retries: bool = False,
disable_cache: bool = False,
disable_resumable_full_refresh: bool = False,
catalog: ConfiguredAirbyteCatalog = None,
message_repository: Optional[MessageRepository] = None,
connector_state_manager: Optional[ConnectorStateManager] = None,
max_concurrent_async_job_count: Optional[int] = None,
Expand All @@ -602,6 +603,9 @@ def __init__(
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
# placeholder for deprecation warnings
self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = []
self._catalog_with_streams_name = (
self._get_catalog_with_streams_name(catalog) if catalog else None
)

def _init_mappings(self) -> None:
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
Expand Down Expand Up @@ -1887,8 +1891,9 @@ def create_declarative_stream(
)
file_uploader = None
if model.file_uploader:
include_files = self._get_include_files(model)
file_uploader = self._create_component_from_model(
model=model.file_uploader, config=config
model=model.file_uploader, config=config, include_files=include_files
)

retriever = self._create_component_from_model(
Expand Down Expand Up @@ -3679,7 +3684,7 @@ def create_fixed_window_call_rate_policy(
)

def create_file_uploader(
self, model: FileUploaderModel, config: Config, **kwargs: Any
self, model: FileUploaderModel, config: Config, include_files: bool, **kwargs: Any
) -> FileUploader:
name = "File Uploader"
requester = self._create_component_from_model(
Expand All @@ -3700,7 +3705,7 @@ def create_file_uploader(
download_target_extractor=download_target_extractor,
config=config,
file_writer=NoopFileWriter()
if emit_connector_builder_messages
if emit_connector_builder_messages or not include_files
else LocalFileSystemFileWriter(),
parameters=model.parameters or {},
filename_extractor=model.filename_extractor if model.filename_extractor else None,
Expand Down Expand Up @@ -3792,3 +3797,27 @@ def create_grouping_partition_router(
deduplicate=model.deduplicate if model.deduplicate is not None else True,
config=config,
)

@staticmethod
def _get_catalog_with_streams_name(
    catalog: ConfiguredAirbyteCatalog,
) -> Mapping[str, ConfiguredAirbyteStream]:
    """
    Index the configured streams of a catalog by stream name.

    :param catalog: configured catalog whose ``streams`` are iterated.
    :return: mapping from ``configured_stream.stream.name`` to the configured stream
        itself. If the same name occurs more than once, the last occurrence is kept.
    """
    configured_streams = catalog.streams
    return {entry.stream.name: entry for entry in configured_streams}

def _get_include_files(
    self,
    stream_model: DeclarativeStreamModel,
) -> bool:
    """
    Tell whether the configured catalog enables ``include_files`` for a stream.

    :param stream_model: declarative stream model; its ``name`` is the lookup key.
    :return: True only when the stream has a name, a name-indexed catalog is available,
        the stream is present in it and its ``include_files`` value is truthy;
        False in every other case.
    """
    # Without a stream name there is nothing to look up.
    if not stream_model.name:
        return False

    # No catalog was provided (or it has no streams): files are not requested.
    streams_by_name = self._catalog_with_streams_name
    if not streams_by_name:
        return False

    catalog_entry = streams_by_name.get(stream_model.name)
    if not catalog_entry:
        return False
    return bool(catalog_entry.include_files)
8 changes: 8 additions & 0 deletions airbyte_cdk/test/catalog_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStr
self._stream["stream"]["json_schema"] = json_schema
return self

def with_include_files(self, include_files: bool) -> "ConfiguredAirbyteStreamBuilder":
    """
    Record the ``include_files`` flag on the stream being built.

    :param include_files: True if files should be included in the sync, False otherwise.
    :return: this builder, so that calls can be chained.
    """
    stream_definition = self._stream
    stream_definition["include_files"] = include_files
    return self

def build(self) -> ConfiguredAirbyteStream:
    """Load the accumulated ``self._stream`` definition into a ``ConfiguredAirbyteStream``."""
    configured_stream = ConfiguredAirbyteStreamSerializer.load(self._stream)
    return configured_stream

Expand Down
206 changes: 199 additions & 7 deletions unit_tests/sources/declarative/file/test_file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,15 @@ def test_get_articles(self) -> None:
assert output.records

def test_get_article_attachments(self) -> None:
with HttpMocker() as http_mocker:
with (
HttpMocker() as http_mocker,
patch(
"airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer.NoopFileWriter.write"
) as mock_noop_write,
patch(
"airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write"
) as mock_file_system_write,
):
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
Expand All @@ -138,14 +146,25 @@ def test_get_article_attachments(self) -> None:
),
)

file_size = 12345
mock_file_system_write.return_value = file_size # Simulate a file size

output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.with_stream(
ConfiguredAirbyteStreamBuilder()
.with_name("article_attachments")
.with_include_files(True)
)
.build(),
)

assert output.records
# Ensure that FileSystemFileWriter is called.
mock_file_system_write.assert_called()
# Ensure that NoopFileWriter is not called.
mock_noop_write.assert_not_called()
file_reference = output.records[0].record.file_reference
assert file_reference
assert file_reference.staging_file_url
Expand All @@ -158,7 +177,8 @@ def test_get_article_attachments(self) -> None:
)
assert file_reference.file_size_bytes

def test_get_article_attachments_with_filename_extractor(self) -> None:
def test_get_article_attachments_and_file_is_uploaded(self) -> None:
"""Test that article attachments can be read and the file is uploaded to the staging directory"""
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url=STREAM_URL),
Expand All @@ -180,12 +200,68 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.with_stream(
ConfiguredAirbyteStreamBuilder()
.with_name("article_attachments")
.with_include_files(True)
)
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
)
file_reference = output.records[0].record.file_reference
assert file_reference.file_size_bytes
assert Path(file_reference.staging_file_url).exists(), (
"File should be uploaded to the staging directory"
)

def test_get_article_attachments_with_filename_extractor(self) -> None:
"""Test that article attachments can be read with filename extractor and file system writer is called"""
with (
HttpMocker() as http_mocker,
patch(
"airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer.NoopFileWriter.write"
) as mock_noop_write,
patch(
"airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write"
) as mock_file_system_write,
):
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENTS_URL),
HttpResponse(
json.dumps(find_template("file_api/article_attachments", __file__)), 200
),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
HttpResponse(
find_binary_response("file_api/article_attachment_content.png", __file__), 200
),
)

file_size = 12345
mock_file_system_write.return_value = file_size # Simulate a file size

output = read(
self._config(),
CatalogBuilder()
.with_stream(
ConfiguredAirbyteStreamBuilder()
.with_name("article_attachments")
.with_include_files(True)
)
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
)

assert len(output.records) == 1
# Ensure that FileSystemFileWriter is called.
mock_file_system_write.assert_called()
# Ensure that NoopFileWriter is not called.
mock_noop_write.assert_not_called()
file_reference = output.records[0].record.file_reference
assert file_reference
assert (
Expand All @@ -196,10 +272,115 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
assert not re.match(
r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.source_file_relative_path
)
assert file_reference.file_size_bytes
assert file_reference.file_size_bytes == file_size

def test_get_article_attachments_with_include_files_false(self) -> None:
    """Attachments are still read when the configured catalog opts out with include_files=False."""
    noop_write_target = (
        "airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer.NoopFileWriter.write"
    )
    local_write_target = (
        "airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write"
    )
    with (
        HttpMocker() as http_mocker,
        patch(noop_write_target) as mock_noop_write,
        patch(local_write_target) as mock_file_system_write,
    ):
        # Mock the parent stream, the attachment listing and the attachment content.
        articles_body = json.dumps(find_template("file_api/articles", __file__))
        attachments_body = json.dumps(find_template("file_api/article_attachments", __file__))
        attachment_content = find_binary_response(
            "file_api/article_attachment_content.png", __file__
        )
        http_mocker.get(HttpRequest(url=STREAM_URL), HttpResponse(articles_body, 200))
        http_mocker.get(
            HttpRequest(url=STREAM_ATTACHMENTS_URL),
            HttpResponse(attachments_body, 200),
        )
        http_mocker.get(
            HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
            HttpResponse(attachment_content, 200),
        )

        mock_noop_write.return_value = NoopFileWriter.NOOP_FILE_SIZE

        configured_stream = (
            ConfiguredAirbyteStreamBuilder()
            .with_name("article_attachments")
            .with_include_files(False)
        )
        output = read(
            self._config(),
            CatalogBuilder().with_stream(configured_stream).build(),
            yaml_file="test_file_stream_with_filename_extractor.yaml",
        )

        assert len(output.records) == 1
        # The local file system writer must not be used when include_files is False.
        mock_file_system_write.assert_not_called()
        # The no-op writer is used in its place.
        mock_noop_write.assert_called()
        file_reference = output.records[0].record.file_reference
        assert file_reference.file_size_bytes == NoopFileWriter.NOOP_FILE_SIZE

def test_get_article_attachments_without_include_files(self) -> None:
    """Attachments are still read when the configured stream does not set include_files at all."""
    noop_write_target = (
        "airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer.NoopFileWriter.write"
    )
    local_write_target = (
        "airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write"
    )
    with (
        HttpMocker() as http_mocker,
        patch(noop_write_target) as mock_noop_write,
        patch(local_write_target) as mock_file_system_write,
    ):
        # Mock the parent stream, the attachment listing and the attachment content.
        articles_body = json.dumps(find_template("file_api/articles", __file__))
        attachments_body = json.dumps(find_template("file_api/article_attachments", __file__))
        attachment_content = find_binary_response(
            "file_api/article_attachment_content.png", __file__
        )
        http_mocker.get(HttpRequest(url=STREAM_URL), HttpResponse(articles_body, 200))
        http_mocker.get(
            HttpRequest(url=STREAM_ATTACHMENTS_URL),
            HttpResponse(attachments_body, 200),
        )
        http_mocker.get(
            HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
            HttpResponse(attachment_content, 200),
        )

        mock_noop_write.return_value = NoopFileWriter.NOOP_FILE_SIZE

        # The configured stream is built without calling with_include_files.
        configured_stream = ConfiguredAirbyteStreamBuilder().with_name("article_attachments")
        output = read(
            self._config(),
            CatalogBuilder().with_stream(configured_stream).build(),
            yaml_file="test_file_stream_with_filename_extractor.yaml",
        )

        assert len(output.records) == 1
        # The local file system writer must not be used when include_files is not set.
        mock_file_system_write.assert_not_called()
        # The no-op writer is used in its place.
        mock_noop_write.assert_called()
        file_reference = output.records[0].record.file_reference
        assert file_reference.file_size_bytes == NoopFileWriter.NOOP_FILE_SIZE

def test_get_article_attachments_messages_for_connector_builder(self) -> None:
with HttpMocker() as http_mocker:
with (
HttpMocker() as http_mocker,
patch(
"airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer.NoopFileWriter.write"
) as mock_noop_write,
patch(
"airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write"
) as mock_file_system_write,
):
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
Expand All @@ -217,6 +398,9 @@ def test_get_article_attachments_messages_for_connector_builder(self) -> None:
),
)

file_size = NoopFileWriter.NOOP_FILE_SIZE
mock_noop_write.return_value = file_size # Simulate a file size

# Define a mock factory that forces emit_connector_builder_messages=True
class MockModelToComponentFactory(OriginalModelToComponentFactory):
def __init__(self, *args, **kwargs):
Expand All @@ -231,12 +415,20 @@ def __init__(self, *args, **kwargs):
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.with_stream(
ConfiguredAirbyteStreamBuilder()
.with_name("article_attachments")
.with_include_files(True)
)
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
)

assert len(output.records) == 1
# Ensure that NoopFileWriter is called.
mock_noop_write.assert_called()
# Ensure that LocalFileSystemFileWriter is not called.
mock_file_system_write.assert_not_called()
file_reference = output.records[0].record.file_reference
assert file_reference
assert file_reference.staging_file_url
Expand Down
Loading