
feat: destination discover PoC and adding sync modes #527


Status: Open. Wants to merge 13 commits into base branch main.

Conversation

maxi297 (Contributor) commented May 1, 2025

What

This PR contains various changes needed for DA. The changes are:

  • Cleanup to allow destinations to call parse_args without having to instantiate a destination object (debug logging support was also added as part of this)
  • Adding the discover command for destinations
  • Adding more features to build a JSON schema:
    • Ability to set allow_additional_properties to False
    • Ability to add fields other than type within each property object
  • Ability to configure DestinationSyncMode in the configured catalog
  • Fix regarding the HTTP matcher
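As a rough illustration of how the two new schema options interact, the sketch below merges inferred extra fields into each property and exposes the additionalProperties flag. The function and parameter names here are stand-ins for illustration only; the real options live on DynamicSchemaLoader in the CDK.

```python
from typing import Any, Callable, Dict, Optional

def build_json_schema(
    properties: Dict[str, Dict[str, Any]],
    allow_additional_properties: bool = True,
    extra_field_inferrer: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    # Merge any inferred extra fields into each property definition,
    # then expose the additionalProperties flag on the resulting schema.
    merged: Dict[str, Dict[str, Any]] = {}
    for name, definition in properties.items():
        definition = dict(definition)  # copy so the input is never mutated
        if extra_field_inferrer is not None:
            definition.update(extra_field_inferrer(definition))
        merged[name] = definition
    return {
        "$schema": "https://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": merged,
        "additionalProperties": allow_additional_properties,
    }

schema = build_json_schema(
    {"Id": {"type": ["null", "string"]}},
    allow_additional_properties=False,
    extra_field_inferrer=lambda prop: {"toto": "tata"},
)
```

With an inferrer returning {"toto": "tata"}, the Id property ends up carrying both its original type and the inferred field, and the schema is closed to extra properties.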

For more context, the following PRs can be useful:

Summary by CodeRabbit

  • New Features
    • Added a new "discover" command for destinations, enabling catalog discovery via the command-line interface.
    • Introduced configurable handling of additional properties in dynamic JSON schema generation, with support for custom property field inferrers.
    • Enabled setting destination sync mode and destination object name in catalog stream configuration.
  • Bug Fixes
    • Improved error handling for invalid JSON bodies in HTTP request comparisons to prevent exceptions.
  • Tests
    • Added tests for dynamic schema loader features, including additional property field inference and stricter property validation.
    • Enhanced HTTP request matcher tests to cover mismatched body types.
    • Extended destination argument parsing tests to include debug flag verification.
  • Documentation
    • Marked the new property fields inferrer class as experimental and deprecated with a warning.

```diff
@@ -72,7 +72,11 @@ def _to_mapping(
         elif isinstance(body, bytes):
             return json.loads(body.decode())  # type: ignore # assumes return type of Mapping[str, Any]
         elif isinstance(body, str):
-            return json.loads(body)  # type: ignore # assumes return type of Mapping[str, Any]
+            try:
```
Contributor Author:
Without this addition, depending on the order of evaluation, test test_given_on_match_is_mapping_but_not_input_when_matches_then_return_false would fail
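Based on the hunk above and this comment, the guarded conversion presumably looks something like the following standalone sketch. The real method is HttpRequest._to_mapping; the name and surrounding types here are simplified for illustration.

```python
import json
from collections.abc import Mapping
from typing import Any, Optional, Union

def to_mapping(body: Optional[Union[Mapping, bytes, str]]) -> Optional[Mapping]:
    # A string body that is not valid JSON yields None instead of raising,
    # so a mapping-vs-non-JSON comparison simply fails regardless of the
    # order in which the two bodies are evaluated.
    if isinstance(body, Mapping):
        return body
    if isinstance(body, bytes):
        body = body.decode()
    if isinstance(body, str):
        try:
            return json.loads(body)
        except json.JSONDecodeError:
            return None
    return None
```

This is why the test passes in either evaluation order: comparing a parsed mapping against None is False, rather than one side raising before the comparison happens.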

@maxi297 maxi297 requested a review from aaronsteers May 2, 2025 15:20
maxi297 (Contributor, Author) commented May 2, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@maxi297 maxi297 changed the title [WIP] destination discover PoC [WIP] destination discover PoC and adding sync modes May 2, 2025
@maxi297 maxi297 requested a review from lazebnyi May 5, 2025 19:04
@maxi297 maxi297 marked this pull request as ready for review May 5, 2025 19:06
coderabbitai bot (Contributor) commented May 5, 2025

📝 Walkthrough

This update introduces a "discover" command to the destination CLI, adds an inferrer mechanism and configurability for additional properties in dynamic schema loading, and enhances test coverage for these features. Error handling for non-JSON bodies in HTTP request testing is improved, and builder/test utilities are extended for destination sync modes and schema property scenarios.

Changes

File(s) Change Summary
airbyte_cdk/destinations/destination.py Added "discover" CLI command to Destination; refactored argument parsing; introduced abstract discover() method; extended command handling for catalog discovery.
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py Introduced AdditionalPropertyFieldsInferrer abstract class; added inferrer and allow_additional_properties options to DynamicSchemaLoader; adjusted schema generation and typing.
airbyte_cdk/test/catalog_builder.py Added with_destination_sync_mode and with_destination_object_name methods to ConfiguredAirbyteStreamBuilder; changed default sync_mode in with_stream to SyncMode.full_refresh.
airbyte_cdk/test/mock_http/request.py Improved error handling in HttpRequest._to_mapping for non-JSON string bodies to avoid exceptions.
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py Added helper, inferrer test class, and tests for additional property inferrer and allow_additional_properties in DynamicSchemaLoader.
unit_tests/test/mock_http/test_request.py Added test for mismatched mapping/non-mapping request body types in HttpRequestMatcherTest.
unit_tests/destinations/test_destination.py Updated test parameterization to include "debug": False in expected parsed args for destination CLI commands.
airbyte_cdk/models/__init__.py Added imports for DestinationCatalog and DestinationOperation from airbyte_protocol.
pyproject.toml Changed airbyte-protocol-models-dataclasses dependency to fixed version 0.15.0.dev1746648278 from testpypi source; added testpypi package source.
airbyte_cdk/models/airbyte_protocol_serializers.py Reorganized imports: moved serpyco_rs imports below Airbyte protocol dataclasses imports; changed from relative to absolute imports for protocol dataclasses.
airbyte_cdk/logger.py Changed import sources for Airbyte models and serializers to airbyte_protocol_dataclasses.models and adjusted import order.
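The new builder methods on ConfiguredAirbyteStreamBuilder listed above can be approximated with a minimal fluent sketch. This stand-in avoids the airbyte_cdk dependency and uses plain strings; the real builder accepts protocol enum values such as DestinationSyncMode.append.

```python
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class StreamConfigBuilder:
    # Minimal stand-in mirroring the fluent methods described above; the real
    # ConfiguredAirbyteStreamBuilder lives in airbyte_cdk.test.catalog_builder.
    # Default sync_mode reflects the new full_refresh default noted in the table.
    _config: Dict[str, Any] = field(default_factory=lambda: {"sync_mode": "full_refresh"})

    def with_destination_sync_mode(self, mode: str) -> "StreamConfigBuilder":
        self._config["destination_sync_mode"] = mode
        return self

    def with_destination_object_name(self, name: str) -> "StreamConfigBuilder":
        self._config["destination_object_name"] = name
        return self

    def build(self) -> Dict[str, Any]:
        return dict(self._config)

stream = (
    StreamConfigBuilder()
    .with_destination_sync_mode("append")
    .with_destination_object_name("users")
    .build()
)
```

Each with_* method mutates the builder and returns self, which is what keeps the chained call style working.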

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant CLI
    participant Destination
    participant discover()
    participant AirbyteCatalog

    User->>CLI: Run "discover" command with --config
    CLI->>Destination: parse_args("discover", --config)
    CLI->>Destination: run_cmd(parsed_args)
    Destination->>discover(): discover()
    discover()-->>Destination: AirbyteCatalog
    Destination->>CLI: yield AirbyteMessage(type=CATALOG, catalog)
    CLI->>User: Output catalog message
```
```mermaid
sequenceDiagram
    participant Loader as DynamicSchemaLoader
    participant Inferrer as AdditionalPropertyFieldsInferrer

    Loader->>Loader: get_json_schema()
    alt additional_property_fields_inferrer is set
        Loader->>Inferrer: infer(property_definition)
        Inferrer-->>Loader: additional_fields
        Loader->>Loader: Merge additional_fields into property_definition
    end
    Loader->>Loader: Set "additionalProperties" per allow_additional_properties
    Loader-->>Caller: Return JSON schema
```
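The discover dispatch in the first diagram can be sketched end to end with simplified stand-ins for the protocol types. DemoDestination and CatalogMessage below are illustrative only, not CDK classes; the real flow goes through AirbyteMessage and the Destination base class.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator

@dataclass
class CatalogMessage:
    # Simplified stand-in for AirbyteMessage(type=CATALOG, catalog=...).
    type: str
    catalog: Dict[str, Any]

class DemoDestination:
    def discover(self) -> Dict[str, Any]:
        # A real destination would query the target system for its objects here.
        return {"streams": [{"name": "users"}]}

    def run_cmd(self, command: str) -> Iterator[CatalogMessage]:
        # Dispatch mirrors the diagram: "discover" yields a catalog message.
        if command == "discover":
            yield CatalogMessage(type="CATALOG", catalog=self.discover())

messages = list(DemoDestination().run_cmd("discover"))
```

Yielding (rather than returning) the message keeps the command handler consistent with the other protocol commands, which also stream AirbyteMessages.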

Suggested labels

enhancement

Suggested reviewers

  • lazebnyi
  • bazarnov

Would you like to add more documentation or usage examples for the new "discover" command and schema inferrer, wdyt?



📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 18d36f5 and ceac06c.

📒 Files selected for processing (1)
  • airbyte_cdk/test/catalog_builder.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/test/catalog_builder.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Publish SDM to DockerHub
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

🧹 Nitpick comments (6)
airbyte_cdk/test/catalog_builder.py (1)

37-41: Add minimal doc-string and type hint to the new builder helper?

with_destination_sync_mode() is handy! For consistency with with_sync_mode(), would you like to:

  1. add a short doc-string describing the param, and
  2. return self with an explicit -> "ConfiguredAirbyteStreamBuilder" (already present but worth keeping)?

Small thing, but it keeps the fluent API self-documenting – wdyt?

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (2)

185-191: Potential key override when merging inferred fields

value.update(extra_fields) will silently overwrite keys such as "type" if an inferrer returns a colliding key.
Should we detect collisions and either raise or log a warning to avoid hard-to-debug schema issues? Something like:

```python
for k in extra_fields:
    if k in value:
        logger.warning("Overwriting %s in inferred property fields", k)
value.update(extra_fields)
```

Would that be helpful, wdyt?


295-303: Minor typing nit – remove the type: ignore?

Since _get_airbyte_type now returns Dict[str, Any], the mypy ignore on the deepcopy return may no longer be needed.
Could we try dropping the # type: ignore and see if the type checker is happy? wdyt?

airbyte_cdk/destinations/destination.py (2)

30-90: Duplicated CLI parser – reuse existing helper to avoid drift?

The new parse_args() is almost identical to the one in airbyte_cdk/entrypoint.py.
Would it make sense to extract a shared helper (e.g., cdk.cli_utils.parse_destination_args) so that future flag changes don’t need to be duplicated in two places? wdyt?


138-141: Static wrapper seems redundant

Since the free-function parse_args already exists in the module, the @staticmethod wrapper could be removed and callers can import the function directly, reducing one level of indirection.
Is the extra layer intentional for API stability, or could we drop it, wdyt?

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)

426-429: Well-structured mock helper function.

This utility function elegantly creates a mock Retriever that returns the specified HTTP response body, making the new tests cleaner and more readable. What do you think about adding a docstring to clarify its purpose for future contributors? wdyt?

```diff
 def _mock_schema_loader_retriever(http_response_body) -> Retriever:
+    """
+    Create a mock Retriever that returns the specified HTTP response body.
+
+    Args:
+        http_response_body: The HTTP response body to be returned by the retriever
+
+    Returns:
+        A mock Retriever instance
+    """
     retriever = Mock(spec=Retriever)
     retriever.read_records.return_value = iter([http_response_body])
     return retriever
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ce2a7bb and f2c993e.

📒 Files selected for processing (6)
  • airbyte_cdk/destinations/destination.py (4 hunks)
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (8 hunks)
  • airbyte_cdk/test/catalog_builder.py (2 hunks)
  • airbyte_cdk/test/mock_http/request.py (1 hunks)
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (3 hunks)
  • unit_tests/test/mock_http/test_request.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
airbyte_cdk/test/mock_http/request.py (1)
airbyte_cdk/test/mock_http/response.py (1)
  • body (19-20)
airbyte_cdk/destinations/destination.py (6)
airbyte_cdk/entrypoint.py (2)
  • parse_args (65-118)
  • discover (224-233)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
  • discover (175-181)
unit_tests/test_entrypoint.py (1)
  • discover (55-56)
airbyte_cdk/sources/abstract_source.py (1)
  • discover (85-90)
unit_tests/sources/test_source.py (2)
  • discover (47-48)
  • catalog (70-93)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteMessage (79-88)
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • ComplexFieldType (806-808)
airbyte_cdk/sources/source.py (1)
  • ExperimentalClassWarning (24-25)
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
  • infer (436-437)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (12)
airbyte_cdk/test/mock_http/request.py (1)

75-79: Excellent error handling improvement!

This is a great addition for making the code more robust by handling JSON decode errors gracefully. Instead of raising an exception when a string isn't valid JSON, returning None is appropriate since the comparison should fail anyway when one body is a mapping and another isn't. This change helps prevent test failures that would happen depending on the order of evaluation.

The comment you included also explains the reasoning clearly, which is helpful for future maintainers.

unit_tests/test/mock_http/test_request.py (1)

160-165: Good test case addition!

This test effectively verifies that when one request has a mapping body and the other has a non-JSON string body, the matches method correctly returns False. It's a nice complement to the error handling added in the _to_mapping method.

It's worth noting that without the error handling added to the _to_mapping method, this test could fail depending on which request is evaluated first when checking for a match. Great job on covering this edge case!

airbyte_cdk/test/catalog_builder.py (1)

5-6: Import looks good – just a quick check on dependency boundaries

Nice addition 👍. Just make sure DestinationSyncMode is available in all test environments (some downstream projects run the test helpers without the full protocol package).
Maybe add a defensive try / except ImportError around the import, or document the extra requirement, wdyt?

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (2)

160-162: Default for allow_additional_properties – safe by default?

The new flag defaults to True. For many connectors we’d like schemas to be strict unless the author explicitly opts-in.
Have we considered defaulting to False to follow “secure by default” principles, or at least logging a warning when it’s True, wdyt?


196-200: Great touch exposing additionalProperties

Just a 👍 that wiring this flag straight into the output schema is elegant and keeps the JSON Schema minimal.

airbyte_cdk/destinations/destination.py (1)

164-166: Forward config to discover() for symmetry

If we adopt the earlier suggestion to pass config, we’d need:

```diff
-            yield AirbyteMessage(type=Type.CATALOG, catalog=self.discover())
+            yield AirbyteMessage(type=Type.CATALOG, catalog=self.discover(config=config))
```

Keeps runtime behaviour consistent – wdyt?

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (6)

7-8: Imports properly updated for the new functionality.

The additional imports make sense for supporting the new test features related to type handling in additional properties and mocking. Good job keeping imports organized.


18-26: Comprehensive imports for the new schema testing capabilities.

Clear organization of imports to support the new testing functionality. The explicit import of AdditionalPropertyFieldsInferrer makes the code's intent easier to follow.


32-32: Good use of a constant for parameters.

Creating a consistent _ANY_PARAMETERS constant helps maintain uniformity across tests. Nice touch!


432-437: Well-implemented test inferrer class.

This test implementation of AdditionalPropertyFieldsInferrer is clean and focused - it returns predefined properties which simplifies testing. The interface is clear and follows good practices.


476-507: Thorough test for additional properties configuration.

The test clearly demonstrates that when allow_additional_properties=False and no inferrer is provided, the schema correctly sets additionalProperties: False while maintaining the expected property definitions.

The setup is clean with good separation of concerns between this test and the previous one, allowing for focused testing of each feature.


440-473:

✅ Verification successful

Comprehensive test for property field inferrer functionality.

This test effectively verifies that additional property fields are correctly merged into the schema and that additionalProperties is set to False when configured. The use of dictionary union operator on line 447 is clean and modern.

Note: The dictionary merge operator (|) requires Python 3.9+. Is this compatibility requirement intentional and documented elsewhere in the project? wdyt?


🏁 Script executed:

```shell
#!/bin/bash
# Check the required Python version in setup.py or similar files

echo "Checking Python version requirements..."
cat setup.py | grep -E "python_requires|python_version"
```

Length of output: 187


Let’s check if the project actually pins Python 3.9+ in its config files (e.g. pyproject.toml, setup.cfg, tox.ini):


🏁 Script executed:

```shell
#!/bin/bash
# Searching for Python version constraints in project config files
echo "Searching for 'python_requires' or 'requires-python' across the repository..."
rg -n 'python_requires|requires-python' .
echo "Searching for version specifiers '>=3.' across the repository..."
rg -n '>= *3\.[0-9]+' .
```

Length of output: 21953


Python 3.9+ support is already guaranteed

The project’s pyproject.toml specifies python = ">=3.10,<3.13", so using the | operator is fully supported and intentional. No further changes needed here.

• See pyproject.toml (line 32): python = ">=3.10,<3.13"

Comment on lines +116 to +146
````python
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AdditionalPropertyFieldsInferrer(ABC):
    """
    Infers additional fields to be added to each property. For example, if this inferrer returns {"toto": "tata"}, a property that would have looked like this:
    ```
    "properties": {
        "Id": {
            "type": ["null", "string"],
        },
        <...>
    }
    ```
    ... will look like this:
    ```
    "properties": {
        "Id": {
            "type": ["null", "string"],
            "toto": "tata"
        },
        <...>
    }
    ```
    """

    @abstractmethod
    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Infers additional property fields from the given property definition.
        """
        pass
````


🛠️ Refactor suggestion

Consider making the inferrer contract read-only

infer() receives a mutable property_definition. Accidental in-place edits by an inferrer could leak into the final schema.
Would switching the parameter type from MutableMapping to Mapping (and passing a copy) help protect against that?

```diff
-    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
+    def infer(self, property_definition: Mapping[str, Any]) -> Mapping[str, Any]:
```

That way implementers must return a new dict, reducing side-effects – wdyt?
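Under this read-only contract, a concrete inferrer might look like the following. NullableMarkerInferrer and the x-nullable field are hypothetical examples, not part of the CDK; the point is that the implementation reads the input mapping and returns a brand-new dict.

```python
from abc import ABC, abstractmethod
from typing import Any, Mapping

class AdditionalPropertyFieldsInferrer(ABC):
    # Interface as suggested above: the property definition is treated as
    # read-only, and the extra fields come back as a fresh mapping.
    @abstractmethod
    def infer(self, property_definition: Mapping[str, Any]) -> Mapping[str, Any]:
        ...

class NullableMarkerInferrer(AdditionalPropertyFieldsInferrer):
    # Hypothetical example: annotate whether the property accepts null values.
    def infer(self, property_definition: Mapping[str, Any]) -> Mapping[str, Any]:
        types = property_definition.get("type", [])
        return {"x-nullable": "null" in types}

extra = NullableMarkerInferrer().infer({"type": ["null", "string"]})
```

Because the caller merges the returned dict itself, an inferrer written this way cannot accidentally leak in-place edits into the final schema.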

@maxi297 maxi297 changed the title [WIP] destination discover PoC and adding sync modes feat: destination discover PoC and adding sync modes May 5, 2025
Comment on lines +138 to +140
```python
    @staticmethod
    def parse_args(args: List[str]) -> argparse.Namespace:
        return parse_args(args)
```
Contributor:
In terms of patterns, I think my preference here, and the best practice I would eventually propose, is that we make this a class method on the base connector class. I would call it "launch", and then every connector class would be able to invoke itself. In theory, all sources and destinations could share the same implementation across connectors of the same type, and unless the connector needs something special (a code smell anyway), the "cls" input arg should be all you need in order to instantiate a connector using CLI args.

I don't think we need to tackle all the scope here in this PR, but your implementation is already very close to what I think is the ideal state, so I'll mention it here as a non-blocking proposal.

Contributor Author:

That makes sense to me. I started diverging a bit from the current solution because run_cmd is not static, so we need to instantiate the object before calling it, but the way we instantiate the destination depends on which method of the protocol is called (see this piece of code for an example).

So do we agree that the launch method would be static? If so, I think we are moving in the right direction and I can start adding this in later changes. If not, I'll need more information about what you had in mind.

aaronsteers (Contributor) commented May 6, 2025

@maxi297 - The difference between a static method and a class method for this use case is just that a class method knows what class it is (static methods don't get a cls input), and that class method implementations can be inherited by subclasses. So, yes, agreed the method is static in the sense that it doesn't need the object, but in order to not need to implement it on each class, I think making it a class method is slightly cleaner in the long run. Sorry if I'm not good at explaining this, and please don't block on my comment - either way, it's a step in the right direction, I think, if the class knows how to instantiate itself, without needing an external class or function in order to be instantiated. 👍

Contributor:

@maxi297 - Here's the early implementation I wrote for S3 a while back...

```python
    @classmethod
    def launch(cls, args: list[str] | None = None) -> None:
        """Launch the source using the provided CLI args.

        If no args are provided, the launch args will be inferred automatically.

        In the future, we should consider moving this method to the Connector base class,
        so that all sources and destinations can launch themselves and so none of this
        code needs to live in the connector itself.
        """
        args = args or sys.argv[1:]
        catalog_path = AirbyteEntrypoint.extract_catalog(args)
        # TODO: Delete if not needed:
        # config_path = AirbyteEntrypoint.extract_config(args)
        # state_path = AirbyteEntrypoint.extract_state(args)

        source = cls.create(
            configured_catalog_path=Path(catalog_path) if catalog_path else None,
        )
        # The following function will wrap the execution in proper error handling.
        # Failures prior to here may not emit proper Airbyte TRACE or CONNECTION_STATUS messages.
        launch(
            source=source,
            args=args,
        )
```

This wasn't a very good implementation, but you can see the generic cls.create() ref.

Where cls.create() was defined as:

```python
    @classmethod
    def create(
        cls,
        *,
        configured_catalog_path: Path | str | None = None,
    ) -> SourceS3:
        """Create a new instance of the source.
        ...
        # A bunch of hacky stuff here.
        ...
        return cls(
            # These are the defaults for the source. No need for a caller to change them:
            stream_reader=SourceS3StreamReader(),
            spec_class=Config,
            cursor_cls=Cursor,
            # This is needed early. (We also will provide it again later.)
            catalog=configured_catalog,
            # These will be provided later, after we have wrapped proper error handling.
            config=None,
            state=None,
        )
```

Contributor:

Another problem with our current design is that we ask for config, catalog, etc in the constructor, but then pass it again during invocation of 'check', 'discover', etc. I think it would be better to not need these in the constructor at all, in which case we greatly simplify the process of creating a connector class, and we put all code that can fail in a code path that can properly message about any failures.

Again - hopefully this is helpful for long-term thinking, but I don't think it needs to block the current PR.

Contributor Author:

classmethod makes sense, yes!

And I would personally prefer to have these as part of the constructor, because if we don't, each method will need to instantiate the object it needs every time it is called, or have some kind of intelligent accessor which checks whether the field has already been instantiated and, if not, instantiates it. That feels complex compared to just instantiating the connector properly in launch. WDYT?

@github-actions github-actions bot added the enhancement New feature or request label May 6, 2025
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
unit_tests/destinations/test_destination.py (1)

1-1: ⚠️ Potential issue

Fix Ruff formatting issues

There's a pipeline failure indicating Ruff formatting issues. Could you run ruff format to fix the code style issues?

```shell
#!/bin/bash
# Run Ruff formatter to fix code style issues
ruff format unit_tests/destinations/test_destination.py
```
🧰 Tools
🪛 GitHub Actions: Linters

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

🧹 Nitpick comments (1)
unit_tests/destinations/test_destination.py (1)

49-54: Consider testing the debug flag when explicitly set to True

The test cases are verifying the default debug=False behavior, but there's no test case that verifies what happens when debug=True is explicitly set. Would it be helpful to add a test case like this to verify the flag works correctly when enabled, wdyt?

        [
            (["spec"], {"command": "spec", "debug": False}),
+           (["spec", "--debug"], {"command": "spec", "debug": True}),
            (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
            (
                ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
                {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
            ),
        ],
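For context, a minimal `argparse` sketch (names assumed, not the CDK's actual parser) that would produce the namespaces these parameterized cases expect:

```python
import argparse
from typing import List


def parse_args(args: List[str]) -> argparse.Namespace:
    """Hypothetical sketch of the destination CLI these test cases exercise."""
    # Shared --debug flag, inherited by every subcommand via `parents`.
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--debug", action="store_true", help="enable debug logging")

    parser = argparse.ArgumentParser(description="destination connector")
    subparsers = parser.add_subparsers(dest="command", required=True)
    subparsers.add_parser("spec", parents=[common])
    check = subparsers.add_parser("check", parents=[common])
    check.add_argument("--config", required=True)
    write = subparsers.add_parser("write", parents=[common])
    write.add_argument("--config", required=True)
    write.add_argument("--catalog", required=True)
    return parser.parse_args(args)
```

With this shape, `vars(parse_args(["spec"]))` yields `{"command": "spec", "debug": False}`, matching the expected dicts in the parameterization above.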
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f2c993e and c6c8d8f.

📒 Files selected for processing (1)
  • unit_tests/destinations/test_destination.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
unit_tests/destinations/test_destination.py

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)

Comment on lines +49 to 54
        (["spec"], {"command": "spec", "debug": False}),
        (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
        (
            ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
-           {"command": "write", "config": "config_path1", "catalog": "catalog_path1"},
+           {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
        ),
Contributor

🛠️ Refactor suggestion

Test coverage gap for the new "discover" command

I notice the test parameterization has been updated to include "debug": False in the expected outputs, which aligns with your new debug logging functionality. However, I don't see any test cases for the new discover command mentioned in the PR summary. Should we add a test case for this new functionality, wdyt?

        [
            (["spec"], {"command": "spec", "debug": False}),
            (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
            (
                ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
                {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
            ),
+           (
+               ["discover", "--config", "config_path1"],
+               {"command": "discover", "config": "config_path1", "debug": False},
+           ),
        ],
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
        [
            (["spec"], {"command": "spec", "debug": False}),
            (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
            (
                ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
                {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
            ),
            (
                ["discover", "--config", "config_path1"],
                {"command": "discover", "config": "config_path1", "debug": False},
            ),
        ],


return {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
Contributor

💎 Nice. Appreciate that we're following JSON Schema standards for communicating this. 👍
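The snippet above returns an open schema (`additionalProperties: True`); per the PR summary, the loader can also close the schema and attach fields beyond `type` to each property. A hypothetical sketch of that behavior (function and parameter names are assumptions, not the CDK's API):

```python
from typing import Any, Callable, Dict, Optional


def build_schema(
    properties: Dict[str, str],
    allow_additional_properties: bool = True,
    field_inferrer: Optional[Callable[[str], Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Build a draft-07 JSON schema, optionally closed and with enriched properties."""
    props: Dict[str, Any] = {}
    for name, json_type in properties.items():
        field: Dict[str, Any] = {"type": json_type}
        if field_inferrer is not None:
            # Attach extra fields beyond "type" (e.g. "description").
            field.update(field_inferrer(name))
        props[name] = field
    return {
        "$schema": "https://json-schema.org/draft-07/schema#",
        "type": "object",
        "additionalProperties": allow_additional_properties,
        "properties": props,
    }


schema = build_schema(
    {"id": "integer"},
    allow_additional_properties=False,
    field_inferrer=lambda name: {"description": f"column {name}"},
)
```

Setting `additionalProperties: false` tells validators to reject any record key not listed under `properties`, which is the stricter-validation behavior the new `allow_additional_properties` flag enables.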

Contributor

@aaronsteers aaronsteers left a comment

@maxi297 - I have reviewed all the code here and it looks good. 👍

Thanks for responding to my comments - and again, nothing blocking there.

What I didn't find was a definition of DestinationSyncMode or any changes to sync modes. I did want to review those - can you point me to them?

Comment on lines +138 to +140
@staticmethod
def parse_args(args: List[str]) -> argparse.Namespace:
return parse_args(args)
Contributor

@maxi297 - Here's the early implementation I wrote for S3 a while back...

    @classmethod
    def launch(cls, args: list[str] | None = None) -> None:
        """Launch the source using the provided CLI args.

        If no args are provided, the launch args will be inferred automatically.

        In the future, we should consider moving this method to the Connector base class,
        so that all sources and destinations can launch themselves and so none of this
        code needs to live in the connector itself.
        """
        args = args or sys.argv[1:]
        catalog_path = AirbyteEntrypoint.extract_catalog(args)
        # TODO: Delete if not needed:
        # config_path = AirbyteEntrypoint.extract_config(args)
        # state_path = AirbyteEntrypoint.extract_state(args)

        source = cls.create(
            configured_catalog_path=Path(catalog_path) if catalog_path else None,
        )
        # The following function will wrap the execution in proper error handling.
        # Failures prior to here may not emit proper Airbyte TRACE or CONNECTION_STATUS messages.
        launch(
            source=source,
            args=args,
        )

This wasn't a very good implementation, but you can see the generic cls.create() ref.

Where cls.create() was defined as:

    @classmethod
    def create(
        cls,
        *,
        configured_catalog_path: Path | str | None = None,
    ) -> SourceS3:
        """Create a new instance of the source.
        ...
        # A bunch of hacky stuff here.
        ...
        return cls(
            # These are the defaults for the source. No need for a caller to change them:
            stream_reader=SourceS3StreamReader(),
            spec_class=Config,
            cursor_cls=Cursor,
            # This is needed early. (We also will provide it again later.)
            catalog=configured_catalog,
            # These will be provided later, after we have wrapped proper error handling.
            config=None,
            state=None,
        )

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
pyproject.toml (1)

94-98: Document Custom Source Rationale
The testpypi supplemental source is added solely to fetch the dev protocol-models-dataclasses package. Could you add a brief comment above this block explaining that it's temporary until the next official release? This will help future maintainers understand its purpose. wdyt?
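A comment along these lines could work; the block shape below assumes Poetry's `[[tool.poetry.source]]` syntax with `priority = "supplemental"` (Poetry 1.5+), and is a sketch rather than the PR's exact content:

```toml
# Temporary: pulls the dev build of airbyte-protocol-models-dataclasses from
# Test PyPI until the next official release is published on PyPI.
# TODO: remove this source once a released version is available.
[[tool.poetry.source]]
name = "testpypi"
url = "https://test.pypi.org/simple/"
priority = "supplemental"
```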

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c6c8d8f and af0640c.

⛔ Files ignored due to path filters (1)
  • poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • airbyte_cdk/destinations/destination.py (4 hunks)
  • airbyte_cdk/models/__init__.py (1 hunks)
  • airbyte_cdk/test/catalog_builder.py (3 hunks)
  • pyproject.toml (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • airbyte_cdk/models/__init__.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/test/catalog_builder.py
  • airbyte_cdk/destinations/destination.py
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: preview_docs
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
pyproject.toml (1)

33-33:

✅ Verification successful

Verify Dev Pre-release Version Usage
You've pinned airbyte-protocol-models-dataclasses to a specific dev build (0.15.0.dev1746621859) from the testpypi index. Would you consider confirming that this version is stable and available (and that it won't get yanked), or switching to a released SemVer once it's out? wdyt?


🏁 Script executed:

#!/bin/bash
# Check if the dev version exists on testpypi
pip index versions airbyte-protocol-models-dataclasses \
  --extra-index-url https://test.pypi.org/simple/ | grep "0.15.0.dev1746621859"

Length of output: 258


Double-check Test PyPI for the dev version
It looks like the pip index check didn’t surface 0.15.0.dev1746621859—let’s hit the Test PyPI JSON API directly to confirm whether this build is actually published (or if we should reconsider pinning to it). wdyt?


🏁 Script executed:

#!/bin/bash
# Verify if the specific dev version exists on Test PyPI’s JSON endpoint
curl -s https://test.pypi.org/pypi/airbyte-protocol-models-dataclasses/json \
  | grep '"0.15.0.dev1746621859"'

Length of output: 65646


Dev build confirmed—0.15.0.dev1746621859 is live and not yanked

  • pyproject.toml (line 33): Test PyPI JSON metadata shows "version":"0.15.0.dev1746621859" and "yanked":false.

Looks safe to pin—would you still like to switch to the first official 0.15.0 release once it’s out? wdyt?
