Emilja Dankevičiūtė
12/15/2022, 11:27 AM
… assets. Said function is called once in a repository, along with the dbt and airbyte build functions. Everything looks OK in Dagit (upstreams resolved, etc.). However, when I click materialize, the parent function is called several times again before the op that builds said asset runs. Am I doing something wrong, or is this expected behavior? I've looked at the airbyte assets and I can see a CacheableAssetsDefinition used there instead of an AssetsDefinition. Should I just re-implement this for Hightouch? I couldn't find an example showing how to load assets from an external source, only how to explicitly define them in Dagster. For us, at the moment, that would mean a lot of boilerplate, so we're testing other approaches.

… Resource, so I'm assuming the repo reload is expected behavior. Will test the whole thing out.
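For context, a minimal sketch of the pattern being described, with hypothetical names (this is not the thread's actual code): any function called at repository definition time to build assets runs again whenever the code location is reloaded, including in the process that executes a run, which is why the "parent" function fires repeatedly on materialize.

from dagster import asset, repository


def build_hightouch_assets():
    # stand-in for reaching out to an external API to discover assets;
    # this body runs every time this module is loaded, including in run workers
    print("fetching syncs from external API")

    @asset
    def example_hightouch_sync():
        ...

    return [example_hightouch_sync]


@repository
def bi_platform_repo():
    # evaluated on every repository load, so build_hightouch_assets() runs again too
    return [*build_hightouch_assets()]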
jamie
12/15/2022, 3:33 PM

Emilja Dankevičiūtė
12/16/2022, 10:39 AM

ben
12/16/2022, 3:32 PM
CacheableAssetsDefinition, which defines two methods (a minimal sketch follows below):
• compute_cacheable_data, which runs at initial repo load time to generate a series of AssetsDefinitionCacheableData. This is the point at which you would reach out to external APIs and gather the data necessary to build your assets.
• build_definitions, which transforms the metadata generated earlier into asset definitions. This runs each subsequent time the repo is loaded (e.g. in a run environment).
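Here is a minimal sketch of that shape, assuming a 1.1-era Dagster where these classes are importable from dagster._core.definitions.cacheable_assets (the exact import path may differ by version); the source and table names are hypothetical:

from typing import Sequence

from dagster import AssetsDefinition, asset
from dagster._core.definitions.cacheable_assets import (
    AssetsDefinitionCacheableData,
    CacheableAssetsDefinition,
)


class MyExternalAssets(CacheableAssetsDefinition):
    def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
        # Initial repo load: call the external API once and keep only
        # lightweight, serializable metadata.
        tables = ["table_a", "table_b"]  # stand-in for an API response
        return [AssetsDefinitionCacheableData(extra_metadata={"table": t}) for t in tables]

    def build_definitions(
        self, data: Sequence[AssetsDefinitionCacheableData]
    ) -> Sequence[AssetsDefinition]:
        # Every subsequent load (including run workers): rebuild the asset
        # definitions from the cached metadata, with no API calls.
        return [self._make_asset(d) for d in data]

    def _make_asset(self, cacheable_data: AssetsDefinitionCacheableData) -> AssetsDefinition:
        table = cacheable_data.extra_metadata["table"]

        @asset(name=table)
        def _table_asset(context):
            context.log.info(f"materializing {table}")

        return _table_asset


external_assets = MyExternalAssets("my-external-source")

Building each asset inside a helper like _make_asset also keeps the per-table values bound to that call's locals rather than shared across closures created in a loop.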
Emilja Dankevičiūtė
12/16/2022, 3:34 PM

ben
12/16/2022, 3:34 PM

Emilja Dankevičiūtė
12/16/2022, 3:35 PM
… compute_cacheable_data and build_definitions methods. Everything looks OK in Dagit, even the dependencies; however, when I try to manually materialize an asset, the code tries to materialize a different asset than what Dagit shows. We're looking through the airbyte integration ourselves in the meanwhile, but maybe you'll have ideas on what we're doing wrong, since you're already familiar with how this should be implemented?
class SupersetAssetsDefinition(CacheableAssetsDefinition):
    def __init__(self, dbt_resouce: dbt_cli_resource, superset_resource: SupersetResource) -> None:
        self._dbt_resouce = dbt_resouce
        self._superset_resource = superset_resource
        super().__init__(f"superset-dbt")

    # runs at initial repo load time
    def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
        self._dbt_resouce.ls()
        manifest_path = os.path.join(LOCAL_DBT_TARGET_PATH, "manifest.json")
        with open(manifest_path, "r", encoding="utf8") as f:
            dbt_manifest_json = json.load(f)
        return [self._build_definition(node) for node in dbt_manifest_json['nodes'].values()]

    # runs each subsequent time the repo is loaded
    def build_definitions(self, data: Sequence[AssetsDefinitionCacheableData]) -> Sequence[AssetsDefinition]:
        assets = []
        for asset_def in data:
            asset_name = asset_def.extra_metadata["table"]
            dataset = asset_def.extra_metadata["table"]

            @asset(name=asset_name, non_argument_deps=asset_def.keys_by_input_name.values(), group_name='superset', compute_kind='superset')
            def superset_asset(context):
                print(f'create or update {dataset}')
                dataset_id = self._create_or_update_dataset(dataset)
                self._update_columns(dataset_id, asset_def.extra_metadata['columns'])

            assets.append(superset_asset)
        ...

    def _build_definition(self, dbt_node: Dict[str, Any]) -> AssetsDefinitionCacheableData:
        group_name = dbt_node['fqn'][1]  # the second folder is used as group name by dagster dbt
        return AssetsDefinitionCacheableData(
            keys_by_input_name={dbt_node['name']: AssetKey(['bi_platform', group_name, dbt_node['name']])},
            group_name="superset",
            key_prefix="superset",
            extra_metadata={"table": dbt_node['name'], 'columns': dbt_node['columns']},
        )
What are we missing?

- superset_stg_hubspot__deals - STEP_WORKER_STARTING - Launching subprocess for "superset_stg_hubspot__deals"...

But then we see "create or update superset_action_log" in what's actually being materialized in other places and in the API access logs.
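One thing worth checking, assuming build_definitions still looks like the snippet above: every superset_asset defined in the loop closes over the same dataset and asset_def names, and Python binds closure variables late, so while each asset's name is fixed at decoration time, the body that eventually runs sees the loop's last values. A minimal sketch of the usual fix, moving the asset construction into a helper method so the values are bound per call (non_argument_deps and other arguments omitted for brevity):

    def _make_superset_asset(self, asset_def: AssetsDefinitionCacheableData) -> AssetsDefinition:
        dataset = asset_def.extra_metadata["table"]

        @asset(name=dataset, group_name="superset", compute_kind="superset")
        def superset_asset(context):
            # dataset and asset_def are locals of this call, so each generated
            # asset keeps its own values instead of sharing the loop's last ones
            print(f"create or update {dataset}")
            dataset_id = self._create_or_update_dataset(dataset)
            self._update_columns(dataset_id, asset_def.extra_metadata["columns"])

        return superset_asset

    def build_definitions(self, data: Sequence[AssetsDefinitionCacheableData]) -> Sequence[AssetsDefinition]:
        return [self._make_superset_asset(d) for d in data]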
Saul Burgos
02/28/2023, 8:27 PM