# ask-community
b
Hi, how do I best manage freshness policies when I have a BigQuery source table that is updated outside of Dagster's control? I can get events from Google Pub/Sub to signal that the table has been updated. Would I use asset versions for the source table? Or how else can I signal that the data in the source table has been updated?
Downstream assets mostly live in dbt.
s
You might think about putting a sensor on your source table and then adding reconciliation for the downstream tables.
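A rough, untested sketch of what that reconciliation piece could look like (the asset selection and sensor name are placeholders):

from dagster import AssetSelection, build_asset_reconciliation_sensor

# Placeholder selection: reconcile all assets in the "dbt_models" group whenever
# an upstream asset (e.g. the BigQuery source table) is newer than they are.
dbt_reconciliation_sensor = build_asset_reconciliation_sensor(
    asset_selection=AssetSelection.groups("dbt_models"),
    name="dbt_reconciliation_sensor",
)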
b
Wouldn’t it make more sense to trigger the materialization via the API instead of polling the table via the API?
s
I had forgotten you had a Pub/Sub message. I haven't done it, but a Cloud Function responding to the Pub/Sub message to trigger the Dagster API seems pretty viable as well.
👍 2
daggy love 1
t
I have a similar scenario I’m trying to solve for. I have a log sink going to Pub/Sub for when a new source table lands in a monitored dataset in BigQuery. Does Dagster require being triggered via the API from a Cloud Function, or is it possible to tie a Dagster sensor directly to the Pub/Sub topic?
👆 1
s
Even better, @Tony Tushar!
t
On second thought… 1. I don’t see any Dagster documentation about interacting with Pub/Sub. 2. Do sensors incur compute billing since they’re always on? So perhaps it’s better to have a Cloud Function initiate a Dagster materialization via the API.
b
Actually, I just checked whether it is possible to send a POST request directly from Pub/Sub, and it seems it is not possible to define the POST body. So a Cloud Function that triggers the API makes more sense. https://cloud.google.com/pubsub/docs/push
👍 1
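A rough sketch of that Cloud Function route, assuming a first-gen Pub/Sub-triggered function and the dagster-graphql Python client (the host, job, op, and repository names are all placeholders):

import base64

from dagster_graphql import DagsterGraphQLClient

# Placeholder: Dagster webserver reachable from the Cloud Function.
client = DagsterGraphQLClient("dagster.example.internal", port_number=3000)


def handle_pubsub_message(event, context):
    """Background Cloud Function triggered by a Pub/Sub push of the log sink."""
    # Pub/Sub delivers the message payload base64-encoded in event["data"].
    table_name = base64.b64decode(event["data"]).decode("utf-8")

    # Launch the Dagster job that refreshes downstream assets for this table.
    run_id = client.submit_job_execution(
        "trigger_job",  # placeholder job name
        repository_location_name="my_location",  # placeholder
        repository_name="my_repository",  # placeholder
        run_config={"ops": {"process_table": {"config": {"table": table_name}}}},
    )
    print(f"Launched Dagster run {run_id} for {table_name}")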
s
A sensor that responds to a pubsub message could look like this (not tested, consider this pseudocode):
import os

from dagster import RunRequest, sensor
from google.api_core import retry
from google.cloud import pubsub_v1


# trigger_job is the Dagster job launched by the sensor, defined elsewhere.
@sensor(job=trigger_job)
def my_pubsub_sensor():

    # These would normally come from config or environment variables.
    project_id = "your-project-id"
    subscription_id = "your-subscription-id"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    NUM_MESSAGES = 1

    with subscriber:
        # Unary pull: the subscriber asks for a specific number of messages.
        # The actual number returned may be smaller than max_messages.
        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
            retry=retry.Retry(deadline=300),
        )

        if len(response.received_messages) == 0:
            return

        ack_ids = []
        for received_message in response.received_messages:
            print(f"Received: {received_message.message.data}.")
            ack_ids.append(received_message.ack_id)

        # Acknowledge the received messages so they will not be sent again.
        subscriber.acknowledge(
            request={"subscription": subscription_path, "ack_ids": ack_ids}
        )

        # Message data is bytes; decode it to get the filename payload.
        filename = response.received_messages[0].message.data.decode("utf-8")

        if os.path.isfile(filename):
            yield RunRequest(
                run_key=filename,
                run_config={
                    "ops": {"process_file": {"config": {"filename": filename}}}
                },
            )
This uses a Pub/Sub pull rather than push. Each time the sensor evaluates (every 30 seconds by default), it attempts to pull a message from the subscription. If one exists (and the message contains the filename), the sensor fires off the job using the filename pulled from the Pub/Sub message. See https://cloud.google.com/pubsub/docs/pull#unary_pull_code_samples and https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#defining-a-sensor.
🫠 1
🙏 1
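For completeness, the sensor and the job it launches both need to be included in the code location; a minimal, untested sketch using Definitions (the op and job bodies are placeholders):

from dagster import Definitions, job, op


@op(config_schema={"filename": str})
def process_file(context):
    # Placeholder: do something with the file named in the Pub/Sub message.
    context.log.info(f"Processing {context.op_config['filename']}")


@job
def trigger_job():
    process_file()


defs = Definitions(
    jobs=[trigger_job],
    sensors=[my_pubsub_sensor],
)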
b
@Sean Davis just tried your implementation and it seems to work as expected. Did not have to change much. Thank you!
👍 1