Benedikt Buchert
01/28/2023, 8:05 PM
Benedikt Buchert
01/28/2023, 8:32 PM
Sean Davis
01/29/2023, 6:53 PM
Benedikt Buchert
01/29/2023, 8:06 PM
Sean Davis
01/29/2023, 8:12 PM
Tony Tushar
02/01/2023, 10:33 PM
Sean Davis
02/02/2023, 1:00 AM
Tony Tushar
02/02/2023, 3:38 PM
Benedikt Buchert
02/02/2023, 3:41 PM
Sean Davis
02/03/2023, 4:10 AM
import os

from dagster import RunRequest, job, op, sensor
from google.api_core import retry
from google.cloud import pubsub_v1

# Placeholders: these probably belong in config or environment variables.
PROJECT_ID = "your-project-id"
SUBSCRIPTION_ID = "your-subscription-id"
NUM_MESSAGES = 1


# Placeholder op and job so the example is self-contained; the real
# trigger_job only needs an op named process_file with a "filename" config.
@op(config_schema={"filename": str})
def process_file(context):
    context.log.info(f"Processing {context.op_config['filename']}")


@job
def trigger_job():
    process_file()


@sensor(job=trigger_job)
def my_directory_sensor(context):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

    # Synchronous (unary) pull: no streaming subscribe() call or callback is needed.
    with subscriber:
        # The subscriber pulls a specific number of messages. The actual
        # number of messages pulled may be smaller than max_messages.
        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
            retry=retry.Retry(deadline=300),
        )
        if not response.received_messages:
            return

        ack_ids = []
        filenames = []
        for received_message in response.received_messages:
            # Message data is bytes; decode it to get the filename as a str.
            filename = received_message.message.data.decode("utf-8")
            print(f"Received: {filename}.")
            filenames.append(filename)
            ack_ids.append(received_message.ack_id)

        # Acknowledges the received messages so they will not be sent again.
        subscriber.acknowledge(
            request={"subscription": subscription_path, "ack_ids": ack_ids}
        )

    for filename in filenames:
        if os.path.isfile(filename):
            yield RunRequest(
                run_key=filename,
                run_config={
                    "ops": {"process_file": {"config": {"filename": filename}}}
                },
            )
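The message-handling part of the sensor can be checked without a live subscription or a Dagster instance by pulling it out into a plain function. This is a minimal sketch under stated assumptions: `messages_to_run_requests` is a hypothetical helper, each message is modeled as an `(ack_id, data)` tuple standing in for a Pub/Sub `ReceivedMessage`, the payload is a UTF-8 file path, and plain dicts stand in for Dagster `RunRequest` objects.

```python
import os
from typing import Dict, List, Tuple


def messages_to_run_requests(
    messages: List[Tuple[str, bytes]],
) -> Tuple[List[str], List[Dict]]:
    """Hypothetical helper: map pulled messages to ack IDs and run requests.

    Each message is an (ack_id, data) pair, where data is the raw bytes
    payload of a Pub/Sub message containing a file path.
    """
    ack_ids: List[str] = []
    run_requests: List[Dict] = []
    for ack_id, data in messages:
        # Every pulled message is acknowledged, even if its file is missing.
        ack_ids.append(ack_id)
        filename = data.decode("utf-8")
        if os.path.isfile(filename):
            # Same shape as the sensor's RunRequest. In Dagster, a repeated
            # run_key is skipped, so the same file does not launch two runs.
            run_requests.append(
                {
                    "run_key": filename,
                    "run_config": {
                        "ops": {"process_file": {"config": {"filename": filename}}}
                    },
                }
            )
    return ack_ids, run_requests
```

For example, passing one message whose path exists and one whose path does not returns both ack IDs but only one run request. Keeping this logic free of client objects leaves the sensor body responsible only for the pull, the acknowledge call, and turning each dict into a `RunRequest`.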
The sensor uses a Pub/Sub pull rather than a push subscription. Each time the Dagster sensor is evaluated, it tries to pull a message from the subscription. If one exists (and the message contains the filename), the sensor launches the job, passing the filename from the Pub/Sub message as op config. Messages are acknowledged before the file check, so a message naming a file that does not exist is dropped rather than redelivered. See https://cloud.google.com/pubsub/docs/pull#unary_pull_code_samples and https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#defining-a-sensor.
Benedikt Buchert
02/13/2023, 4:39 PM