feat: destination discover PoC and adding sync modes #527
Conversation
```diff
@@ -72,7 +72,11 @@ def _to_mapping(
     elif isinstance(body, bytes):
         return json.loads(body.decode())  # type: ignore # assumes return type of Mapping[str, Any]
     elif isinstance(body, str):
-        return json.loads(body)  # type: ignore # assumes return type of Mapping[str, Any]
+        try:
```
Without this addition, depending on the order of evaluation, the test `test_given_on_match_is_mapping_but_not_input_when_matches_then_return_false` would fail.
/autofix
📝 Walkthrough

This update introduces a "discover" command to the destination CLI, adds an inferrer mechanism and configurability for additional properties in dynamic schema loading, and enhances test coverage for these features. Error handling for non-JSON bodies in HTTP request testing is improved, and builder/test utilities are extended for destination sync modes and schema property scenarios.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant CLI
    participant Destination
    participant discover()
    participant AirbyteCatalog
    User->>CLI: Run "discover" command with --config
    CLI->>Destination: parse_args("discover", --config)
    CLI->>Destination: run_cmd(parsed_args)
    Destination->>discover(): discover()
    discover()-->>Destination: AirbyteCatalog
    Destination->>CLI: yield AirbyteMessage(type=CATALOG, catalog)
    CLI->>User: Output catalog message
```
```mermaid
sequenceDiagram
    participant Loader as DynamicSchemaLoader
    participant Inferrer as AdditionalPropertyFieldsInferrer
    Loader->>Loader: get_json_schema()
    alt additional_property_fields_inferrer is set
        Loader->>Inferrer: infer(property_definition)
        Inferrer-->>Loader: additional_fields
        Loader->>Loader: Merge additional_fields into property_definition
    end
    Loader->>Loader: Set "additionalProperties" per allow_additional_properties
    Loader-->>Caller: Return JSON schema
```
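The schema-loader flow in the second diagram can be sketched in plain Python (hypothetical names mirroring the diagram; the real `DynamicSchemaLoader` differs):

```python
from typing import Any, Dict, Optional


def build_json_schema(
    properties: Dict[str, Dict[str, Any]],
    allow_additional_properties: bool = True,
    inferrer: Optional[Any] = None,  # object exposing infer(prop) -> dict of extra fields
) -> Dict[str, Any]:
    merged_properties = {}
    for name, definition in properties.items():
        prop = dict(definition)  # copy so the caller's definition is untouched
        if inferrer is not None:
            # merge additional_fields into the property definition (the "alt" branch above)
            prop.update(inferrer.infer(prop))
        merged_properties[name] = prop
    return {
        "$schema": "https://json-schema.org/draft-07/schema#",
        "type": "object",
        "additionalProperties": allow_additional_properties,
        "properties": merged_properties,
    }


class TotoInferrer:
    def infer(self, property_definition: Dict[str, Any]) -> Dict[str, Any]:
        return {"toto": "tata"}


schema = build_json_schema(
    {"Id": {"type": ["null", "string"]}},
    allow_additional_properties=False,
    inferrer=TotoInferrer(),
)
# schema["properties"]["Id"] now carries both the original "type" and the inferred "toto"
```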
Suggested labels
Suggested reviewers
Would you like to add more documentation or usage examples for the new "discover" command and schema inferrer, wdyt?
Actionable comments posted: 2
🧹 Nitpick comments (6)
airbyte_cdk/test/catalog_builder.py (1)

`37-41`: Add a minimal doc-string and type hint to the new builder helper?

`with_destination_sync_mode()` is handy! For consistency with `with_sync_mode()`, would you like to:

- add a short doc-string describing the param, and
- return `self` with an explicit `-> "ConfiguredAirbyteStreamBuilder"` (already present but worth keeping)?

Small thing, but it keeps the fluent API self-documenting – wdyt?
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (2)

`185-191`: Potential key override when merging inferred fields

`value.update(extra_fields)` will silently overwrite keys such as `"type"` if an inferrer returns a colliding key. Should we detect collisions and either raise or log a warning to avoid hard-to-debug schema issues? Something like:

```python
for k in extra_fields:
    if k in value:
        logger.warning("Overwriting %s in inferred property fields", k)
value.update(extra_fields)
```

Would that be helpful, wdyt?

`295-303`: Minor typing nit – remove the `type: ignore`?

Since `_get_airbyte_type` now returns `Dict[str, Any]`, the mypy ignore on the `deepcopy` return may no longer be needed. Could we try dropping the `# type: ignore` and see if the type checker is happy? wdyt?

airbyte_cdk/destinations/destination.py (2)
`30-90`: Duplicated CLI parser – reuse existing helper to avoid drift?

The new `parse_args()` is almost identical to the one in `airbyte_cdk/entrypoint.py`. Would it make sense to extract a shared helper (e.g., `cdk.cli_utils.parse_destination_args`) so that future flag changes don't need to be duplicated in two places? wdyt?

`138-141`: Static wrapper seems redundant

Since the free function `parse_args` already exists in the module, the `@staticmethod` wrapper could be removed and callers can import the function directly, reducing one level of indirection. Is the extra layer intentional for API stability, or could we drop it, wdyt?

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
Is the extra layer intentional for API stability, or could we drop it, wdyt?unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
426-429
: Well-structured mock helper function.This utility function elegantly creates a mock
Retriever
that returns the specified HTTP response body, making the new tests cleaner and more readable. What do you think about adding a docstring to clarify its purpose for future contributors? wdyt?def _mock_schema_loader_retriever(http_response_body) -> Retriever: + """ + Create a mock Retriever that returns the specified HTTP response body. + + Args: + http_response_body: The HTTP response body to be returned by the retriever + + Returns: + A mock Retriever instance + """ retriever = Mock(spec=Retriever) retriever.read_records.return_value = iter([http_response_body]) return retriever
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)

- airbyte_cdk/destinations/destination.py (4 hunks)
- airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (8 hunks)
- airbyte_cdk/test/catalog_builder.py (2 hunks)
- airbyte_cdk/test/mock_http/request.py (1 hunks)
- unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (3 hunks)
- unit_tests/test/mock_http/test_request.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
airbyte_cdk/test/mock_http/request.py (1)

- airbyte_cdk/test/mock_http/response.py (1): `body` (19-20)

airbyte_cdk/destinations/destination.py (6)

- airbyte_cdk/entrypoint.py (2): `parse_args` (65-118), `discover` (224-233)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1): `discover` (175-181)
- unit_tests/test_entrypoint.py (1): `discover` (55-56)
- airbyte_cdk/sources/abstract_source.py (1): `discover` (85-90)
- unit_tests/sources/test_source.py (2): `discover` (47-48), `catalog` (70-93)
- airbyte_cdk/models/airbyte_protocol.py (1): `AirbyteMessage` (79-88)

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (3)

- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1): `ComplexFieldType` (806-808)
- airbyte_cdk/sources/source.py (1): `ExperimentalClassWarning` (24-25)
- unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1): `infer` (436-437)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (12)

airbyte_cdk/test/mock_http/request.py (1)

`75-79`: Excellent error handling improvement!

This is a great addition for making the code more robust by handling JSON decode errors gracefully. Instead of raising an exception when a string isn't valid JSON, returning `None` is appropriate since the comparison should fail anyway when one body is a mapping and another isn't. This change helps prevent test failures that would otherwise depend on the order of evaluation.

The comment you included also explains the reasoning clearly, which is helpful for future maintainers.
unit_tests/test/mock_http/test_request.py (1)

`160-165`: Good test case addition!

This test effectively verifies that when one request has a mapping body and the other has a non-JSON string body, the `matches` method correctly returns `False`. It's a nice complement to the error handling added in the `_to_mapping` method.

It's worth noting that without the error handling added to `_to_mapping`, this test could fail depending on which request is evaluated first when checking for a match. Great job on covering this edge case!

airbyte_cdk/test/catalog_builder.py (1)
`5-6`: Import looks good – just a quick check on dependency boundaries

Nice addition 👍. Just make sure `DestinationSyncMode` is available in all test environments (some downstream projects run the test helpers without the full protocol package). Maybe add a defensive `try / except ImportError` around the import, or document the extra requirement, wdyt?

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (2)
`160-162`: Default for `allow_additional_properties` – safe by default?

The new flag defaults to `True`. For many connectors we'd like schemas to be strict unless the author explicitly opts in. Have we considered defaulting to `False` to follow "secure by default" principles, or at least logging a warning when it's `True`, wdyt?

`196-200`: Great touch exposing `additionalProperties`

Just a 👍 that wiring this flag straight into the output schema is elegant and keeps the JSON Schema minimal.
airbyte_cdk/destinations/destination.py (1)

`164-166`: Forward `config` to `discover()` for symmetry

If we adopt the earlier suggestion to pass `config`, we'd need:

```diff
-        yield AirbyteMessage(type=Type.CATALOG, catalog=self.discover())
+        yield AirbyteMessage(type=Type.CATALOG, catalog=self.discover(config=config))
```

Keeps runtime behaviour consistent – wdyt?
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (6)

`7-8`: Imports properly updated for the new functionality.

The additional imports make sense for supporting the new test features related to type handling in additional properties and mocking. Good job keeping imports organized.

`18-26`: Comprehensive imports for the new schema testing capabilities.

Clear organization of imports to support the new testing functionality. The explicit import of `AdditionalPropertyFieldsInferrer` makes the code's intent easier to follow.

`32-32`: Good use of a constant for parameters.

Creating a consistent `_ANY_PARAMETERS` constant helps maintain uniformity across tests. Nice touch!

`432-437`: Well-implemented test inferrer class.

This test implementation of `AdditionalPropertyFieldsInferrer` is clean and focused – it returns predefined properties, which simplifies testing. The interface is clear and follows good practices.

`476-507`: Thorough test for additional properties configuration.

The test clearly demonstrates that when `allow_additional_properties=False` and no inferrer is provided, the schema correctly sets `additionalProperties: False` while maintaining the expected property definitions. The setup is clean, with good separation of concerns between this test and the previous one, allowing for focused testing of each feature.
`440-473`: ✅ Verification successful

Comprehensive test for property field inferrer functionality.

This test effectively verifies that additional property fields are correctly merged into the schema and that `additionalProperties` is set to `False` when configured. The use of the dictionary union operator on line 447 is clean and modern.

Note: The dictionary merge operator (`|`) requires Python 3.9+. Is this compatibility requirement intentional and documented elsewhere in the project? wdyt?
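For reference, the merge operator in question (PEP 584, Python 3.9+) creates a new dict, with right-hand keys winning on collisions:

```python
base = {"type": ["null", "string"]}
extra = {"toto": "tata", "type": ["string"]}

merged = base | extra  # new dict; the right operand wins on the colliding "type" key
print(merged)  # {'type': ['string'], 'toto': 'tata'}

base |= {"description": "identifier"}  # in-place variant, also 3.9+
print(base)  # {'type': ['null', 'string'], 'description': 'identifier'}
```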
🏁 Script executed:

```shell
#!/bin/bash
# Check the required Python version in setup.py or similar files
echo "Checking Python version requirements..."
cat setup.py | grep -E "python_requires|python_version"
```

Length of output: 187

Let's check if the project actually pins Python 3.9+ in its config files (e.g. `pyproject.toml`, `setup.cfg`, `tox.ini`):

🏁 Script executed:

```shell
#!/bin/bash
# Searching for Python version constraints in project config files
echo "Searching for 'python_requires' or 'requires-python' across the repository..."
rg -n 'python_requires|requires-python' .
echo "Searching for version specifiers '>=3.' across the repository..."
rg -n '>= *3\.[0-9]+' .
```

Length of output: 21953
Python 3.9+ support is already guaranteed

The project's pyproject.toml specifies `python = ">=3.10,<3.13"`, so using the `|` operator is fully supported and intentional. No further changes needed here.

- See pyproject.toml (line 32): `python = ">=3.10,<3.13"`
````python
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AdditionalPropertyFieldsInferrer(ABC):
    """
    Infers additional fields to be added to each property. For example, if this inferrer returns {"toto": "tata"}, a property that would have looked like this:
    ```
    "properties": {
        "Id": {
            "type": ["null", "string"],
        },
        <...>
    }
    ```
    ... will look like this:
    ```
    "properties": {
        "Id": {
            "type": ["null", "string"],
            "toto": "tata"
        },
        <...>
    }
    ```
    """

    @abstractmethod
    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Infers additional property fields from the given property definition.
        """
        pass
````
🛠️ Refactor suggestion

Consider making the inferrer contract read-only

`infer()` receives a mutable `property_definition`. Accidental in-place edits by an inferrer could leak into the final schema. Would switching the parameter type from `MutableMapping` to `Mapping` (and passing a copy) help protect against that?

```diff
-    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
+    def infer(self, property_definition: Mapping[str, Any]) -> Mapping[str, Any]:
```

That way implementers must return a new dict, reducing side-effects – wdyt?
```python
    @staticmethod
    def parse_args(args: List[str]) -> argparse.Namespace:
        return parse_args(args)
```
In terms of patterns, I think my preference here and proposed best practice (eventually) is that we make this a class method on the base connector class. I would call it "launch", and then every connector class would be able to invoke itself. In theory, all sources and destinations could share the same implementation across connectors of the same type, and unless the connector needs something special (a code smell anyway), the `cls` input arg should be all you need in order to instantiate a connector using CLI args.

I don't think we need to tackle all the scope here in this PR, but your implementation is already very close to what I think is the ideal state, so I'll mention it here as a non-blocking proposal.
That makes sense to me. I started diverging a bit from the current solution because `run_cmd` is not static, so we need to instantiate the object before calling it, but the way we instantiate the destination depends on which method of the protocol is called (see this piece of code for an example).

So do we agree that the `launch` method would be static? If so, I think we are moving in the right direction. I can start adding this in later changes. If not, I'll need more information about what you had in mind.
@maxi297 - The difference between a static method and a class method for this use case is just that a class method knows what class it is (static methods don't get a `cls` input), and that class method implementations can be inherited by subclasses. So, yes, agreed the method is static in the sense that it doesn't need the object, but in order to not need to implement it on each class, I think making it a class method is slightly cleaner in the long run. Sorry if I'm not good at explaining this, and please don't block on my comment - either way, it's a step in the right direction, I think, if the class knows how to instantiate itself, without needing an external class or function in order to be instantiated. 👍
@maxi297 - Here's the early implementation I wrote for S3 a while back...
```python
@classmethod
def launch(cls, args: list[str] | None = None) -> None:
    """Launch the source using the provided CLI args.

    If no args are provided, the launch args will be inferred automatically.

    In the future, we should consider moving this method to the Connector base class,
    so that all sources and destinations can launch themselves and so none of this
    code needs to live in the connector itself.
    """
    args = args or sys.argv[1:]
    catalog_path = AirbyteEntrypoint.extract_catalog(args)
    # TODO: Delete if not needed:
    # config_path = AirbyteEntrypoint.extract_config(args)
    # state_path = AirbyteEntrypoint.extract_state(args)
    source = cls.create(
        configured_catalog_path=Path(catalog_path) if catalog_path else None,
    )
    # The following function will wrap the execution in proper error handling.
    # Failures prior to here may not emit proper Airbyte TRACE or CONNECTION_STATUS messages.
    launch(
        source=source,
        args=args,
    )
```
This wasn't a very good implementation, but you can see the generic `cls.create()` ref.

Where `cls.create()` was defined as:
```python
@classmethod
def create(
    cls,
    *,
    configured_catalog_path: Path | str | None = None,
) -> SourceS3:
    """Create a new instance of the source.
    ...
    # A bunch of hacky stuff here.
    ...
    return cls(
        # These are the defaults for the source. No need for a caller to change them:
        stream_reader=SourceS3StreamReader(),
        spec_class=Config,
        cursor_cls=Cursor,
        # This is needed early. (We also will provide it again later.)
        catalog=configured_catalog,
        # These will be provided later, after we have wrapped proper error handling.
        config=None,
        state=None,
    )
```
Another problem with our current design is that we ask for config, catalog, etc in the constructor, but then pass it again during invocation of 'check', 'discover', etc. I think it would be better to not need these in the constructor at all, in which case we greatly simplify the process of creating a connector class, and we put all code that can fail in a code path that can properly message about any failures.
Again - hopefully this is helpful for long-term thinking, but I don't think it needs to block the current PR.
classmethod makes sense, yes!

And I would prefer to have these as part of the constructor personally: if we don't, each method will need to instantiate the object it needs every time it is called, or have some kind of intelligent accessor which would check if the field has already been instantiated and, if not, instantiate it. It feels complex instead of just instantiating the connector properly in `launch`. WDYT?
Actionable comments posted: 1
🔭 Outside diff range comments (1)

unit_tests/destinations/test_destination.py (1)

`1-1`: ⚠️ Potential issue

Fix Ruff formatting issues

There's a pipeline failure indicating Ruff formatting issues. Could you run `ruff format` to fix the code style issues?

```shell
#!/bin/bash
# Run Ruff formatter to fix code style issues
ruff format unit_tests/destinations/test_destination.py
```

🧰 Tools

🪛 GitHub Actions: Linters

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
🧹 Nitpick comments (1)

unit_tests/destinations/test_destination.py (1)

`49-54`: Consider testing the debug flag when explicitly set to True

The test cases verify the default `debug=False` behavior, but there's no test case that verifies what happens when `debug=True` is explicitly set. Would it be helpful to add a test case like this to verify the flag works correctly when enabled, wdyt?

```diff
 [
     (["spec"], {"command": "spec", "debug": False}),
+    (["spec", "--debug"], {"command": "spec", "debug": True}),
     (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
     (
         ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
         {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
     ),
 ],
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)

- unit_tests/destinations/test_destination.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
unit_tests/destinations/test_destination.py
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
```diff
     (["spec"], {"command": "spec", "debug": False}),
     (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
     (
         ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
-        {"command": "write", "config": "config_path1", "catalog": "catalog_path1"},
+        {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
     ),
```
🛠️ Refactor suggestion

Test coverage gap for the new "discover" command

I notice the test parameterization has been updated to include `"debug": False` in the expected outputs, which aligns with your new debug logging functionality. However, I don't see any test cases for the new `discover` command mentioned in the PR summary. Should we add a test case for this new functionality, wdyt?

```diff
 [
     (["spec"], {"command": "spec", "debug": False}),
     (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
     (
         ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
         {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
     ),
+    (
+        ["discover", "--config", "config_path1"],
+        {"command": "discover", "config": "config_path1", "debug": False},
+    ),
 ],
```
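A minimal argparse sketch of the kind of destination parser these test cases exercise (hypothetical, mirroring the flags in the parameterization above; not the CDK's actual parser, which may handle `--debug` differently):

```python
import argparse
from typing import List


def parse_args(args: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="destination connector")
    # Global flag, parsed before the subcommand in this sketch.
    parser.add_argument("--debug", action="store_true", help="enable debug logs")
    subparsers = parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("spec")

    check = subparsers.add_parser("check")
    check.add_argument("--config", required=True)

    discover = subparsers.add_parser("discover")
    discover.add_argument("--config", required=True)

    write = subparsers.add_parser("write")
    write.add_argument("--config", required=True)
    write.add_argument("--catalog", required=True)

    return parser.parse_args(args)


ns = parse_args(["discover", "--config", "config_path1"])
# vars(ns) == {"debug": False, "command": "discover", "config": "config_path1"}
```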
```python
return {
    "$schema": "https://json-schema.org/draft-07/schema#",
    "type": "object",
    "additionalProperties": True,
```
💎 Nice. Appreciate that we're following JSON Schema standards for communicating this. 👍
@maxi297 - I have reviewed all the code here and it looks good. 👍
Thanks for responding to my comments - and again, nothing blocking there.
What I didn't find was a definition of `DestinationSyncMode` or any changes to sync modes. I did want to review those - can you point me to them?
Actionable comments posted: 0
🧹 Nitpick comments (1)
pyproject.toml (1)

`94-98`: Document Custom Source Rationale

The `testpypi` supplemental source is added solely to fetch the dev protocol-models-dataclasses package. Could you add a brief comment above this block explaining that it's temporary until the next official release? This will help future maintainers understand its purpose. wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)

- `poetry.lock` is excluded by `!**/*.lock`

📒 Files selected for processing (4)

- airbyte_cdk/destinations/destination.py (4 hunks)
- airbyte_cdk/models/__init__.py (1 hunks)
- airbyte_cdk/test/catalog_builder.py (3 hunks)
- pyproject.toml (2 hunks)
✅ Files skipped from review due to trivial changes (1)
- airbyte_cdk/models/__init__.py
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/test/catalog_builder.py
- airbyte_cdk/destinations/destination.py
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: preview_docs
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
pyproject.toml (1)

`33-33`: ✅ Verification successful

Verify Dev Pre-release Version Usage

You've pinned `airbyte-protocol-models-dataclasses` to a specific dev build (`0.15.0.dev1746621859`) from the `testpypi` index. Would you consider confirming that this version is stable and available (and that it won't get yanked), or switching to a released SemVer once it's out? wdyt?

🏁 Script executed:

```shell
#!/bin/bash
# Check if the dev version exists on testpypi
pip index versions airbyte-protocol-models-dataclasses \
  --extra-index-url https://test.pypi.org/simple/ | grep "0.15.0.dev1746621859"
```

Length of output: 258

Double-check Test PyPI for the dev version

It looks like the `pip index` check didn't surface `0.15.0.dev1746621859` – let's hit the Test PyPI JSON API directly to confirm whether this build is actually published (or if we should reconsider pinning to it). wdyt?

🏁 Script executed:

```shell
#!/bin/bash
# Verify if the specific dev version exists on Test PyPI's JSON endpoint
curl -s https://test.pypi.org/pypi/airbyte-protocol-models-dataclasses/json \
  | grep '"0.15.0.dev1746621859"'
```

Length of output: 65646

Dev build confirmed – 0.15.0.dev1746621859 is live and not yanked

- pyproject.toml (line 33): Test PyPI JSON metadata shows `"version":"0.15.0.dev1746621859"` and `"yanked":false`.

Looks safe to pin – would you still like to switch to the first official 0.15.0 release once it's out? wdyt?
What

This PR contains various changes needed for DA. The changes are:

- Allowing `parse_args` to be called without having to instantiate a destination object (I also added support to debug log while doing this)
- Setting `additionalProperties` in the generated schema based on `allow_additional_properties == False`
- Adding `type` within each property object

For more context, the following PRs can be useful:
Summary by CodeRabbit