Hi, Do you plan to implement the automaterialize p...
# integration-airbyte
j
Hi, Do you plan to implement the automaterialize policy for Airbyte Cloud?
o
hi @Julien DEBLANDER -- the
load_assets_from_airbyte_instance
(and related) functions support a
connection_to_auto_materialize_policy_fn
, which might be what you're looking for here
j
Is it supported with the latest Dagster version? I'm currently on 1.3.12 and I get the error: "load_assets_from_airbyte_instance is not yet supported for AirbyteCloudResource"
o
Ah sorry about that, I somehow missed that this was referring to Airbyte Cloud. This is not supported at the moment, as the Airbyte Cloud API doesn't currently support all of the endpoints that we use in the integration. What is your current setup with Dagster/Airbyte Cloud?
j
I'm just at the beginning of the implementation of Airbyte Cloud but we are currently fetching data with Airbyte on a schedule. Then I transform this data with dbt.
I guess I should stick to the schedule definition for now 🙂
o
got it, that sounds right to me
s
👋 am interested in setting up AMP for airbyte here too. We are hosting Airbyte ourselves in our own cluster, based on this thread, am i correct to assume
connection_to_auto_materialize_policy_fn
should work with OSS airbyte? I have AMP setup for all my dbt models, and Airbyte assets (defined as dbt
source
for my dbt models to get that nice lineage) via
load_assets_from_airbyte_instance
, but AMP doens’t show up for airbyte assets… since AMP doens’t show up, my dbt models are not being auto-materialized until I manually materialize the Airbyte assets… any pointers appreciated, thanks! (I’m on dagster 1.4.5)
o
Hi! I would expect that to work at the moment -- mind sharing the code you're using to set that up? If all else fails, it's possible to manually add AMPs to assets using something like
Copy code
list_of_assets = [...] # get this from load_assets_from_airbyte_instance
modified_list_of_assets = [
    assets_def.with_attributes(auto_materialize_policy=...) for assets_def in list_of_assets
]
s
Copy code
airbyte_assets = load_assets_from_airbyte_instance(  
      airbyte.airbyte_instance,
      key_prefix=["airbyte"],
      connection_to_group_fn=
          lambda connection_name:
              f'airbyte_{re.sub("[^0-9a-zA-Z_]+", "_", connection_name)}'.lower(),
      connection_filter=lambda connection: connection.name.startswith("Warehouse"),
      connection_to_auto_materialize_policy_fn=lambda _: AutoMaterializePolicy.lazy(),
  )
thanks, here is the init code
o
got it, not sure why that's happening -- but just to help debug, if you try out the
with_attributes
method above, does that resolve the issue?
s
will give that a try and report back, thanks!
just realized that
auto_materialize_policy
isn’t a supported param for
with_attributes
BUT, looks like the following works, now my airbyte jobs are able to auto-materialize accordingly to downstream dbt freshness policy 🎉
Copy code
airbyte_assets = _airbyte_assets.with_attributes_for_all(
     group_name=None,
     freshness_policy=None,
    backfill_policy=None,
     auto_materialize_policy=AutoMaterializePolicy.lazy(),
  )
🌈 1