# ask-community
Hey guys! My team's pretty new to Dagster. We're trying to load assets from an external source (Hightouch; we're using reverse ETL) into Dagster. I wrote a function that calls the Hightouch API and maps all data syncs to assets. Said function is called once in a repository, along with the other asset build functions. All looks OK in Dagit (upstreams resolved, etc.). However, when I click materialize, before the run that builds said assets starts, the parent function is called several times again. Am I doing something wrong? Or is this expected behavior? I've looked at the Airbyte assets, and I can see a cacheable assets definition used there instead of plain asset definitions. Should I just re-implement this for Hightouch? I couldn't find an example showing how to load assets from an external source, only how to define them explicitly in Dagster. But for us, at the moment, that would mean a lot of boilerplate, so we're testing other approaches.
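[A framework-free sketch of the approach described above: fetch a list of syncs from an external API and build one function per sync. `fetch_syncs` and the sync slugs are hypothetical stand-ins, not the Hightouch API.]

```python
# Sketch: map external data syncs to per-sync build functions.
# fetch_syncs() is a hypothetical stand-in for a Hightouch API call.

def fetch_syncs():
    # In real code this would hit the Hightouch REST API.
    return [{"slug": "orders_to_hubspot"}, {"slug": "users_to_braze"}]

def make_sync_asset(sync):
    # A factory function binds each sync dict to its own closure,
    # so every generated function keeps its own sync.
    def run_sync():
        return f"triggering sync {sync['slug']}"
    run_sync.__name__ = sync["slug"]
    return run_sync

# One callable per sync, built from the API response.
sync_assets = [make_sync_asset(s) for s in fetch_syncs()]
```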
It looks like Dagster is reloading a repository when clicking materialize on an asset. I can find references to lazy loading of repositories.
I found that for Airbyte, the requests are cached on initial repository load, so I'm assuming the repo reload is expected behavior. Will test the whole thing out.
cc @rex and @ben since you’re familiar with the dbt and airbyte integrations
thanks for cc!
I've tested, and for example Airbyte assets are not behaving that way - no requests to the Airbyte API happen even if the repository is "reloaded". Still trying to pinpoint why that is.
I think it's because of cacheable assets. I can see some references in this Slack group and some pull requests. It's not documented - is it something that's still in development, or is it supposed to be for internal (Dagster core) use only?
Hi Emilja, cacheable assets are a relatively new abstraction that's mostly useful for e.g. building integrations. We could definitely do a better job of documenting their behavior. Dagster loads the Python repository each time you kick off a run - if we were to reach out to external APIs to gather a list of assets each time we do this, it could flood those APIs with requests and potentially produce inconsistent results. Cacheable assets are a way for us to only make those expensive queries once - on initial repo load. To use them, you can implement CacheableAssetsDefinition, which defines two methods:
• compute_cacheable_data, which runs at initial repo load time to generate a series of AssetsDefinitionCacheableData objects. This is the point at which you would reach out to external APIs and gather the data necessary to build your assets.
• build_definitions, which transforms the metadata generated earlier into asset definitions. This is done each subsequent time the repo is loaded (e.g. in a run environment)
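[A framework-free sketch of the two-phase lifecycle described above. The class and field names below are simplified stand-ins, not the real Dagster API: the point is that the expensive phase runs once, and later loads rebuild definitions from the cached metadata only.]

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class CacheableData:  # stand-in for AssetsDefinitionCacheableData
    extra_metadata: Dict[str, Any] = field(default_factory=dict)

class CacheableAssets:  # stand-in for CacheableAssetsDefinition
    api_calls = 0  # counts how often the "external API" is hit

    def compute_cacheable_data(self) -> List[CacheableData]:
        # Phase 1: expensive external call, executed once at initial repo load.
        type(self).api_calls += 1
        return [CacheableData({"table": t}) for t in ("orders", "users")]

    def build_definitions(self, data: List[CacheableData]) -> List[str]:
        # Phase 2: cheap, runs on every subsequent load (e.g. in a run worker),
        # using only the cached metadata - no external requests.
        return [d.extra_metadata["table"] for d in data]

defs = CacheableAssets()
cached = defs.compute_cacheable_data()   # initial repo load
assets = defs.build_definitions(cached)  # later loads reuse `cached`
```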
Hi, Ben. Thanks a lot for the info!
Let me know if that makes sense, happy to provide some more context
It does. I thought it was something like that, just didn't know the exact implementation.
Hey, @ben! We're trying to implement CacheableAssetsDefinition. We've created a dedicated class and implemented the compute_cacheable_data and build_definitions methods. All looks OK in Dagit, even the dependencies; however, when I try to manually materialize an asset, the code tries to materialize a different asset than what Dagit shows. We're looking through the Airbyte integration ourselves in the meanwhile, but maybe you'll have ideas on what we're doing wrong, since you're already familiar with how this should be implemented?
import json
import os
from typing import Any, Dict, Sequence

from dagster import AssetKey, AssetsDefinition, asset
from dagster._core.definitions.cacheable_assets import (
    AssetsDefinitionCacheableData,
    CacheableAssetsDefinition,
)


class SupersetAssetsDefinition(CacheableAssetsDefinition):
    def __init__(self, dbt_resource: dbt_cli_resource, superset_resource: SupersetResource) -> None:
        # CacheableAssetsDefinition requires a stable unique_id
        super().__init__(unique_id="superset_assets")
        self._dbt_resource = dbt_resource
        self._superset_resource = superset_resource

    # runs at initial repo load time
    def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
        manifest_path = os.path.join(LOCAL_DBT_TARGET_PATH, "manifest.json")
        with open(manifest_path, "r", encoding="utf8") as f:
            dbt_manifest_json = json.load(f)
        return [self._build_definition(node) for node in dbt_manifest_json['nodes'].values()]

    # runs each subsequent time the repo is loaded
    def build_definitions(self, data: Sequence[AssetsDefinitionCacheableData]) -> Sequence[AssetsDefinition]:
        assets = []

        for asset_def in data:
            asset_name = asset_def.extra_metadata["table"]
            dataset = asset_def.extra_metadata["table"]

            @asset(name=asset_name, non_argument_deps=set(asset_def.keys_by_input_name.values()), group_name='superset', compute_kind='superset')
            def superset_asset(context):
                print(f'create or update {dataset}')
                dataset_id = self._create_or_update_dataset(dataset)
                self._update_columns(dataset_id, asset_def.extra_metadata['columns'])

            assets.append(superset_asset)

        return assets

    def _build_definition(self, dbt_node: Dict[str, Any]) -> AssetsDefinitionCacheableData:
        group_name = dbt_node['fqn'][1]  # the second folder is used as the group name by dagster-dbt

        return AssetsDefinitionCacheableData(
            keys_by_input_name={dbt_node['name']: AssetKey(['bi_platform', group_name, dbt_node['name']])},
            extra_metadata={"table": dbt_node['name'], 'columns': dbt_node['columns']},
        )
What are we missing?
The asset name is logged correctly in other cases, for example:
- superset_stg_hubspot__deals - STEP_WORKER_STARTING - Launching subprocess for "superset_stg_hubspot__deals"...
But then we see
create or update superset_action_log
in what's actually being materialized elsewhere, and in the API access logs.
Problem found, nevermind. Was not aware that in Python, loop variables aren't scoped to the loop body, so closures created inside the loop all see the variable's last value.
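[A minimal illustration of the pitfall found above: closures capture the loop variable itself, not its value at each iteration, so every function built in the loop sees the last value. Binding the value through a default argument is the usual fix.]

```python
names = ["a", "b", "c"]

# Bug: all three lambdas close over the same `name` variable.
buggy = [lambda: name for name in names]

# Fix: a default argument evaluates `name` now, per iteration.
fixed = [lambda name=name: name for name in names]

[f() for f in buggy]  # -> ["c", "c", "c"]
[f() for f in fixed]  # -> ["a", "b", "c"]
```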
Silly question: where are you passing this "SupersetAssetsDefinition"? In the repository? How is it used?