Rafael Gomes
01/17/2023, 12:28 PMrun_request_for_partition
function. It does return the error from the image attached.
I did set the name
attribute in my asset job.
I cannot use RunRequest
and pass job_name
since it does not allow partitions.
Similar code:
my_job = define_asset_job(
name="my_job",
selection=AssetSelection.groups("my_groups"),
partitions_def=my_partitions,
)
@sensor(
jobs=[my_job],
default_status=DefaultSensorStatus.RUNNING,
minimum_interval_seconds=30,
)
def my_sensor():
...
for key in partitions:
request = my_job.run_request_for_partition(
partition_key=key, run_key=key
)
yield request
Nicolas Parot Alvarez
01/17/2023, 1:33 PMjob=my_job
instead in your sensor decorator ? Maybe it fails because it is expecting more than one job given you used the jobs
parameter.Rafael Gomes
01/17/2023, 1:38 PMmy_job = define_asset_job(
name="my_job",
selection=AssetSelection.groups("my_groups"),
partitions_def=my_partitions,
)
my_second_job = define_asset_job(
name="my_second_job",
selection=AssetSelection.groups("my_second_groups"),
partitions_def=my_second_partitions,
)
@sensor(
jobs=[my_job, my_second_job],
default_status=DefaultSensorStatus.RUNNING,
minimum_interval_seconds=30,
)
def my_sensor():
...
for key in partitions:
request = my_job.run_request_for_partition(
partition_key=key, run_key=key
)
yield request
for key in second_partitions:
second_request = my_second_job.run_request_for_partition(
partition_key=key, run_key=key
)
yield second_request
Nicolas Parot Alvarez
01/17/2023, 2:10 PMRunRequest
so my guess is that multi-job sensors is not yet compatible with run_request_for_partition
.
Looks like an oversight considering the jobs
parameter is marked as experimental and the use case is not covered in the tests: https://github.com/dagster-io/dagster/blob/1.1.10/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.pyRafael Gomes
01/17/2023, 2:51 PMNicolas Parot Alvarez
01/18/2023, 9:22 AMRafael Gomes
01/18/2023, 4:54 PMNicolas Parot Alvarez
01/19/2023, 9:53 AM