Hebo Yang
08/22/2022, 5:28 PM
I am using `AssetsDefinition.from_graph()` to create assets and `GraphDefinition(output_mappings)` to output an asset from an op of the graph. However, the asset that gets generated doesn't have partition metadata attached.
Could someone take a look, please? (I'll provide more details in the thread.)

I am using `AssetsDefinition.from_graph()` and `GraphDefinition()`. However, on the asset page, the partition isn't attached (see screenshot).
I need the partition info from `context.instance.get_event_records()` so that the sensor can figure out which partition an asset materialization was for (and it's nice to have partitions display properly on the asset page). When I printed out the event logs, it looks like the `AssetMaterialization` is somehow missing the partition:
```
event_specific_data=StepMaterializationData(
    materialization=AssetMaterialization(
        asset_key=AssetKey(['DATALAKE', 'prod', 'ml', 'datasets', 'canary_complex_pipeline']),
        description=None,
        metadata_entries=[
            MetadataEntry(label='path', description=None, entry_data=PathMetadataValue(path='/opt/dagster/dagster_home/storage/DATALAKE/prod/ml/datasets/canary_complex_pipeline/2022-08-21'))
        ],
        partition=None
    ),
    asset_lineage=[]
)
```
Here is the code I am using to create the asset, the job, and the dummy op. Am I missing something in `get_asset_op`, or do I need to pass `partition_mappings` or `metadata_by_output_name` to `from_graph()`?
```
asset = AssetsDefinition.from_graph(
    graph,
    keys_by_input_name=self.source_assets,
    keys_by_output_name={self.asset_name: AssetKey(self.asset_key)},
    resource_defs={"active_date": active_date_resouce, "labels": labels},
    partitions_def=daily_partition,
)

job = graph.to_job(
    name=self.get_name_for_source(),
    config=self.get_daily_partition_config(),
    # partitions_def=daily_partition,
    resource_defs={"active_date": active_date_resouce, "labels": labels},
    hooks={metrics_failure_hook},
)
```
```
from typing import Any, List

from dagster import AssetKey, OpDefinition, Out, Output


def get_asset_op(op_name, ins, out_asset_name, out_asset_key: List[str], partition_def):
    # A single output that is declared as a partitioned asset
    out = {
        out_asset_name: Out(
            Any,
            asset_key=AssetKey(out_asset_key),
            asset_partitions_def=partition_def,
        )
    }

    def asset_func(context, *args, **kwargs):
        context.log.info(f"context: {type(context)} {context.partition_key}.")
        yield Output(
            value=out_asset_name,
            output_name=out_asset_name,
        )

    return OpDefinition(
        name=op_name,
        ins=ins,
        outs=out,
        compute_fn=asset_func,
    )
```
sandy
08/22/2022, 6:14 PM
> I couldn't use `define_asset_job()` as I am hoping to include other ops inside my job.
mind if I ask what kinds of things those other ops are doing?
Hebo Yang
08/22/2022, 6:40 PM
sandy
08/22/2022, 6:45 PM
you can use `define_asset_job` with graph-backed assets:
```
from dagster import AssetSelection, AssetsDefinition, define_asset_job, repository

asset = AssetsDefinition.from_graph(...)

@repository
def repo():
    return [asset, define_asset_job("name", selection=AssetSelection.assets(asset))]
```
would that work for you?
Hebo Yang
08/22/2022, 8:25 PM
sandy
08/22/2022, 8:26 PM
`AssetsDefinition.from_graph` on? are those ops downstream of your assets? yes, currently you would need to convert them to assets. do they not make sense as assets? or is it just a pain to have to convert them all at once vs. incrementally?
Hebo Yang
08/22/2022, 8:32 PM
sandy
08/22/2022, 8:37 PM
Fernando Mangussi
09/21/2022, 8:33 PM
```
from dagster import (
    AssetsDefinition,
    TimeWindowPartitionsDefinition,
    asset,
    define_asset_job,
    graph,
    op,
    repository,
    with_resources,
)

every_10_min_partition = TimeWindowPartitionsDefinition(
    '2022-09-21-00:23',
    fmt="%Y-%m-%d-%H:%M",
    timezone='UTC',
    cron_schedule='*/10 * * * *',
)

@asset(partitions_def=every_10_min_partition)
def upstream_asset() -> int:
    return 1

@op
def add_one(value) -> int:
    return value + 1

@op
def middle_op(from_add_one) -> int:
    return from_add_one

@graph
def downstream_asset(upstream_asset) -> int:
    return middle_op(add_one(upstream_asset))

downstream_asset = AssetsDefinition.from_graph(
    graph_def=downstream_asset,
    partitions_def=every_10_min_partition,
)

downstream_asset_job = define_asset_job(
    name="downstream_asset_job",
    selection=["downstream_asset", "upstream_asset"],
    partitions_def=every_10_min_partition,
)

@repository
def repo():
    return [
        downstream_asset,
        upstream_asset,
        downstream_asset_job,
    ]
```