Peter Lim
02/28/2024, 7:08 PM
Scout
02/28/2024, 7:08 PM
The `AutoMaterializeRule.materialize_on_cron` rule allows you to specify that an asset partition should be materialized if it has not been materialized since the latest cron schedule tick. For assets with a time component to their partitions definition, this rule will request all partitions that have been missed since the previous tick.

Here's an example from the documentation on how to use `materialize_on_cron` with an `AutoMaterializePolicy`:

```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

materialize_on_cron_policy = AutoMaterializePolicy.eager().with_rules(
    # try to materialize this asset if it hasn't been materialized since the last cron tick
    AutoMaterializeRule.materialize_on_cron("0 9 * * *", timezone="US/Central"),
)


@asset(auto_materialize_policy=materialize_on_cron_policy)
def root_asset():
    ...
```
In this example, the `materialize_on_cron` rule is added to an eager auto-materialize policy. The cron schedule is set to `"0 9 * * *"`, which means every day at 9:00 AM in the "US/Central" timezone. `root_asset` will be materialized if it hasn't been materialized since the last cron tick.

If you have a partitioned asset, you can define the partitions and attach the auto-materialize policy to the asset as follows:

```python
from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def partitioned_asset():
    ...
```
In this case, `partitioned_asset` is a daily-partitioned asset with an eager auto-materialize policy. Dagster will notice new partitions and auto-materialize them as they are added.

Remember to enable auto-materialization in your Dagster deployment through the UI by toggling the appropriate setting in the "Deployment" section under "Daemons" or "Agents", depending on whether you're using an open-source deployment or Dagster Cloud.

Please note that the `materialize_on_cron` rule can be customized with additional parameters such as `timezone` and `all_partitions` to further control the behavior of auto-materialization based on cron schedules.
Peter Lim
02/28/2024, 9:16 PM
Scout
02/28/2024, 9:17 PM
```python
from dagster import (
    AssetSelection,
    ScheduleDefinition,
    asset,
    define_asset_job,
    repository,
)


# Define your assets and assign them to groups
@asset(group_name="group1")
def asset_a():
    ...


@asset(group_name="group2")
def asset_b():
    ...


# Build a job for each asset group by selecting that group's assets
group1_job = define_asset_job("group1_job", selection=AssetSelection.groups("group1"))
group2_job = define_asset_job("group2_job", selection=AssetSelection.groups("group2"))

# Define schedules for each job
group1_schedule = ScheduleDefinition(job=group1_job, cron_schedule="0 8 * * *")
group2_schedule = ScheduleDefinition(job=group2_job, cron_schedule="0 9 * * *")


@repository
def my_repository():
    # The assets are included so that each job's selection can resolve against them
    return [asset_a, asset_b, group1_job, group2_job, group1_schedule, group2_schedule]
```

In this example, `asset_a` is part of `group1`, and `asset_b` is part of `group2`. We create a separate job for each group using `define_asset_job` with an `AssetSelection.groups(...)` selection, and then define a schedule for each job with a different cron schedule. This way, the materializations of assets in `group1` and `group2` will happen in separate runs according to their respective schedules.

Please note that the exact syntax and features available may vary depending on the version of Dagster you are using, and you should refer to the latest Dagster documentation for the most up-to-date information.
Peter Lim
02/28/2024, 9:18 PM
Scout
02/28/2024, 9:18 PM
```python
from dagster import AutoMaterializePolicy, asset, repository

# Define an auto-materialization policy
auto_mat_policy = AutoMaterializePolicy.eager()


# Define assets, assign them to groups, and attach the policy to each asset
@asset(group_name="group1", auto_materialize_policy=auto_mat_policy)
def asset_group1_a():
    ...


@asset(group_name="group1", auto_materialize_policy=auto_mat_policy)
def asset_group1_b():
    ...


@asset(group_name="group2", auto_materialize_policy=auto_mat_policy)
def asset_group2_a():
    ...


@asset(group_name="group2", auto_materialize_policy=auto_mat_policy)
def asset_group2_b():
    ...


@repository
def my_repository():
    return [asset_group1_a, asset_group1_b, asset_group2_a, asset_group2_b]
```

In this example, we define two asset groups, `group1` and `group2`, each with its own assets, and apply an eager auto-materialization policy to every asset. Auto-materialize policies are attached to assets rather than to jobs, so no per-group job is needed here: once auto-materialization is enabled, Dagster evaluates each asset's policy and launches runs for the assets that need to be materialized.

Please keep in mind that the auto-materialization feature was marked as experimental at the time of my knowledge cutoff, and the APIs and capabilities may have evolved since then. Always refer to the latest Dagster documentation for the most current information and best practices.
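
To make the "not materialized since the latest cron tick" condition from earlier in this thread more concrete, here is a rough sketch in plain Python. It does not use Dagster and is not how Dagster implements the rule. It only handles daily, on-the-hour schedules like the `"0 8 * * *"` and `"0 9 * * *"` examples above, and the helper names `latest_daily_tick` and `is_due` are made up for illustration. The fixed UTC-6 offset stands in for "US/Central" and ignores daylight saving time.

```python
from datetime import datetime, time, timedelta, timezone, tzinfo
from typing import Optional


def latest_daily_tick(now: datetime, hour: int, tz: tzinfo) -> datetime:
    """Return the most recent HH:00 tick at or before `now`, evaluated in `tz`."""
    local_now = now.astimezone(tz)
    tick = datetime.combine(local_now.date(), time(hour), tzinfo=tz)
    if tick > local_now:
        # Today's tick has not happened yet, so the latest tick was yesterday
        tick -= timedelta(days=1)
    return tick


def is_due(
    last_materialized: Optional[datetime], now: datetime, hour: int, tz: tzinfo
) -> bool:
    """True if nothing has been materialized since the latest daily tick."""
    if last_materialized is None:
        return True
    return last_materialized < latest_daily_tick(now, hour, tz)


# Assumption: a fixed offset used as a stand-in for "US/Central" (no DST handling)
CENTRAL = timezone(timedelta(hours=-6))

now = datetime(2024, 2, 28, 8, 30, tzinfo=CENTRAL)
last_run = datetime(2024, 2, 27, 9, 5, tzinfo=CENTRAL)

# Hypothetical per-group schedules: group1 ticks at 08:00, group2 at 09:00
group_hours = {"group1": 8, "group2": 9}
due = {group: is_due(last_run, now, hour, CENTRAL) for group, hour in group_hours.items()}
print(due)
```

At 8:30 AM, `group1` has passed its 8:00 tick without a newer materialization, so it is due. The latest tick for `group2` is still the previous day's 9:00, which the 9:05 run already covers, so it is not.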