#ask-community
Kevin Zhao

12/19/2022, 10:19 PM
I have a question about dependencies for a graph-backed asset. I have the DAG below:
sensor->graph_backed_asset->asset1->asset2
The graph-backed asset is triggered by the sensor. The dependencies are declared by name, i.e.:
graph-backed-asset = AssetsDefinition.from_graph(graph)

@asset
def asset1(graph_backed_asset):
However, when I define the repo as:
@repository
def repo():
    return [
        graph_backed_asset,
        asset1,
        asset2,
        sensor,
        define_asset_job("all_asset_job")
    ]
I get the error below when calling execute_in_process:
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["graph_backed_asset"]' for asset '["asset1"]' is not produced by any of the provided asset ops and is not one of the provided sources
I have googled the error message and read through the docs, but so far no luck. I'm sure it's a silly semantics issue. Any suggestions are greatly appreciated.
bump
bump
sean

12/20/2022, 4:48 PM
Hi Kevin-- the first thing I notice is that this line:
graph-backed-asset = AssetsDefinition.from_graph(graph)
is not valid Python-- Python identifiers can’t use hyphens. I’m guessing your issue will be resolved if you just use _ everywhere to separate words.
Kevin Zhao

12/20/2022, 5:01 PM
Good catch. That isn't real Python code, it's pseudocode.
I have replaced the hyphen in the pseudocode so it's cleaner. Sorry for the confusion; in my actual setup all the Python identifiers are valid.
sean

12/20/2022, 5:22 PM
What is the name of the graph you are passing to AssetsDefinition.from_graph? The downstream assets need to depend on an asset key, and it’s not clear what asset keys your GBA is producing right now.
Kevin Zhao

12/20/2022, 5:35 PM
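# Imports assumed from the names used below (not shown in the original snippet)
from dagster import AssetsDefinition, RunRequest, graph, op, sensor
from google.cloud import storage

@op(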
    config_schema={
        "source_bucket_name": str,
        "source_prefix": str,
        "working_folder": str,
    }
)
def download_file(context):
    import os
    from argparse import Namespace

    op_args = Namespace(**context.op_config)

    bucket = storage.Client().bucket(op_args.source_bucket_name)
    blob = bucket.blob(op_args.source_prefix)
    dest_file = os.path.join(
        op_args.working_folder, os.path.basename(op_args.source_prefix)
    )
    blob.download_to_filename(dest_file)
    return dest_file


@graph
def file_download_graph():
    return download_file()


def make_gcp_storage_sensor(
    source_bucket_name="eiq-data-matrix-cil", source_prefix="input/", job=None
):
    import os

    job = job or file_download_graph

    @sensor(
        job=job,
        minimum_interval_seconds=3600,  # every 1 hour
        description=f"Update in {os.path.join(source_bucket_name, source_prefix)} sensed for job {job.name}",
    )
    def gcp_storage_sensor():
        bucket = storage.Client(source_bucket_name).get_bucket(source_bucket_name)

        for blob in bucket.list_blobs(prefix=source_prefix, delimiter="/"):
            if blob.name != source_prefix:
                yield RunRequest(
                    # code omitted
                )

    return gcp_storage_sensor

downloaded_file_asset = AssetsDefinition.from_graph(file_download_graph)
That's the code that generates the graph-backed asset.
Basically, it's an op (download_file) wrapped in a graph (file_download_graph), which is then passed to AssetsDefinition.from_graph to get the graph-backed asset.
bump
sean

12/20/2022, 9:55 PM
ah, the problem is that your GBA is not producing an asset named graph_backed_asset-- try this:
AssetsDefinition.from_graph(file_download_graph, keys_by_output_name={"result": AssetKey(["graph_backed_asset"])})
either that or you could name the output of your graph graph_backed_asset
Kevin Zhao

12/21/2022, 3:22 PM
That works, thanks a lot for your help @sean.
Can I ask a follow-up question? My GBA is triggered by a sensor, which provides source_bucket_name and source_prefix to the download op in the GBA. How do I test the GBA and downstream assets using execute_in_process? When I try to execute the full asset DAG, it complains about missing source_bucket_name and source_prefix. Should I manually trigger the sensor and then call the GBA? I am not 100% sure about that second approach, since it doesn't look very kosher.
Ideally I want to tell the sensor to run immediately and start triggering the GBA.