Dominick Giordano
03/30/2022, 2:28 AMgrpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.DEADLINE_EXCEEDED details = "Deadline Exceeded" debug_error_string = "{"created":"@1648579523.062155830","description":"Error received from peer unix:/tmp/tmp270j3kna","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"Deadline Exceeded","grpc_status":4}"
I am trying to have a pipeline job run everytime a file has been added into a very large s3 bucket. My sensor functions currently look like:
def config_dagster_sensors():
##Dagster Sensors
dagster_sensors = []
def build_sensor() -> SensorDefinition:
return SensorDefinition(
name="pipeline_name_sensor",
pipeline_name="pipeline_name",
mode="s3",
minimum_interval_seconds=10,
evaluation_fn=sensor_fn
)
def sensor_fn(context: SensorExecutionContext):
new_s3_keys = get_s3_keys("bucket_name", since_key=context.cursor)
if not new_s3_keys:
yield SkipReason("No new s3 files found for bucket.")
return
for s3_key in new_s3_keys:
yield RunRequest(run_key=s3_key, run_config={}, pipeline_name="pipeline_name")
context.update_cursor(s3_key)
dagster_sensors.append(build_sensor())
return dagster_sensors
I am getting the impression the error is coming from not being able to finish in the 60 second time. Is there anyway to override this and allow it to finish no matter how long it takes?johann
03/30/2022, 2:34 PMnew_s3_keys
you’ll process within a single tick, and use the cursor to just pick up where you left offDominick Giordano
03/30/2022, 2:47 PMjohann
03/30/2022, 2:49 PMfor s3_key in new_s3_keys[:KEYS_PER_TICK_LIMIT]:
johann
03/30/2022, 2:51 PMdaniel
03/30/2022, 2:54 PMdaniel
03/30/2022, 2:55 PMDominick Giordano
03/30/2022, 3:04 PMDominick Giordano
03/30/2022, 3:58 PMdaniel
03/30/2022, 3:58 PMdaniel
03/30/2022, 4:00 PMDominick Giordano
03/30/2022, 4:02 PMDominick Giordano
03/30/2022, 4:02 PMdaniel
03/30/2022, 4:02 PMdaniel
03/30/2022, 4:03 PMDominick Giordano
03/30/2022, 4:03 PMdaniel
03/30/2022, 4:03 PMDominick Giordano
03/30/2022, 4:04 PMdaniel
03/30/2022, 4:04 PMDominick Giordano
03/30/2022, 4:05 PMdaniel
03/30/2022, 4:06 PMDominick Giordano
03/30/2022, 4:06 PMDominick Giordano
03/30/2022, 4:07 PMException: Stopping dagster-daemon process since the following threads are no longer sending heartbeats: ['SENSOR']
daniel
03/30/2022, 4:07 PMDominick Giordano
03/30/2022, 4:09 PMdaniel
03/30/2022, 4:10 PMdaniel
03/30/2022, 4:11 PM