Lindsay S
12/20/2022, 10:41 PMdaily_partitions_def = DailyPartitionsDefinition(start_date="2022-01-19")
@asset(
group_name="my_table_asset",
partitions_def=daily_partitions_def,
required_resource_keys={"some_api_client"}
)
def my_table_asset(context):
date = context.asset_partition_key_for_output()
context.resources.some_api_client.fetch_data(from_date_str=date)
...
partitioned_asset_job = define_asset_job(
name="my_table_asset_job",
selection=AssetSelection.assets(my_table_asset),
partitions_def=daily_partitions_def,
)
@repository
def extract_load():
return [
partitioned_asset_job, # when I remove this the code doesn't fail
with_resources(
[my_table_asset], resource_defs = resources_by_env[os.getenv("DAGSTER_DEPLOYMENT")]
),
]
rex
12/20/2022, 11:02 PMdagit -f ./minimal_example.py
# ./minimal_example.py
from dagster import (
AssetSelection,
DailyPartitionsDefinition,
ScheduleDefinition,
asset,
define_asset_job,
repository,
)
partitions_def = DailyPartitionsDefinition(start_date="2022-11-01")
@asset(
group_name="staging",
partitions_def=partitions_def,
)
def my_asset():
pass
# Materialize all assets in the repository
partitioned_asset_job = define_asset_job(
"partitioned_asset_job",
AssetSelection.assets(my_asset),
partitions_def=partitions_def,
)
@repository
def minimal_example():
return [
ScheduleDefinition(
job=partitioned_asset_job,
cron_schedule="@weekly",
),
my_asset,
]
Lindsay S
12/20/2022, 11:10 PMrex
12/20/2022, 11:12 PMDefinitions
API should not break existing references of @repository
Lindsay S
12/20/2022, 11:13 PMrex
12/20/2022, 11:15 PMDefinitions
, just for reference
from dagster import (
AssetSelection,
DailyPartitionsDefinition,
Definitions,
ScheduleDefinition,
asset,
define_asset_job,
)
partitions_def = DailyPartitionsDefinition(start_date="2022-11-01")
@asset(
group_name="staging",
partitions_def=partitions_def,
)
def my_asset():
pass
partitioned_asset_job = define_asset_job(
"partitioned_asset_job",
AssetSelection.assets(my_asset),
partitions_def=partitions_def,
)
defs = Definitions(
assets=[my_asset],
schedules=[
ScheduleDefinition(
job=partitioned_asset_job,
cron_schedule="@weekly",
)
],
)
marcos
01/29/2023, 10:05 PMdagster._check.CheckError: Failure condition: Tried to access partition_key_range for a non-partitioned run
. Any ideas?