Alec Ryan
05/01/2022, 12:16 AMprha
05/02/2022, 12:26 AMAlec Ryan
05/02/2022, 1:03 PMfrom dagster import AssetKey, asset_sensor, RunRequest, get_dagster_logger
from datetime import datetime
def create_snowflake_asset_sensor(key, snowflake_job):
@asset_sensor(
asset_key=AssetKey(key),
job=snowflake_job
)
def snowflake_sensor(context, asset_event):
asset_partition = asset_event.dagster_event.partition
yield RunRequest(
run_key=context.cursor,
run_config={
"resources": {
"snowflake": {
"config": {
'account': ACCOUNT,
'user':USER,
'password':PASSWORD,
'database': DB,
'warehouse': 'NHL_ANALYTICS'
}
},
"run_parameters" : {
"config": {
"run_date" : asset_partition
}
}
}
}
)
return snowflake_sensor
prha
05/02/2022, 3:41 PMasset_sensor
to create a custom sensor that gets the behavior you want.Alec Ryan
05/07/2022, 5:36 PMprha
05/09/2022, 6:50 PMfrom dagster import sensor, AssetKey, DagsterEventType, EventRecordsFilter, RunRequest
@sensor(job=my_job)
def my_asset_sensor(context):
records_cursor = int(context.cursor) if context.cursor else None
asset_records = context.instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
asset_key=AssetKey("table_a"),
after_cursor=records_cursor,
),
ascending=False,
)
if not asset_records:
return
for record in asset_records:
yield RunRequest(
run_key=str(record.record_id),
run_config={},
)
last_asset_record = asset_records[-1]
context.update_cursor(str(last_asset_record.record_id))
Alec Ryan
05/10/2022, 12:25 AMprha
05/10/2022, 12:26 AM