Grigoriy Sterin
06/07/2022, 8:23 PMjohann
06/09/2022, 5:27 PMGrigoriy Sterin
06/09/2022, 5:30 PMjohann
06/09/2022, 5:37 PMclaire
06/09/2022, 6:06 PM@asset
def my_asset():
return 1
@asset_sensor(asset_key=AssetKey("my_asset"), job=my_job)
def sensor_to_test(context, asset_event):
yield RunRequest(
run_key=context.cursor,
run_config={
"ops": {
"read_materialization": {
"config": {
"asset_key": asset_event.dagster_event.asset_key.path,
}
}
}
},
)
def test_sensor():
with instance_for_test() as instance:
build_assets_job("yields_my_asset", [my_asset]).execute_in_process(instance=instance)
for run_request in sensor_to_test(build_sensor_context(instance=instance)):
assert run_request.run_key
Grigoriy Sterin
06/09/2022, 6:07 PM