
Arun Kumar

04/09/2021, 10:12 AM
Hi, can sensors also target a particular date partition of a pipeline, similar to schedules? (I could not find this clearly in the docs.) In our case, we want a sensor that polls an external API and, when it fires, triggers the pipeline partition for the current date.

sashank

04/09/2021, 10:38 AM
Hey Arun. I think you can skip over the partition API here. Let me know if there's a reason you need to use partitions that I might be missing. Your sensor can directly provide the date through config for your pipeline.
Copy code
import datetime
from dagster import pipeline, sensor, solid, RunRequest, SkipReason

@solid(config_schema={"date": str})
def my_solid(context):
    date = context.solid_config["date"]
    # You can now access date here

@pipeline
def my_pipeline():
    my_solid()

@sensor(pipeline_name="my_pipeline")
def my_sensor(_):
    # external_api_condition is a placeholder for the result of polling your API
    if external_api_condition:
        date = datetime.datetime.now().strftime("%Y/%m/%d")
        # run_key lets Dagster launch at most one run per date
        yield RunRequest(
            run_key=date,
            run_config={"solids": {"my_solid": {"config": {"date": date}}}},
        )
    else:
        yield SkipReason(skip_message="External API condition not met")
^ just edited a typo
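As a rough illustration of the config-only approach above, the values handed to RunRequest can be built in a small helper that is easy to test without Dagster installed. The helper name build_run_request_args is hypothetical and not part of Dagster; using the date as the run key is an assumption about wanting at most one run per day.

```python
import datetime


def build_run_request_args(today: datetime.date) -> dict:
    """Build the keyword arguments a sensor could pass to RunRequest.

    Hypothetical helper, not a Dagster API. It assumes the solid is named
    "my_solid" and takes a "date" string in its config, as in the snippet above.
    """
    date_str = today.strftime("%Y/%m/%d")
    return {
        # Assumption: one run per calendar day, so the date doubles as the run key.
        "run_key": date_str,
        "run_config": {"solids": {"my_solid": {"config": {"date": date_str}}}},
    }


args = build_run_request_args(datetime.date(2021, 4, 9))
```

Inside the sensor, this would be used as `yield RunRequest(**build_run_request_args(datetime.date.today()))`.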

Arun Kumar

04/09/2021, 10:54 AM
Thanks @sashank. If I just provide the date through config and skip the partition API completely, I might not be able to view the pipeline runs by partition in Dagit, right? I would like to use the partition and backfill features. Let me know if I am missing something here.

sashank

04/09/2021, 10:58 AM
Yes, you are correct. Have you already manually defined a partition set?

Arun Kumar

04/09/2021, 11:07 AM
I haven't started working on the implementation yet. I'm currently designing the pipeline and reading through the docs. Let's say I have defined the following partition set manually. If I trigger my test_pipeline pipeline using a sensor once every day by passing the date partition through config, will Dagit show the status of these runs in the partitions view of this pipeline? I guess it won't, and that's why I asked if sensors can use/generate PartitionSets.
Copy code
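import datetime

from dagster import PartitionSetDefinition
from dagster.utils.partitions import date_partition_range

def run_config_for_date_partition(partition):
    # With date_partition_range, partition.value is a datetime; convert it
    # (e.g. with strftime) if the solid expects a string
    date = partition.value
    return {"solids": {"query_telemetry_events": {"config": {"date": date}}}}

# Named partition_set so that a sensor can reference it
partition_set = PartitionSetDefinition(
    name="partition_set",
    pipeline_name="test_pipeline",
    partition_fn=date_partition_range(
        start=datetime.datetime(2021, 4, 9),  # 09 is a syntax error in Python 3
        delta_range="days",
        inclusive=True,
        fmt="%Y-%m-%d-%H",
    ),
    run_config_fn_for_partition=run_config_for_date_partition,
)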
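To get a feel for the partition names a daily range with this format would produce, here is a standard-library sketch. It only mimics the idea of stepping one day at a time from the start and formatting each step with fmt; it is not Dagster's implementation, and the function name daily_partition_names and the explicit end argument are assumptions made for illustration.

```python
import datetime


def daily_partition_names(start, end, fmt):
    """Format one name per day from start to end, both inclusive.

    Illustrative only: Dagster's date_partition_range has its own handling of
    the end time, time zones, and the inclusive flag.
    """
    names = []
    current = start
    while current <= end:
        names.append(current.strftime(fmt))
        current += datetime.timedelta(days=1)
    return names


names = daily_partition_names(
    start=datetime.datetime(2021, 4, 9),
    end=datetime.datetime(2021, 4, 11),
    fmt="%Y-%m-%d-%H",
)
# names == ["2021-04-09-00", "2021-04-10-00", "2021-04-11-00"]
```

Because every step starts at midnight, the %H component is always 00 in this sketch, which matters when something else has to reproduce the same names.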

sashank

04/09/2021, 11:13 AM
Great! From there, all you have to do is include the correct tags, and Dagit will show the status of the runs in the partitions view:
Copy code
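import datetime
from dagster import sensor, RunRequest

@sensor(pipeline_name="test_pipeline")
def my_sensor(_):
    # Make sure this format matches the partition set format. The partition set
    # is daily, so if the lookup fails, check that the hour component matches too.
    date_str = datetime.datetime.now().strftime("%Y-%m-%d-%H")
    partition = partition_set.get_partition(name=date_str)

    yield RunRequest(
        run_key=date_str,  # at most one run per partition name
        run_config=partition_set.run_config_for_partition(partition),
        tags=partition_set.tags_for_partition(partition),
    )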
These are both great things to document
@Dagster Bot docs Document how to use date_partition_range to create custom date range partition sets
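One thing worth checking in the sensor snippet above is that the name it looks up really exists in the partition set. The sketch below uses only the standard library and assumes that a daily partition set formats each day's midnight start with "%Y-%m-%d-%H", so the current time is truncated to midnight before formatting. The helper names are hypothetical, and the tag keys reflect my understanding of what tags_for_partition returns; please verify them against your Dagster version.

```python
import datetime

PARTITION_FMT = "%Y-%m-%d-%H"  # must match the fmt given to the partition set


def partition_name_for(now, fmt=PARTITION_FMT):
    """Return the daily partition name for the day containing `now`.

    Assumption: partition names come from each day's start (midnight),
    so the time of day is dropped before formatting.
    """
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return day_start.strftime(fmt)


def partition_tags(partition_set_name, partition_name):
    """Sketch of the tags Dagit is assumed to use to group runs by partition."""
    return {
        "dagster/partition": partition_name,
        "dagster/partition_set": partition_set_name,
    }


now = datetime.datetime(2021, 4, 9, 10, 38)
naive_name = now.strftime(PARTITION_FMT)   # "2021-04-09-10"
daily_name = partition_name_for(now)       # "2021-04-09-00"
tags = partition_tags("partition_set", daily_name)
```

If the partition set only needs day granularity, using "%Y-%m-%d" on both sides avoids the hour mismatch altogether.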

Dagster Bot

04/09/2021, 11:14 AM

sashank

04/09/2021, 11:15 AM
@Dagster Bot docs Document how to create runs for specific partitions using tags and PartitionSet.tags_for_partition

Dagster Bot

04/09/2021, 11:15 AM

Arun Kumar

04/09/2021, 11:20 AM
Wow, this is cool 🙂 I could not find the info about tags_for_partition in the docs. Thanks a lot for the quick help and for creating the tickets. I will try this out soon.

sashank

04/09/2021, 11:25 AM
Yup, it's not documented. Thanks for the interesting question and for uncovering this gap!