Averell
08/11/2022, 2:06 PM
@multi_asset(
    partitions_def=daily_partitions,
    can_subset=True,
    required_resource_keys={"my_client"},
    outs={
        "A": AssetOut(
            key_prefix=["raw"],
        ),
        "B": AssetOut(
            key_prefix=["raw"],
        ),
    },
)
def my_assets(context):
    for obj in context.selected_output_names:
        yield Output(value=<some_value>, output_name=obj)
And here is my test job:
@repository
def my_repo():
    return [
        with_resources(
            load_assets_from_package_module(raw),
            resource_defs=get_env_resources(deployment_env),
        ),
        ScheduleDefinition(
            job=define_asset_job("test_job", selection=["raw/A"]), cron_schedule="@daily"
        ),
    ]


my_repo().get_all_jobs()[1].execute_in_process(partition_key="2022-08-10")
I got the following error:
DagsterStepOutputNotFoundError: Core compute for op "my_assets" did not return an output for non-optional output "B"
I wonder why I get that error when I have already set can_subset=True?
Thanks
claire
08/11/2022, 5:40 PM
Set is_required=False on `AssetOut`:
@multi_asset(
    partitions_def=partitions,
    can_subset=True,
    outs={
        "A": AssetOut(
            key_prefix=["raw"],
            is_required=False,
        ),
        "B": AssetOut(
            key_prefix=["raw"],
            is_required=False,
        ),
    },
)
def my_assets(context):
    yield Output(5, output_name="B")