Kevin Zhao
12/19/2022, 10:19 PM
sensor -> graph_backed_asset -> asset1 -> asset2
The graph-backed asset is triggered by the sensor.
The dependencies are created using named objects, i.e.:
graph-backed-asset = AssetsDefinition.from_graph(graph)
@asset
def asset1(graph_backed_asset):
    ...
However, when I create the repo as:
@repository
def repo():
    return [
        graph_backed_asset,
        asset1,
        asset2,
        sensor,
        define_asset_job("all_asset_job"),
    ]
I get the below error when trying to call execute_in_process:
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["graph_backed_asset"]' for asset '["asset1"]' is not produced by any of the provided asset ops and is not one of the provided sources
I have googled the error message, but so far no luck.
I am sure it's a silly semantics issue; I have been reading the docs, but no luck so far.
Any suggestions are greatly appreciated.

sean
12/20/2022, 4:48 PM
graph-backed-asset = AssetsDefinition.from_graph(graph)
is not valid Python: identifiers can't use hyphens. I'm guessing your issue will be resolved if you just use _ everywhere to separate words.

Kevin Zhao
12/20/2022, 5:01 PM

sean
12/20/2022, 5:22 PM
How are you calling AssetsDefinition.from_graph? The upstream assets need to depend on an asset key; it's not clear what asset keys your GBA is producing right now.

Kevin Zhao
12/20/2022, 5:35 PM
from google.cloud import storage  # import added for completeness

@op(
    config_schema={
        "source_bucket_name": str,
        "source_prefix": str,
        "working_folder": str,
    }
)
def download_file(context):
    import os
    from argparse import Namespace

    op_args = Namespace(**context.op_config)
    bucket = storage.Client().bucket(op_args.source_bucket_name)
    blob = bucket.blob(op_args.source_prefix)
    dest_file = os.path.join(
        op_args.working_folder, os.path.basename(op_args.source_prefix)
    )
    blob.download_to_filename(dest_file)
    return dest_file

@graph
def file_download_graph():
    return download_file()
def make_gcp_storage_sensor(
    source_bucket_name="eiq-data-matrix-cil", source_prefix="input/", job=None
):
    import os

    job = job or file_download_graph

    @sensor(
        job=job,
        minimum_interval_seconds=3600,  # every 1 hour
        description=f"Update in {os.path.join(source_bucket_name, source_prefix)} sensed for job {job.name}",
    )
    def gcp_storage_sensor():
        bucket = storage.Client(source_bucket_name).get_bucket(source_bucket_name)
        for blob in bucket.list_blobs(prefix=source_prefix, delimiter="/"):
            if blob.name != source_prefix:
                yield RunRequest(
                    # code omitted
                )

    return gcp_storage_sensor
downloaded_file_asset = AssetsDefinition.from_graph(file_download_graph)
file_download_graph is passed to AssetsDefinition.from_graph to get the graph-backed asset.

sean
12/20/2022, 9:55 PM
Nothing is currently producing the asset key graph_backed_asset -- try this:
AssetsDefinition.from_graph(file_download_graph, keys_by_output_name={"result": AssetKey(["graph_backed_asset"])})
so the graph-backed asset is keyed as graph_backed_asset.
Kevin Zhao
12/21/2022, 3:22 PM
Since I pass source_bucket_name and source_prefix to the download op in the GBA, how do I test the GBA and downstream assets using execute_in_process?
When I try to execute the full asset DAG, it complains about missing source_bucket_name and source_prefix. Should I manually trigger the sensor and then call the GBA? I'm not 100% sure about the second approach since it doesn't look very kosher.