Joel Olazagasti
05/01/2023, 8:14 PM
compute-minute costs, but my understanding is that the Dagster job is essentially just monitoring the output of Airbyte logs. Is there a way to minimize this to avoid "double dipping" on compute costs? Is the solution just to schedule those long-running jobs natively in Airbyte and declare them as source assets in Dagster?
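For the second option, a minimal sketch of declaring Airbyte-managed tables as source assets, assuming the syncs are scheduled inside Airbyte itself (the table names below are placeholders):

```python
from dagster import SourceAsset

# Tables that Airbyte populates on its own schedule; Dagster never runs or
# polls the sync, it just lets downstream assets depend on these keys.
orders = SourceAsset(key="orders")
customers = SourceAsset(key="customers")
```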
Jermaine Sayson
05/03/2023, 4:31 PM

Tim Castillo
05/04/2023, 8:06 PM

Joel Olazagasti
05/05/2023, 3:54 PM
`1.3`, but I think I've identified a fairly easy fix. Essentially, starting with the `AirbyteManagedReconciler`, simply changing the input parameter to an `AirbyteResource`, then removing the param_checks looking for a `ResourceDef`, and doing the same on this class and this function, made the whole thing work with the new Airbyte pythonic resource. I just edited my local install of Dagster to test, and was able to get `dagster dev` to run, and the CLI tool to deploy config to Airbyte as well. I'm following the contributing instructions to set up a local environment for contributing right now, but I'm a little wary. I'm assuming the tests for this part of the library didn't catch this bug up front, so I worry my changes might cause some other uncaught regression. Is there anyone more familiar with this part of the code who can help verify my assumptions/changes above?
Aiman
05/16/2023, 4:22 AM
`ValueError: Airbyte connections are not in sync with provided configuration`. I run it from my Docker Compose in a virtual machine. Any insight on it? I will share my code as well.
Jermaine Sayson
05/16/2023, 10:33 AM

Jermaine Sayson
05/16/2023, 4:33 PM

Joel Olazagasti
05/17/2023, 3:47 PM

Jean Lafleur
05/18/2023, 3:00 PM

Joel Olazagasti
05/19/2023, 3:40 PM
`forward_logs` to False? Is Dagster still aware if the Airbyte sync succeeds or fails? Does it just mean I have to go introspect the error in Airbyte itself if it fails?
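For reference, a minimal sketch of where that flag lives (host/port are placeholders). My understanding, hedged: with `forward_logs=False`, Dagster polls the job status rather than streaming Airbyte's logs, so the step still fails when the sync fails; the detailed error text just stays in Airbyte.

```python
from dagster_airbyte import AirbyteResource

# Placeholder connection details; forward_logs=False skips streaming
# Airbyte's logs into Dagster while still reporting sync success/failure.
airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
    forward_logs=False,
)
```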
Joel Olazagasti
05/19/2023, 7:07 PM

Alejandro Henao Ruiz
05/29/2023, 2:45 PM
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)
Additionally, I have defined another asset that executes a stored procedure in SQL Server:

import pyodbc
from dagster import RetryPolicy, asset

@asset(
    group_name="sql_server_assets",
    retry_policy=RetryPolicy(max_retries=5, delay=60),
)
def sp_sql_server():
    # Credentials
    server = 'xxx'
    database = 'xxx'
    username = 'xxx'
    password = 'xxx'
    # Connection string
    connection_string = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}"
    # Run the stored procedure against SQL Server
    with pyodbc.connect(connection_string) as connection:
        connection.execute("EXEC dbo.InsertTimestamp")
    return None

My problem lies in the need to define dependencies between assets: I want my `sp_sql_server` asset to be executed first, and then, through a dependency, a specific asset that is loaded from Airbyte is executed. How can I do this?
I reviewed the example by benpankow @ben that is on GitHub, and there I see a way to define this dependency, but in that example it is done the other way around: first the Airbyte asset is executed, and then the other assets defined under the `non_argument_deps` / `depends_on` configuration. How can I achieve something similar but in reverse?
I thank you in advance for any help.
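One possible approach, hedged: `load_assets_from_airbyte_instance` doesn't expose upstream dependencies, but `build_airbyte_assets` builds assets for a single connection and accepts upstream asset keys, so the sync itself can depend on `sp_sql_server`. A sketch, assuming a Dagster version whose `build_airbyte_assets` supports the `upstream_assets` parameter (newer releases spell it `deps`); the connection id and table are placeholders:

```python
from dagster import AssetKey
from dagster_airbyte import build_airbyte_assets

# Build assets for the one connection that must wait on sp_sql_server.
airbyte_assets = build_airbyte_assets(
    connection_id="your-connection-id",    # placeholder
    destination_tables=["your_table"],     # placeholder
    # The Airbyte sync now depends on sp_sql_server, so Dagster
    # materializes sp_sql_server first.
    upstream_assets={AssetKey("sp_sql_server")},
)
```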
Ashish Bansal
06/05/2023, 7:20 PM
mysql_airbyte_assets = build_airbyte_assets(connection_id=connection_id, destination_tables=["table"])
I have an Airbyte connection to do a MySQL -> S3 sync. If I materialize the above asset, I want to know the S3 object path. How do I retrieve that in Dagster?
Ashish Bansal
06/06/2023, 1:02 AM
from dagster import AssetIn, asset
from dagster_airbyte import build_airbyte_assets

mysql_airbyte_assets = build_airbyte_assets(connection_id=connection_id, destination_tables=["foobar"])
foobar = mysql_airbyte_assets[0]

@asset(ins={"foobar": AssetIn("foobar")})
def save_to_starrocks(context, foobar):
    print("foobar", foobar, context.__dict__)
    return False

The above snippet prints:
foobar None {'_pdb': None, '_events': [], '_output_metadata': {}}
I checked that the Airbyte materialization is working fine, but can anyone share hints on why the `foobar` asset is None here? ^
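A likely explanation, hedged: the Airbyte-built assets don't return the table's rows as a Python value, so unless an I/O manager knows how to load the synced table, an `AssetIn` input materializes as None. If only ordering is needed, the dependency can be declared without an input; a sketch (`non_argument_deps` is the 1.3-era spelling; newer versions use `deps=[...]`):

```python
from dagster import asset

# Depend on the Airbyte asset for ordering only; no value is passed in.
@asset(non_argument_deps={"foobar"})
def save_to_starrocks(context):
    # Read the synced data yourself, e.g. by querying the destination,
    # instead of expecting an in-memory value from the Airbyte asset.
    ...
```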
Luke Dixon
06/16/2023, 12:33 PM
2023-06-16 13:34:42 +0100 - dagster - ERROR - my_upstream_job - 55a4ed2c-611b-4ccf-a52e-95e607c292aa - airbyte_sync_2db70 - Request to Airbyte API failed: 502 Server Error: Bad Gateway for url: http://localhost:8000/api/v1/jobs/cancel
2023-06-16 13:34:42 +0100 - dagster - ERROR - my_upstream_job - 55a4ed2c-611b-4ccf-a52e-95e607c292aa - 6099 - airbyte_sync_2db70 - STEP_FAILURE - Execution of step "airbyte_sync_2db70" failed.
dagster._core.definitions.events.Failure: Max retries (3) exceeded with url: http://localhost:8000/api/v1/jobs/cancel.
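If the gateway errors are transient, the resource's retry knobs may help; a sketch assuming the standard `AirbyteResource` (host/port are placeholders, and the `Max retries (3)` above matches its default):

```python
from dagster_airbyte import AirbyteResource

airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
    request_max_retries=10,   # default is 3, matching the error above
    request_retry_delay=5,    # seconds to wait between retries
)
```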
Johannes Müller
07/01/2023, 8:23 AM

Edo
07/15/2023, 4:14 AM
`ssl=False`, but it still requires me to give `ssl_mode`. I've tried `ssl_mode="Preferred"` and `ssl_mode=MysqlSource.Preferred` with no luck. Can anyone help? Thanks.
dagster._check.ParameterCheckError: Param "ssl_mode" is not one of ['Preferred', 'Required', 'VerifyCA', 'VerifyIdentity']. Got <class 'dagster_airbyte.managed.generated.sources.MysqlSource.Preferred'> which is type <class 'type'>.
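A hedged reading of that error: the check received the class object itself ("which is type <class 'type'>"), which suggests it wants an *instance* of the generated option class. Worth trying (assuming the nested class takes no arguments):

```python
from dagster_airbyte.managed.generated.sources import MysqlSource

# Instantiate the nested option class rather than passing the class itself;
# then hand this value to MysqlSource(..., ssl_mode=ssl_mode).
ssl_mode = MysqlSource.Preferred()
```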
Fidocia Adityawarman
07/16/2023, 4:32 PM

Julien DEBLANDER
08/01/2023, 4:19 PM

Jean Lafleur
08/03/2023, 7:27 PM

Pierre SAMAILLE
08/07/2023, 4:20 PM

Nicolas Guary
08/07/2023, 4:33 PM
`build_airbyte_assets` does not accept an `io_manager_key` parameter while `load_assets_from_airbyte_project` does? (https://docs.dagster.io/_apidocs/libraries/dagster-airbyte#dagster_airbyte.build_airbyte_assets)
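Not an answer to the *why*, but as a possible workaround: `load_assets_from_airbyte_instance` does accept `io_manager_key`, so loading from the instance rather than per-connection may cover this. A sketch with placeholder connection details and a hypothetical I/O manager key:

```python
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

airbyte_instance = AirbyteResource(host="localhost", port="8000")  # placeholders

airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    io_manager_key="my_io_manager",  # hypothetical key bound in Definitions
)
```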
Joel Olazagasti
08/14/2023, 4:53 PM
`1.4.1` to `1.4.5`, and I am getting the error message:
dagster._core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'airbyte' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
I'm passing the same instance of `AirbyteResource` to all of my `load_assets_from_airbyte_instance` calls. Does anyone have any insight here?
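For reference, the shape that satisfies the reference-equality check is a single module-level resource object reused everywhere; a sketch with placeholder values (if the error persists even with this shape, that may point at a regression in those versions):

```python
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

# One shared object, created exactly once at module scope.
airbyte_instance = AirbyteResource(host="localhost", port="8000")

# Every call receives the *same object*, so the resource definitions for the
# 'airbyte' key compare equal by reference.
assets_a = load_assets_from_airbyte_instance(airbyte_instance, key_prefix="source_a")
assets_b = load_assets_from_airbyte_instance(airbyte_instance, key_prefix="source_b")
```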
Brent Shulman
08/16/2023, 3:27 PM
`load_assets_from_airbyte_instance` by the actual connection id?
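A hedged pointer: `load_assets_from_airbyte_instance` takes a `connection_filter` callable that receives each connection's metadata; the metadata exposes the connection *name* most directly, so filtering usually keys off that. A sketch with placeholder values:

```python
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

airbyte_instance = AirbyteResource(host="localhost", port="8000")  # placeholders

# Keep only the connection(s) you want; returning False skips a connection.
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    connection_filter=lambda meta: meta.name == "my-connection",  # placeholder name
)
```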
Christian Hollinger
08/17/2023, 2:17 PM
`$NAME/` (note the `/`), so the stream name winds up being something like `e2e/data`. When I use `construct_airbyte_assets` and set `destination_tables` as just `data`, it'll fail with `DagsterStepOutputNotFoundError`. If I set `destination_tables` to `e2e/data`, it'll fail with `"e2e/data" is not a valid name in Dagster. Names must be in regex ^[A-Za-z0-9_]+$`. The `asset_key_prefix` only prefixes the asset within Dagster, but still doesn't find any valid output from the source.
What am I missing? Is there not a way to map the source Airbyte stream name into a valid Dagster asset name?
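There is a parameter aimed at exactly this, hedged on the installed version supporting it: `build_airbyte_assets` accepts a `stream_to_asset_map` that maps a raw Airbyte stream name to a regex-safe Dagster asset name. A sketch with a placeholder connection id:

```python
from dagster_airbyte import build_airbyte_assets

airbyte_assets = build_airbyte_assets(
    connection_id="your-connection-id",   # placeholder
    destination_tables=["data"],
    # Map the prefixed stream name Airbyte reports to a valid asset name.
    stream_to_asset_map={"e2e/data": "data"},
)
```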
Shangwei Wang
08/22/2023, 9:18 PM
`load_assets_from_airbyte_instance`). It would be nice if I didn't have to define them by hand one by one… Thanks!
Aatish Master
08/30/2023, 1:28 AM
`build_airbyte_assets`. The Airbyte connection extracts data from Snowflake and outputs a CSV file into an Azure blob container.
The pipeline is working, in that it outputs the file; however, the Airbyte asset fails with the following error:
op 'airbyte_sync_820ee' did not fire output {bsv}
dagster._core.errors.DagsterStepOutputNotFoundError: Core compute for op "airbyte_sync_820ee" did not return an output for non-optional output "bsv"
Any ideas on how to resolve this? How could I configure this asset to not require an output? Thanks
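A hedged hint: `build_airbyte_assets` fires one output per entry in `destination_tables`, and only for streams the finished sync actually reports, so this error usually means the name passed ("bsv" here) doesn't exactly match the stream name as Airbyte reports it (including any stream prefix on the connection). A sketch of the relevant call, with a placeholder connection id:

```python
from dagster_airbyte import build_airbyte_assets

airbyte_assets = build_airbyte_assets(
    connection_id="your-connection-id",   # placeholder
    # Must match the stream name exactly as the Airbyte sync reports it.
    destination_tables=["bsv"],
)
```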
Dang
09/07/2023, 9:59 AM
dagster._check.ParameterCheckError: Param "data_source" is not one of ['AmazonS3', 'AzureBlobStorage']. Got 'MANAGED_TABLES_STORAGE' which is type <class 'str'>.
Dipesh Kumar
09/12/2023, 5:37 AM
dagster._core.errors.DagsterInvalidDefinitionError: Duplicate asset key: AssetKey(['table_key'])
Is there a way to differentiate between the two asset keys from the two DBs? Maybe add a tag, or pull the connection or source name into the asset key?
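The second idea has direct support, hedged on version: `load_assets_from_airbyte_instance` accepts a `connection_to_asset_key_fn` that receives the connection metadata plus the table name, so the connection name can be folded into the key. A sketch with placeholder connection details:

```python
from dagster import AssetKey
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

airbyte_instance = AirbyteResource(host="localhost", port="8000")  # placeholders

# Prefix every asset key with the connection name so identical tables from
# the two databases get distinct keys.
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    connection_to_asset_key_fn=lambda meta, table_name: AssetKey(
        [meta.name.replace(" ", "_"), table_name]
    ),
)
```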
Nikolaos Dimitriadis
09/13/2023, 4:02 PM
`load_assets_from_airbyte_instance` with normalisation on Airbyte set to `off`:
• The loaded assets come with names like `{the_prefix_I've_set}/{airbyte_stream_name}`.
• The tables that are created on BigQuery have names like `{the_prefix_I've_set}._airbyte_raw_{airbyte_stream_name}`.
• The problem is that I struggle to create proper data lineage and dependencies when I later use dbt on a table named `{the_prefix_I've_set}._airbyte_raw_{airbyte_stream_name}` that doesn't exist for Dagster.
Is there any way for the assets to follow the names of the tables created by Airbyte when they are automatically loaded? Any other workarounds?
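One workaround, hedged: the `connection_to_asset_key_fn` hook mentioned above can rename the loaded assets to mirror the raw table names BigQuery actually receives, so dbt sources can line up with Dagster's asset keys. A sketch with placeholder connection details:

```python
from dagster import AssetKey
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

airbyte_instance = AirbyteResource(host="localhost", port="8000")  # placeholders

# Name each asset after the raw table Airbyte writes, e.g. _airbyte_raw_users,
# so a dbt source pointing at that table resolves to the same asset key.
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    connection_to_asset_key_fn=lambda meta, table_name: AssetKey(
        [f"_airbyte_raw_{table_name}"]
    ),
)
```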