Ryan

02/05/2021, 11:53 AM
Good day all. How can we pass resources to sensors? E.g., I have a pipeline with a DB or API connection as a required resource, and I'm writing a sensor to target that pipeline. The sensor itself needs to poll that DB/API for some state change in order to decide whether to kick off the rest of the pipeline. I know the `SensorExecutionContext` has a `DagsterInstance`, but not clear how to get from there to any resource information?

daniel

02/05/2021, 2:27 PM
Hi Ryan - this is a reasonable request, but sensors and schedules don’t currently have a way to access the pipeline resources. Partly this is because the sensor loop that kicks off runs happens separately from pipeline execution (unlike in, say, Airflow, where sensors are part of the DAG execution). You can still poll that same underlying DB/API as part of the sensor loop; it just doesn’t have access to the same resource abstraction that the pipeline does. @prha may have more context here as well.

Ryan

02/05/2021, 2:34 PM
Thanks Daniel, that tallies with what I've found so far. Not having access to resources per se is one thing, but given that sensors are required to be associated with a pipeline, it would make sense for them to have access to at least that pipeline's configs. Is there any way to navigate object relations from a `DagsterInstance` to any of the resources, or even the resource configs, which are currently defined within that instance? Or would one necessarily need to duplicate config in two places - once for the pipeline, and again somewhere to make it available to any sensors?

daniel

02/05/2021, 3:22 PM
I may not be understanding precisely what you mean by 'having access to the pipeline config'. The RunRequests that a sensor produces are responsible for supplying any config that the pipeline needs to run, including any resource config - so in that sense the sensor already needs to be aware of the pipeline's config (since it's responsible for supplying it).
Wanting to access the underlying resource is very reasonable; it's just assumed in various ways in the system right now that by the time you're creating the resource, you already have a run to execute. Right now your best bet may be to have the @resource-decorated function produce an underlying object, and reference that object in your sensor. You could share a lot of code that way. E.g., in the example here: https://docs.dagster.io/overview/modes-resources-presets/modes-resources#glossary the @resource decorates a function that returns a PostgresDatabase class with the same config map - and if a sensor execution needs to use that same underlying resource, it could instantiate a PostgresDatabase directly. Not sure how cumbersome that is for your use case.
I guess even in that example it would get a bit messy since there's a dependent resource as well that you would also need to instantiate

Ryan

02/05/2021, 7:17 PM
Thanks daniel - really appreciate the suggestions, especially when the fundamental answer is, "that's not supported yet" 😁 I take your point about the sensor needing to know the pipeline's config anyway, which I had overlooked - first time using sensors 🤷🏼‍♂️ That in itself feels like a bit of a drawback, because again, it means we need to have the config in two places: once to apply to the pipeline, and then somehow separately available to the sensor (both to raise the RunRequest, and in our case to perform its own checking logic).
About returning an underlying object, I'm not sure I follow. Our resources do return an object, but the issue is having the info to configure that object. Taking for example:
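from dagster import ModeDefinition, RunRequest, StringSource, pipeline, repository, resource, sensor, solid


class MyAPIClient:
    def __init__(self, config):
        self.base_url = config["base_url"]

    def query_for_new_work(self):
        # ...
        pass  # placeholder so the elided body is valid Python

    def post_something(self):
        # ...
        pass  # placeholder so the elided body is valid Python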


@solid(required_resource_keys={"client"})
def my_solid(context):
    # Resources are reached through the solid's context.
    context.resources.client.post_something()
    return 1


@resource(config_schema={"base_url": StringSource})
def my_resource(init_context):
    return MyAPIClient(init_context.resource_config)


@pipeline(
    mode_defs=[
        ModeDefinition(
            "default",
            resource_defs={"client": my_resource})
    ]
)
def my_pipeline():
    my_solid()


@sensor(pipeline_name="my_pipeline")
def my_sensor(context):
    url_config = somehow_get_runtime_configuration()
    client = MyAPIClient(url_config)
    
    new_items = client.query_for_new_work()
    if new_items:
        yield RunRequest(
            run_key="".join(new_items),
            # Resource config is keyed by the resource key ("client") and nested under "config".
            run_config={"resources": {"client": {"config": url_config}}},
        )


@repository(name="my_repo")
def repo():
    return [my_pipeline, my_sensor]
So the sensor can happily create its own client object, but it needs to know or find out the appropriate config for that object. It would be really nice if it could use the config already made available to the pipeline via other means. But I get it - that's what we're saying we can't currently do 🙂 It seems then that the best way to "share config" between pipelines and sensors would be to have a preset yaml file passed to the pipeline via `preset_defs=[PresetDefinition.from_files(...)]` and then somewhat "manually" read that same file from within the sensor. Would you agree?
So above, the ideal would be for the `somehow_get_runtime_configuration()` method to magically be able to access whatever config was already in the instance, whether from the Dagit frontend or wherever, but seems the best we can do for now is to read from a preset yaml file.

prha

02/05/2021, 7:45 PM
Hey Ryan. I’ve been thinking about `PresetDefinition` as “give me the config for resources and solids for pipeline execution”. Also been thinking about sensors as tracking state outside of the context of pipeline execution, and whose responsibility it is to provide the resources/solid config for pipeline execution. Essentially, playing the same role as a `PresetDefinition`, but dynamically rather than statically. It sounds like what you’re looking for is some common abstract utility “read configuration from yaml or some place”, that could provide pieces of config to both sensors and preset definitions? Do I have that right?
Also, happy to hop on a call to discuss…. Sensors are definitely a new API and would love your help understanding how you’d ideally like to structure and configure them.
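One way to read the "common utility" idea is a small plain-Python helper that both the preset and the sensor import, so the config is written down once. This is only a minimal sketch under assumptions: the base URL comes from an environment variable rather than a yaml file, the variable name `MY_API_BASE_URL`, the fallback URL, and both function names are hypothetical, and the resource is registered under the key `client` as in the pipeline definition above.

```python
import os


def load_client_config(env=None):
    """Return the config dict that MyAPIClient(config) expects.

    MY_API_BASE_URL and the fallback URL are hypothetical placeholders.
    """
    env = os.environ if env is None else env
    return {"base_url": env.get("MY_API_BASE_URL", "http://localhost:8000")}


def build_run_config(client_config, resource_key="client"):
    """Wrap resource config in the run config shape: resources -> key -> config."""
    return {"resources": {resource_key: {"config": client_config}}}


# A sensor could build its own client from client_config and pass run_config
# to its RunRequest, so both use the same values.
client_config = load_client_config({"MY_API_BASE_URL": "https://api.example.com"})
run_config = build_run_config(client_config)
```

The same `run_config` dict could also feed a preset, which would keep the sensor and the preset from drifting apart.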

Ryan

02/06/2021, 12:45 PM
Hey Phil 👋🏽🙂 "read configuration from yaml or some place" is right. As one of my colleagues said, "I would have thought the purpose of sensors was to monitor external things and that the purpose of resources was to be an abstraction for external...resources," which I think gets to the heart of it. If we were able to either A) pass resources directly to sensors, as we do with pipelines, or B) access some preconfigured state on pipelines, e.g. presets, either of those should help resolve the current challenge. But I think option A would be cleaner, more consistent, and maybe even a little more in keeping with the way you've been thinking about sensors up till now. It's interesting, because speaking for myself, sensors seem more akin to solids than presets - just "special" solids which the framework will run for us repeatedly (and perhaps without the overhead of a "proper" pipeline run in terms of logging etc? I'm not sure). And thinking of them more like solids I think makes the challenge here a little clearer: yes, they track outside state, but they'll usually need some kind of config in order to know where to access that outside state, and there's a good chance that the pipeline they're kicking off will also need to know some of that same config (e.g. in order to send outputs back to the same db where the inputs came from, or even just e.g. sensor monitors for new files and then pipeline needs to download them). So currently, that's where there's a bit of a challenge, in making that same config available to both sensors and the ensuing pipeline. Resources seem like a good abstraction for that purpose 🙂 I appreciate what daniel mentioned about the current Resource initialisation process assuming that there's already a run to execute. Maybe that's where things could be changed - like I said, thinking about sensors more like little, one-shot solids, with less overhead to run?
Worth noting that one way I've considered getting around this is:
• Use a schedule instead of a sensor
• The first solid in the pipeline becomes a "faux sensor" solid, and checks for those external state changes (using some of the `@resources` set up for our pipeline)
• Use optional outputs to conditionally only carry on with the rest of the pipeline if we have new work to do 🙂
We'd probably have to think a bit about rolling our own solution for something like `run_key`s, but it feels like it would work. That said, in this case, because we're using config yamls for presets anyway, we'll probably just manually read those config files each time the sensor runs and initialise our clients for external resources that way.
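For the "roll our own `run_key`" part of that workaround, the guard step would need to remember which items it has already handled. A minimal framework-free sketch, assuming each item has a stable string identifier and that seen identifiers are persisted to a local JSON file (the function name and file path are hypothetical, and this is not how Dagster itself implements `run_key`):

```python
import json
import os
import tempfile


def select_new_work(items, state_path):
    """Return the items not seen on earlier calls and remember them."""
    seen = set()
    if os.path.exists(state_path):
        with open(state_path) as f:
            seen = set(json.load(f))

    new_items = []
    for item in items:
        if item not in seen:
            seen.add(item)
            new_items.append(item)

    # Only rewrite the state file when something actually changed.
    if new_items:
        with open(state_path, "w") as f:
            json.dump(sorted(seen), f)
    return new_items


state_path = os.path.join(tempfile.mkdtemp(), "seen_keys.json")
first = select_new_work(["a", "b"], state_path)   # nothing recorded yet
second = select_new_work(["b", "c"], state_path)  # "b" was recorded by the first call
```

The guard solid would then emit its optional output only when the returned list is non-empty.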
Sorry for the length! Am not articulate enough to make it more succinct 😅 Very happy to discuss on a Slack call next week if it would be helpful!

daniel

02/06/2021, 2:46 PM
This makes a ton of sense, thanks for explaining the thinking. The one thing I do want to make sure we’re on the same page about is that even with that schedule/solid workaround you mentioned, the schedule would still need to supply any config for the resource (and any other config for the pipeline) - i.e. the ‘somehow_get_runtime_configuration’ problem from earlier would still be a problem. It seems like there are actually two separate pieces of feedback here (both very reasonable) - one about being able to use resources in sensors, and one about more easily sharing pipeline config so that it doesn’t need to be duplicated across different places that launch runs for that pipeline.

Prasad Chakka

04/30/2021, 2:16 PM
Hi @Ryan, I have hit upon a similar problem. Which of the options that you mentioned did you end up going with?

Donny Winston

06/22/2021, 8:15 PM
Also interested. I plan to use the strategy @Ryan outlined of using, e.g., a `cron_schedule="* * * * *"` schedule with a “guard” solid at the head of the pipeline, to sense whether to bail or not (i.e., https://docs.dagster.io/concepts/solids-pipelines/pipelines#conditional-branching). I gave a thumbs up to https://github.com/dagster-io/dagster/issues/3794.
Following up: I couldn’t bring myself to hack a solid-as-sensor, and I settled on using `build_init_resource_context` and a `PresetDefinition` to instantiate a configured resource in a sensor via environment variables. This way, I get to reuse the mode and preset config I use for the pipeline that the sensor triggers. This seems to work well. Gist here: https://gist.github.com/dwinston/ebe4ab0c36a9d646c3cd8a6c77eb6644