Rasul Kireev
08/24/2022, 8:48 PM
• dagster
repo which deploys dagit, the daemon and psql, and connects to the user deployments (next 2 bullet points)
• dbt
repo with all the models. I create a user deployment in that repo and push to the same namespace that dagster is deployed to
• dagster-pipelines
repo that has all the actual pipelines that engineers will write. also pushed as a user deployment to the same namespace
Now, the question is, can engineers reference dbt assets in the pipelines they write in the dagster-pipelines repo? I tried doing:
from dagster import define_asset_job
dnb_job = define_asset_job("dnb_asset_job", selection="model/dunbradstreet_silver/dnb")
in my dagster_pipelines repo, knowing that an asset with that name exists but comes from a different user deployment (image attached). Unfortunately, I get the following error when trying to deploy:
dagster._core.errors.DagsterInvalidDefinitionError: UnresolvedAssetJobDefinition dnb_asset_job specified, but no AssetsDefinitions exist on the repository.
Which makes sense if I can’t access that asset. Has anyone run into this before? Thanks a ton in advance.
chris
08/24/2022, 9:33 PM
Rasul Kireev
08/25/2022, 5:16 PM
dnb_asset = SourceAsset(key=AssetKey("model/dunbradstreet_silver/dnb"))
dnb_job = define_asset_job("dnb_asset_job", selection=AssetSelection.keys("model/dunbradstreet_silver/dnb"))
@repository
def dbt_jobs():
    return [dnb_asset, dnb_job]
The dnb_job part seems to be tripping me up.
I know from the docs that once I declare a SourceAsset I can’t refer to it by the variable name, but have to use the asset name (so not dnb_asset, but “model/dunbradstreet_silver/dnb”). But that is problematic if I want to do something like this:
dnb_asset = SourceAsset(key=AssetKey("model/dunbradstreet_silver/dnb"))
@asset
def my_derived_asset(model/dunbradstreet_silver/dnb):
    return model/dunbradstreet_silver/dnb + [4]
I tried a bunch of different things (AssetIn, @asset(ins={}), key prefix), but can’t seem to get the correct configuration to create an asset job based on the external asset. Also tried:
dnb_job = define_asset_job("dnb_asset_job", selection="model/dunbradstreet_silver/dnb")
dnb_job = define_asset_job("dnb_asset_job", selection="dnb")
but nothing works.
dnb = SourceAsset(key=AssetKey("model/dunbradstreet_silver/dnb"))
@asset(ins={"dnb": AssetIn(key_prefix="model/dunbradstreet_silver")})
def dnb_asset(dnb):
    return dnb
Getting: dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["model/dunbradstreet_silver", "dnb"]' for asset '["dnb_asset"]' is not produced by any of the provided asset ops and is not one of the provided sources
Also tried:
@asset(ins={"dnb": SourceAsset(key=AssetKey("model/dunbradstreet_silver/dnb"))})
def dnb_asset(dnb):
    return dnb
Got: SourceAsset doesn't have argument key_prefix
chris
08/25/2022, 7:43 PM
define_asset_job, you shouldn't be using a selection that tries to execute the source asset, per se.
• In the actual asset, you can map asset keys to the inputs using AssetIn - you would do something like:
@asset(ins={"dnb": AssetIn(key=AssetKey("model/dunbradstreet_silver/dnb"))})
def dnb_asset(dnb):
    return dnb
Then, all you need to do is provide the SourceAsset to the repository.
Rasul Kireev
08/29/2022, 2:17 PM