# ask-community
v
Hey all. I want to create the following flow: task A waits for a new file. When a file arrives, task B starts processing it, and task A goes back to waiting for the next file. Multiple B tasks can run at the same time.
1. How can I do this?
2. I also want to be able to stop a specific task B. Can I do that, and how?
3. There may be a lot of files, and I want to process them by priority. Is this possible in Dagster, or do I have to write it myself?
d
Hi vlad, what you're describing here sounds like a good use case for a sensor to me (https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#sensors). That would mean a separate run for each B, with the sensor playing the role of A in your example. It would also let you use the run prioritization features described here: https://docs.dagster.io/deployment/run-coordinator#priorities
(and you could stop a specific B by terminating its run)
v
Thank you. I will try this.
@daniel Hello, do you have a tutorial for this? I tried to create a sensor that runs a job that prints the file path. When I run `dagster-daemon run`, I get:
```text
c:\users\vladi\appdata\local\programs\python\python38\lib\site-packages\dagster\core\instance\config.py:31: UserWarning: No dagster instance configuration file (dagster.yaml) found at D:\Python_Projects\DAGSTER\project. Defaulting to loading and storing all metadata with D:\Python_Projects\DAGSTER\project. If this is the desired behavior, create an empty dagster.yaml file in D:\Python_Projects\DAGSTER\project.
  warnings.warn(
```
I didn't find any dagster.yaml example. What should this file include?
d
These docs explain how to configure your dagster.yaml file: https://docs.dagster.io/deployment/dagster-instance#overview
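As the warning text says, an empty dagster.yaml is enough to accept the default behavior of storing all metadata under DAGSTER_HOME. A small sketch of a helper that makes sure that file exists; `ensure_dagster_yaml` is a hypothetical name, not part of Dagster, and real settings (for example a run coordinator) would still be added to the file by hand as the linked docs describe.

```python
from pathlib import Path


def ensure_dagster_yaml(dagster_home: str) -> Path:
    """Create an empty dagster.yaml in dagster_home if none exists.

    Hypothetical helper: an empty file keeps Dagster's defaults, which
    store run and event metadata under DAGSTER_HOME.
    """
    home = Path(dagster_home).resolve()  # use an absolute path
    home.mkdir(parents=True, exist_ok=True)
    config_path = home / "dagster.yaml"
    if not config_path.exists():
        config_path.touch()  # never overwrite an existing config
    return config_path
```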
v
@daniel I tried to set up the sensor, but it didn't work. I followed this documentation (https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors); maybe I configured something wrong. Here are my steps:
1. Created a project:
```shell
dagster new-project test_project
cd test_project
pip install --editable .
dagit
```
2. Created an empty file D:\Python_Projects\DAGSTER\test_project\dagster.yaml
3. Created an empty file D:\Python_Projects\DAGSTER\test_project\logs\event.log
4. In another terminal, ran dagster-daemon:
```powershell
$env:DAGSTER_HOME = "D:\Python_Projects\DAGSTER\test_project"
dagster-daemon run
```
Here is my code. Op file print_path.py:
```python
from dagster import op


@op(config_schema={"filepath": str})
def print_path(context):
    filepath = context.op_config["filepath"]
    print(filepath)
```
Job file run_print_path.py:
```python
from dagster import job
from test_project.ops.print_path import print_path


@job
def run_print_path():
    print("Job started")
    print_path()
```
Sensor file print_sensor.py:
```python
import os
from dagster import sensor, RunRequest
from test_project.jobs.run_print_path import run_print_path

MY_DIRECTORY = r'D:\Python_Projects\DAGSTER\python_project\my_folder'


@sensor(job=run_print_path)
def print_sensor(_context):
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={"ops": {"process_file": {"config": {"filepath": filepath}}}},
            )
```
Repository file repository.py:
```python
from dagster import repository
from test_project.jobs.run_print_path import run_print_path
from test_project.sensors.print_sensor import print_sensor


@repository
def test_project():
    jobs = [run_print_path]
    sensors = [print_sensor]
    return jobs + sensors
```
In the browser, I see this:
d
dagit and the daemon process both need to have the same value of DAGSTER_HOME. Do they? If they do, does the daemon look like it's running correctly from the process output? The end goal is to make the message in dagit saying the daemon isn't running go away.
v
I didn't configure DAGSTER_HOME for dagit. I run `dagit` and `dagster-daemon run` in the same folder, D:\Python_Projects\DAGSTER\test_project.
d
Can you try configuring it for dagit as well?
v
So I just run `$env:DAGSTER_HOME = "D:\Python_Projects\DAGSTER\test_project"` before I run `dagit`?
d
Yeah
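Since dagit and the daemon must agree on DAGSTER_HOME, one option is to start both from a single Python script so they inherit the same environment. This is a sketch, assuming the `dagit` and `dagster-daemon` executables are on PATH and that it is run from the project folder (the same place the manual commands were run); `env_with_dagster_home` and `launch_dagit_and_daemon` are hypothetical names.

```python
import os
import subprocess
from typing import Dict, List


def env_with_dagster_home(dagster_home: str) -> Dict[str, str]:
    # Copy the current environment and point DAGSTER_HOME at one absolute path.
    env = dict(os.environ)
    env["DAGSTER_HOME"] = os.path.abspath(dagster_home)
    return env


def launch_dagit_and_daemon(dagster_home: str) -> List[subprocess.Popen]:
    # Both child processes receive the identical environment, so they read
    # the same dagster.yaml and share the same run and sensor storage.
    env = env_with_dagster_home(dagster_home)
    return [
        subprocess.Popen(["dagit"], env=env),
        subprocess.Popen(["dagster-daemon", "run"], env=env),
    ]
```

For example, `launch_dagit_and_daemon(r"D:\Python_Projects\DAGSTER\test_project")` starts both processes; call `wait()` on each returned process to keep the script alive. Setting the variable in each PowerShell window achieves the same thing.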
v
Now I get this error:
```text
2022-01-29 16:18:27 +0200 - dagster.daemon.SensorDaemon - ERROR - Sensor daemon caught an error for sensor print_sensor : dagster.core.errors.DagsterInvalidConfigError: Error in config for job
    Error 1: Received unexpected config entry "process_file" at path root:ops. Expected: "{ print_path: { config: { filepath: String } outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".
    Error 2: Missing required config entry "print_path" at path root:ops. Sample config for missing entry: {'print_path': {'config': {'filepath': '...'}}}

Stack Trace:
  File "c:\users\vladi\appdata\local\programs\python\python38\lib\site-packages\dagster\grpc\impl.py", line 354, in get_external_execution_plan_snapshot
    create_execution_plan(
  File "c:\users\vladi\appdata\local\programs\python\python38\lib\site-packages\dagster\core\execution\api.py", line 743, in create_execution_plan
    resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
  File "c:\users\vladi\appdata\local\programs\python\python38\lib\site-packages\dagster\core\system_config\objects.py", line 159, in build
    raise DagsterInvalidConfigError(
```
d
It looks like your sensor isn't returning run config in the format that Dagster expects. The error message includes the expected format.
v
Now it works. Thank you!