Mycchaka Kleinbort
03/09/2023, 1:35 PM
@op(config_schema={"scheduled_date": str})
def configurable_op(context):
    context.log.info(context.op_config["scheduled_date"])

@job
def configurable_job():
    configurable_op()

@schedule(job=configurable_job, cron_schedule="0 0 * * *")
def configurable_job_schedule(context: ScheduleEvaluationContext):
    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return RunRequest(
        run_key=None,
        run_config={
            "ops": {"configurable_op": {"config": {"scheduled_date": scheduled_date}}}
        },
        tags={"date": scheduled_date},
    )
I'm trying to update some assets instead of running a job. Here is what I have:
from dagster import RunRequest, AssetKey, schedule

@schedule(cron_schedule="*/1 * * * *")
def please_update_assets_v1():
    return RunRequest(
        run_key=None,
        run_config=None,
        tags={'automated': 'yes'},
        asset_selection=[AssetKey('X_train'), AssetKey('y_train'), AssetKey('model')],
        stale_assets_only=False,
    )
This results in an error asking for a job_name; I tried adding some random name and the project loaded fine but the assets are not getting materialized.
Jean Gonzalez
03/09/2023, 5:14 PM
define_asset_job function. https://docs.dagster.io/_apidocs/assets
chris
03/09/2023, 10:18 PM
chris
03/09/2023, 10:24 PM