Dennis Gera
07/10/2023, 8:00 PM
How should I instantiate a dbt table as an asset in a job?
I currently load my dbt_assets like this:
@dbt_assets(
    manifest=manifest,
    exclude="config.materialized:snapshot",
)
def arado_dbt_assets(context: OpExecutionContext, dbt: DbtCli):
    yield from _process_dbt_assets(context=context, dbt=dbt)
I then want to use a dbt table as an input for a job. After reading the documentation on how to do this, I figure I need something like this:
@job(
    name="update_blocked_contacts_from_antifraud",
    description="Block contacts tagged as invalid",
)
def update_blocked_contacts_from_antifraud():
    contacts_to_block_asset = contacts_to_block.to_source_asset()
    block_contacts(contacts_to_block=contacts_to_block_asset)
where contacts_to_block is an asset for a table in my dbt assets. However, I'm not sure how to define contacts_to_block as a separate asset. I tried using SourceAsset instead of to_source_asset(), but couldn't load metadata from my dbt_asset into my io_manager.
Dennis Gera
07/10/2023, 8:02 PM
@dbt_assets
chris
07/10/2023, 8:51 PM
define_asset_job
chris
07/10/2023, 8:51 PM
Dennis Gera
07/10/2023, 8:55 PM
@op(
    name="block_contacts",
    description="Update Contacts to be Blocked with Contact ID",
)
def block_contacts(context, contacts_to_block: pd.DataFrame, brain_api: BrainAPI) -> None:
    brain_api_client: BrainAPI = brain_api
    for ind in contacts_to_block.index:
        id_contact = contacts_to_block["id_contact"][ind]
        contact_update_data = ContactUpdate(
            is_blocked=True,
            blocked_reason=BlockedReasonEnum.INVALID_REGISTRATION,
        )
        brain_api_client.update_contact(
            id_contact=id_contact,
            contact_update_data=contact_update_data,
        )
I then have a @job that runs that @op. I just need to load my dbt model with the contacts that need to be blocked as an asset. But the @op does the blocking and there's no resulting asset.
chris
07/10/2023, 8:55 PM
chris
07/10/2023, 8:56 PM
Dennis Gera
07/10/2023, 8:57 PM
contacts_to_block = arado_dbt_assets.to_source_asset(key=AssetKey(["analytics", "growth", "contacts_to_block"]))
But my io_manager does not read my asset's metadata as metadata from InputContext.
Dennis Gera
07/10/2023, 8:58 PM
@asset(
    key_prefix=["analytics", "growth"],
    ins={
        "contacts_to_block": AssetIn(
            key=contacts_to_block_asset_key,
            input_manager_key="postgres_pandas_io_manager",
            metadata={
                "dbt_schema": manifest.metadata_by_asset_key.get(contacts_to_block_asset_key).get("dbt_schema")
            },
        )
    },
    io_manager_key="postgres_pandas_io_manager",
)
def contacts_to_block_asset(context, contacts_to_block: pd.DataFrame) -> pd.DataFrame:
    return contacts_to_block
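A side note on the chained `.get(...).get("dbt_schema")` in the workaround above: if the asset key is missing from the mapping, the first `.get` returns None and the second raises AttributeError. The following is a minimal sketch of a None-safe lookup, assuming `metadata_by_asset_key` behaves like a plain mapping from asset key to a metadata dict. The helper name `lookup_asset_metadata` and the tuple keys are hypothetical stand-ins for illustration, not Dagster or dagster-dbt APIs.

```python
from typing import Any, Hashable, Mapping, Optional


def lookup_asset_metadata(
    metadata_by_asset_key: Mapping[Hashable, Optional[Mapping[str, Any]]],
    asset_key: Hashable,
    field: str,
) -> Optional[Any]:
    """Return one metadata field for an asset key, or None if anything is missing."""
    # Treat both a missing key and an explicit None entry as "no metadata".
    entry = metadata_by_asset_key.get(asset_key) or {}
    return entry.get(field)


# Hypothetical data: tuples stand in for asset keys.
example_metadata = {
    ("analytics", "growth", "contacts_to_block"): {"dbt_schema": "growth"},
}

found = lookup_asset_metadata(
    example_metadata, ("analytics", "growth", "contacts_to_block"), "dbt_schema"
)
missing = lookup_asset_metadata(
    example_metadata, ("analytics", "growth", "unknown_model"), "dbt_schema"
)
```

With this shape, a model that is absent from the mapping yields None instead of an exception at definition time, so the caller can decide whether to fail loudly or fall back to a default.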
Dennis Gera
07/10/2023, 8:58 PM
.to_source_asset() in my job
chris
07/10/2023, 9:09 PM
arado_dbt_assets.to_source_asset(key=AssetKey(["analytics", "growth", "contacts_to_block"])) with respect to providing the input to your job. I wouldn’t be surprised if we just aren’t handling input metadata correctly in this case. So as I understand it, when you’re running your job and loading your asset in the IO manager, the InputContext object is not populated with the metadata from your asset?
Dennis Gera
07/10/2023, 9:12 PM
elif context.metadata.get("dbt_schema"):
    schema = context.metadata.get("dbt_schema")
Although my CustomizedDbtManifest has a node_info_to_metadata that does this:
metadata["dbt_schema"] = dbt_schema
I still have to declare the AssetIn metadata with
metadata={"dbt_schema": manifest.metadata_by_asset_key.get(contacts_to_block_asset_key).get("dbt_schema")}
like you saw me do in my workaround, so that my IOManager can retrieve this info from the InputContext metadata.
Dennis Gera
07/10/2023, 9:13 PM
arado_dbt_assets.to_source_asset(key=AssetKey(["analytics", "growth", "contacts_to_block"]))
then that metadata returns None
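Since the metadata can come back as None in this setup, the schema lookup in the IO manager has to tolerate a missing value. Below is a minimal sketch of that kind of fallback, under loud assumptions: `StubInputContext` is a hypothetical stand-in for Dagster's InputContext (only the two attributes used here), `resolve_schema` is an illustrative helper, and using the second-to-last asset key component as the schema is a guess based on the key `["analytics", "growth", "contacts_to_block"]`, not something confirmed in this thread.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StubInputContext:
    """Hypothetical stand-in exposing only what the sketch needs."""

    metadata: Optional[Dict[str, Any]] = None
    asset_key_path: List[str] = field(default_factory=list)


def resolve_schema(context: StubInputContext, default_schema: str = "public") -> str:
    """Pick a schema from metadata, then from the asset key, then a default."""
    # Metadata may be None (as reported above), so normalise it first.
    metadata = context.metadata or {}
    if metadata.get("dbt_schema"):
        return metadata["dbt_schema"]
    # Assumption: the key looks like [database, schema, table].
    if len(context.asset_key_path) >= 2:
        return context.asset_key_path[-2]
    return default_schema


with_metadata = resolve_schema(StubInputContext(metadata={"dbt_schema": "growth_dbt"}))
from_key = resolve_schema(
    StubInputContext(asset_key_path=["analytics", "growth", "contacts_to_block"])
)
fallback = resolve_schema(StubInputContext())
```

This does not fix the missing metadata itself. It only keeps the IO manager from failing when the metadata is absent, which may or may not be the behavior you want compared with raising an explicit error.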
chris
07/10/2023, 9:14 PM
Dennis Gera
07/10/2023, 9:17 PM
chris
07/10/2023, 9:26 PM