Josh Kutsko
05/12/2023, 12:41 AM
hightouch_instance = ht_resource.configured(
    {"api_key": os.getenv("HIGHTOUCH_API_KEY")}
)
run_ht_deal_sync_op = hightouch_sync_op.configured(
    {"sync_id": HT_DEAL_DATA}, name="platform_deal_sync"
)
@asset(io_manager_key="io_manager", required_resource_keys={"hightouch"})
def hightouch_deal_sync():
    run_ht_deal_sync_op()
But I am getting the following error when running the asset through dagster locally: “Compute function of op ‘platform_deal_sync’ has context argument, but no context was provided when invoking.”
If I’m understanding correctly, the hightouch sync op needs the configuration argument to execute, but I thought that the ht_resource.configured() call should set that up already.
Josh Kutsko
05/12/2023, 1:03 AM
Josh Kutsko
05/12/2023, 1:13 AM
Le Yang
05/12/2023, 1:26 AM
Josh Kutsko
05/12/2023, 1:28 AM
Le Yang
05/12/2023, 1:30 AM
Josh Kutsko
05/12/2023, 1:32 AM
defs = Definitions(
    assets=load_assets_from_package_module(assets),
    resources={
        "io_manager": FilesystemIOManager(),
        "hightouch": hightouch_instance,
        "airbyte": airbyte_instance,
    },
)
Zach
05/12/2023, 4:06 PM
Zach
05/12/2023, 4:07 PM
hightouch_sync_op and put it directly in the hightouch_deal_sync asset
Zach
05/12/2023, 4:08 PM
Josh Kutsko
05/12/2023, 4:08 PM
hightouch_sync_op method as an op.
Josh Kutsko
05/12/2023, 4:09 PM
Zach
05/12/2023, 4:10 PM
Josh Kutsko
05/12/2023, 4:15 PM
Josh Kutsko
05/12/2023, 4:16 PM
Zach
05/12/2023, 4:25 PM
@asset definition is just vanilla python code to carry out whatever task / transformation you're modeling, the same way an op is defined. so a simple asset would be like
@asset
def unique_companies(industry_df: spark.sql.DataFrame):
    return [r.company for r in industry_df.select("company").distinct().collect()]
Zach
05/12/2023, 4:26 PM
Josh Kutsko
05/12/2023, 4:27 PM
Zach
05/12/2023, 4:28 PM
Josh Kutsko
05/12/2023, 4:29 PM
Dan Meyer
06/21/2023, 6:06 PM
AssetsDefinition.from_op
Dan Meyer
06/21/2023, 6:06 PM
Josh Kutsko
06/21/2023, 6:14 PM
from_op should technically work, but my approach was to use the @graph decorator. The code is pretty hacky, really just a way to consolidate my syncs into assets so that the rest of the asset model can be clean.
Josh Kutsko
06/21/2023, 6:14 PM
@graph_asset(
    ins={
        "platform_tables": AssetIn(
            key=AssetKey(["platform_tables"]),
        )
    },
)
def platform_export_live(platform_tables):
    first_sync = run_ht_first_sync_op(start_after=platform_tables)
    return run_ht_second_sync_op(start_after=first_sync)
Josh Kutsko
06/21/2023, 6:15 PM
Josh Kutsko
06/21/2023, 6:15 PM
Dan Meyer
06/21/2023, 6:19 PM
“really just a way to consolidate my syncs into assets so that the rest of the asset model can be clean.”
Awesome - that's really all we need, too 🙂
Bashir Jaji
06/21/2023, 6:21 PM
Josh Kutsko
06/21/2023, 6:21 PM