#integration-dbt

Dennis Gera

07/10/2023, 8:00 PM
Hi team!
How should I instantiate a dbt table as an asset in a job?
I currently load my dbt_assets like this:
```python
@dbt_assets(
    manifest=manifest,
    exclude="config.materialized:snapshot",
)
def arado_dbt_assets(context: OpExecutionContext, dbt: DbtCli):
    yield from _process_dbt_assets(context=context, dbt=dbt)
```
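For context, the `exclude="config.materialized:snapshot"` argument above drops dbt snapshots from the set of nodes turned into assets. The plain-Python sketch below is only a simplified model of that filtering, not dbt's or Dagster's selection implementation; the trimmed manifest dictionary and the `arado` project name are hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical, heavily trimmed stand-in for the "nodes" section of a dbt
# manifest.json; real nodes carry many more fields.
MANIFEST_NODES: Dict[str, Dict[str, Any]] = {
    "model.arado.contacts_to_block": {"config": {"materialized": "table"}},
    "model.arado.stg_contacts": {"config": {"materialized": "view"}},
    "snapshot.arado.contacts_snapshot": {"config": {"materialized": "snapshot"}},
}


def exclude_by_materialization(
    nodes: Dict[str, Dict[str, Any]], materialized: str
) -> List[str]:
    """Return the sorted unique_ids of nodes whose config.materialized is not
    `materialized`. This is a simplified model of a
    `config.materialized:<value>` exclude, not dbt's selector engine."""
    return sorted(
        unique_id
        for unique_id, node in nodes.items()
        if node.get("config", {}).get("materialized") != materialized
    )


selected = exclude_by_materialization(MANIFEST_NODES, "snapshot")
print(selected)  # ['model.arado.contacts_to_block', 'model.arado.stg_contacts']
```

Real dbt selection also accounts for resource types, graph operators and set unions, which this sketch ignores.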
I then want to use a dbt table as an input for a job. After reading the documentation on how to do this, I figure I need to have something like this:
```python
@job(
    name="update_blocked_contacts_from_antifraud",
    description="Block contacts tagged as invalid",
)
def update_blocked_contacts_from_antifraud():
    contacts_to_block_asset = contacts_to_block.to_source_asset()
    block_contacts(contacts_to_block=contacts_to_block_asset)
```
where `contacts_to_block` is an asset of a table in my dbt assets. However, I'm not sure how to define `contacts_to_block` as a separate asset. I tried using `SourceAsset` instead of `to_source_asset()`, but couldn't load metadata from my dbt asset into my io_manager.
A workaround would be to redefine that dbt table as another asset (either `@asset` using asset keys or `@dbt_assets` using `select`). But I'd rather not do this, given that all my dbt tables are already defined as assets after loading my manifest through `@dbt_assets`.

chris

07/10/2023, 8:51 PM
The documentation you linked is for op-based jobs, but for assets you want to use `define_asset_job`, which allows you to specify a selection across assets.

Dennis Gera

07/10/2023, 8:55 PM
Hi @chris, sorry for not being clear on that. I have an op-based job. It looks something like this:
```python
@op(
    name="block_contacts",
    description="Update Contacts to be Blocked with Contact ID",
)
def block_contacts(context, contacts_to_block: pd.DataFrame, brain_api: BrainAPI) -> None:
    brain_api_client: BrainAPI = brain_api

    for ind in contacts_to_block.index:
        id_contact = contacts_to_block["id_contact"][ind]
        contact_update_data = ContactUpdate(
            is_blocked=True,
            blocked_reason=BlockedReasonEnum.INVALID_REGISTRATION,
        )
        brain_api_client.update_contact(id_contact=id_contact, contact_update_data=contact_update_data)
```
I then have a `@job` that runs that `@op`. I just need to load my dbt model with the contacts that need to be blocked as an asset. But the `@op` does the blocking, and there's no resulting asset.
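The loop in `block_contacts` looks up each row by index label. A minimal sketch of the same logic, iterating over the contact ids directly and building the identical update payload once, is below. It assumes the caller passes a plain list of ids (with pandas that could be `contacts_to_block["id_contact"].tolist()`) and that the client does not mutate the payload; `ContactUpdate`, `BlockedReasonEnum` and `FakeBrainAPI` are hypothetical stand-ins for the thread's types so the example runs on its own.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Tuple


class BlockedReasonEnum(Enum):
    # Hypothetical stand-in for the enum used in the thread.
    INVALID_REGISTRATION = "invalid_registration"


@dataclass(frozen=True)
class ContactUpdate:
    # Hypothetical stand-in for the thread's update payload (frozen, so it
    # is safe to reuse across calls).
    is_blocked: bool
    blocked_reason: BlockedReasonEnum


class FakeBrainAPI:
    """Hypothetical test double: records calls instead of hitting an API."""

    def __init__(self) -> None:
        self.calls: List[Tuple[int, ContactUpdate]] = []

    def update_contact(self, id_contact: int, contact_update_data: ContactUpdate) -> None:
        self.calls.append((id_contact, contact_update_data))


def block_contacts(contact_ids: List[int], brain_api: FakeBrainAPI) -> int:
    """Block every contact id and return how many updates were sent."""
    # The payload is the same for every contact, so build it once.
    update = ContactUpdate(
        is_blocked=True,
        blocked_reason=BlockedReasonEnum.INVALID_REGISTRATION,
    )
    for id_contact in contact_ids:
        brain_api.update_contact(id_contact=id_contact, contact_update_data=update)
    return len(contact_ids)


api = FakeBrainAPI()
sent = block_contacts([101, 102, 103], api)
```

Returning a count is an illustrative choice; it gives the op something simple to log or assert on in tests.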

chris

07/10/2023, 8:55 PM
Ah, no, sorry, I did not read your question clearly enough.
Let me spin on this for a sec.

Dennis Gera

07/10/2023, 8:57 PM
fyi, I tried using:
```python
contacts_to_block = arado_dbt_assets.to_source_asset(key=AssetKey(["analytics", "growth", "contacts_to_block"]))
```
But my io_manager does not receive my asset's metadata in the `InputContext` metadata.
Here's an example of a workaround I wish I didn't have to make:
```python
@asset(
    key_prefix=["analytics", "growth"],
    ins={
        "contacts_to_block": AssetIn(
            key=contacts_to_block_asset_key,
            input_manager_key="postgres_pandas_io_manager",
            metadata={"dbt_schema": manifest.metadata_by_asset_key.get(contacts_to_block_asset_key).get("dbt_schema")},
        )
    },
    io_manager_key="postgres_pandas_io_manager",
)
def contacts_to_block_asset(context, contacts_to_block: pd.DataFrame) -> pd.DataFrame:
    return contacts_to_block
```
I then import that asset and use `.to_source_asset()` in my job.
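The workaround's `metadata=` expression chains `.get(...).get("dbt_schema")`, which raises `AttributeError` if the asset key is missing from the mapping. Below is a sketch of a small helper that copies selected fields from an upstream asset's metadata into input metadata without that failure mode. The helper name and sample data are hypothetical, and asset keys are modelled as plain tuples (Dagster itself uses `AssetKey` objects; the lookup works the same for any hashable key).

```python
from typing import Any, Dict, Mapping, Sequence, Tuple

# Asset keys modelled as tuples of path components (an assumption; Dagster
# itself uses AssetKey objects).
AssetKeyPath = Tuple[str, ...]


def input_metadata_from_upstream(
    metadata_by_key: Mapping[AssetKeyPath, Mapping[str, Any]],
    asset_key: AssetKeyPath,
    fields: Sequence[str] = ("dbt_schema",),
) -> Dict[str, Any]:
    """Hypothetical helper: copy selected metadata fields of an upstream asset
    into a dict suitable for input metadata. Unknown keys or fields are
    skipped instead of raising."""
    upstream = metadata_by_key.get(asset_key)
    if upstream is None:
        # A chained `.get(key).get(...)` would raise AttributeError here.
        return {}
    return {name: upstream[name] for name in fields if name in upstream}


# Hypothetical metadata for the asset discussed in the thread.
METADATA_BY_KEY: Dict[AssetKeyPath, Dict[str, Any]] = {
    ("analytics", "growth", "contacts_to_block"): {"dbt_schema": "growth", "owner": "data"},
}

meta = input_metadata_from_upstream(METADATA_BY_KEY, ("analytics", "growth", "contacts_to_block"))
missing = input_metadata_from_upstream(METADATA_BY_KEY, ("analytics", "growth", "unknown"))
print(meta)     # {'dbt_schema': 'growth'}
print(missing)  # {}
```

Returning an empty dict keeps the definition loadable; if you would rather fail at definition time when the key is missing, raising a `KeyError` instead is an equally reasonable choice.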

chris

07/10/2023, 9:09 PM
I see - so I think you have the right idea here with respect to providing the input to your job:
```python
arado_dbt_assets.to_source_asset(key=AssetKey(["analytics", "growth", "contacts_to_block"]))
```
I wouldn’t be surprised if we just aren’t handling input metadata correctly in this case. So as I understand it, when you’re running your job and loading your asset in the IO manager, the `InputContext` object is not populated with the metadata from your asset?

Dennis Gera

07/10/2023, 9:12 PM
Right, in my IOManager, I do this:
```python
elif context.metadata.get("dbt_schema"):
    schema = context.metadata.get("dbt_schema")
```
Although my `CustomizedDbtManifest` has a `node_info_to_metadata` that does this:
```python
metadata["dbt_schema"] = dbt_schema
```
I still have to declare the `AssetIn` metadata with
```python
metadata={"dbt_schema": manifest.metadata_by_asset_key.get(contacts_to_block_asset_key).get("dbt_schema")}
```
like you saw me do in my workaround, so that my IOManager can retrieve this info from the `InputContext` metadata.
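The `node_info_to_metadata` hook mentioned here presumably receives a dbt node's manifest entry as a mapping. A hypothetical version that derives `dbt_schema` (plus a table name) from the node's `schema`, `alias` and `name` fields is sketched below; the `dbt_table` key is an assumption added for illustration, and the actual `CustomizedDbtManifest` may differ.

```python
from typing import Any, Dict, Mapping


def node_info_to_metadata(node_info: Mapping[str, Any]) -> Dict[str, Any]:
    """Hypothetical metadata hook: expose where a dbt node's relation lives so
    an IO manager can find it. `dbt_table` is an illustrative extra key."""
    metadata: Dict[str, Any] = {}
    dbt_schema = node_info.get("schema")
    if dbt_schema is None:
        return metadata
    metadata["dbt_schema"] = dbt_schema
    # In manifest.json, `alias` normally equals the model name unless it is
    # configured; fall back to `name` if it is absent.
    metadata["dbt_table"] = node_info.get("alias") or node_info.get("name")
    return metadata


example = node_info_to_metadata(
    {"name": "contacts_to_block", "alias": "contacts_to_block", "schema": "growth"}
)
print(example)  # {'dbt_schema': 'growth', 'dbt_table': 'contacts_to_block'}
```

Metadata produced this way lives on the dbt asset definition; as described in this thread, it did not show up in `InputContext.metadata` for an op input built with `to_source_asset()`, which is why the `AssetIn(metadata=...)` workaround was needed.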
If I use
```python
arado_dbt_assets.to_source_asset(key=AssetKey(["analytics", "growth", "contacts_to_block"]))
```
then that metadata lookup returns `None`.
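Until the suspected bug is sorted out, one defensive option is for the IO manager to look in more than one place for the schema. The sketch below models that lookup order in plain Python: explicit input metadata first, then the upstream asset's metadata, then a default. The stub classes are hypothetical; in Dagster the equivalents would be `InputContext.metadata` and the metadata of `InputContext.upstream_output`, but whether the dbt asset's metadata is actually exposed there in your Dagster version is an assumption to verify. The `"public"` default is also an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class UpstreamOutputStub:
    # Hypothetical stand-in for the upstream output context; only
    # `metadata` is modelled.
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class InputContextStub:
    # Hypothetical stand-in for Dagster's InputContext; only the two
    # attributes read below are modelled.
    metadata: Dict[str, Any] = field(default_factory=dict)
    upstream_output: Optional[UpstreamOutputStub] = None


def resolve_schema(context: InputContextStub, default_schema: str = "public") -> str:
    """Pick the schema to read from: input metadata first, then the upstream
    asset's metadata, then a default."""
    # 1. Explicit input metadata (what the AssetIn workaround sets).
    schema = (context.metadata or {}).get("dbt_schema")
    if schema:
        return schema
    # 2. Metadata attached to the upstream asset/output, if any.
    upstream = context.upstream_output
    if upstream is not None:
        schema = (upstream.metadata or {}).get("dbt_schema")
        if schema:
            return schema
    # 3. Fall back to an assumed default schema.
    return default_schema


from_input = resolve_schema(InputContextStub(metadata={"dbt_schema": "analytics_growth"}))
from_upstream = resolve_schema(
    InputContextStub(upstream_output=UpstreamOutputStub(metadata={"dbt_schema": "growth"}))
)
fallback = resolve_schema(InputContextStub())
```

Preferring explicit input metadata keeps the existing `AssetIn` workaround authoritative, while the upstream fallback would let the workaround be dropped if the metadata does start flowing through.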

chris

07/10/2023, 9:14 PM
Gotcha - it’s my hypothesis that that’s actually a bug. Going to file an issue around this

Dennis Gera

07/10/2023, 9:17 PM
Ok, thanks @chris!