TL;DR Can I reference assets which are created in ...
# ask-community
r
TL;DR Can I reference assets which are created in a different user deployment from the one that I write the pipeline in?
---
Is there a way for user deployments to talk to each other when deployed on k8s? For better context, here is what I’m doing now:
• a dagster repo which deploys dagit, the daemon and psql, and connects to the user deployments (next 2 bullet points)
• a dbt repo with all the models. I create a user deployment in that repo and push it to the same namespace that dagster is deployed to
• a dagster-pipelines repo that has all the actual pipelines that engineers will write, also pushed as a user deployment to the same namespace
Now, the question is: can engineers reference dbt assets in the pipelines they write in the dagster-pipelines repo? I tried doing:
from dagster import define_asset_job

dnb_job = define_asset_job("dnb_asset_job", selection="model/dunbradstreet_silver/dnb")
in my dagster_pipelines repo, knowing that I have an asset with that name, but it comes from a different user deployment (image attached). Unfortunately, I get the following error when trying to deploy:
dagster._core.errors.DagsterInvalidDefinitionError: UnresolvedAssetJobDefinition dnb_asset_job specified, but no AssetsDefinitions exist on the repository.
Which makes sense if I can’t access that asset. Has anyone run into this before? Thanks a ton in advance.
c
SourceAsset is designed for cross-repo dependencies such as this: https://docs.dagster.io/concepts/assets/software-defined-assets#defining-external-asset-dependencies
r
Thank you @chris, found this yesterday on the main “Assets” docs page :facepalm:, it was right there in front of me. Now I'm having issues using the loaded asset due to the prefixed nature of its key.
dnb_asset = SourceAsset(key=AssetKey("model/dunbradstreet_silver/dnb"))

dnb_job = define_asset_job("dnb_asset_job", selection=AssetSelection.keys("model/dunbradstreet_silver/dnb"))

@repository
def dbt_jobs():
    return [dnb_asset, dnb_job]
The dnb_job part seems to be tripping me up. I know from the docs that once I source an asset I can’t use the variable name, but have to use the asset name (so, not dnb_asset, but “model/dunbradstreet_silver/dnb”). But that is problematic if I want to do something like this:
dnb_asset = SourceAsset(key=AssetKey("model/dunbradstreet_silver/dnb"))

@asset
def my_derived_asset(model/dunbradstreet_silver/dnb):
    return model/dunbradstreet_silver/dnb + [4]
Tried a bunch of different things (AssetIn, using @asset(ins={}), key prefix), but can’t seem to get the correct configuration to create an asset job based on the external asset. Also tried:
dnb_job = define_asset_job("dnb_asset_job", selection="model/dunbradstreet_silver/dnb")
dnb_job = define_asset_job("dnb_asset_job", selection="dnb")
but nothing works.
Not sure how best to reference an asset loaded via SourceAsset when its key has multiple levels (prefixes).
Also tried:
dnb = SourceAsset(key=AssetKey("model/dunbradstreet_silver/dnb"))

@asset(ins={"dnb": AssetIn(key_prefix="model/dunbradstreet_silver")})
def dnb_asset(dnb):
    return dnb

Getting: dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["model/dunbradstreet_silver", "dnb"]' for asset '["dnb_asset"]' is not produced by any of the provided asset ops and is not one of the provided sources
Also tried:
@asset(ins={"dnb": SourceAsset(key=AssetKey("model/dunbradstreet_silver/dnb"))})
def dnb_asset(dnb):
    return dnb

Got: SourceAsset doesn't have argument key_prefix
c
A few things:
• You can't have a job in your current repo launch assets in another repo. The source asset represents the dependency, but it should only be used to retrieve the physical data, not to construct it. This means that in your call to define_asset_job, you shouldn't use a selection that tries to execute the source asset.
• In the actual asset, you can map asset keys to the inputs using AssetIn. You would do something like:
from dagster import AssetIn, AssetKey, asset

@asset(ins={"dnb": AssetIn(key=AssetKey("model/dunbradstreet_silver/dnb"))})
def dnb_asset(dnb):
    return dnb
Then, all you need to do is provide the SourceAsset to the repository
r
So that last step would happen in the “original” dbt repo, and then I can refer to the dnb asset in the pipelines repo, right? Thanks a ton for your help!