Vlad Efanov

01/26/2022, 4:08 PM
Hey all. I want to create the following flow: task A waits for a new file. When a file arrives, task B starts processing it, and task A goes back to waiting for the next file. Multiple B tasks can run at the same time. 1. How can I do this? 2. I also want to be able to stop a specific task B. Can I do that, and how? 3. There may be a lot of files, and I want to process them in priority order. Is this possible in Dagster, or do I have to write it myself?

daniel

01/26/2022, 4:11 PM
Hi vlad - what you're describing here sounds like a good use case for a sensor to me - https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#sensors That would mean that there would be a separate run for each B, and the sensor would be A in your example. That would also let you use our run prioritization features that are described here: https://docs.dagster.io/deployment/run-coordinator#priorities
(and you could terminate specific B's by terminating B's run)
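A minimal sketch of how a sensor could attach a priority to each run, assuming the queued run coordinator is enabled and that priority is set through the `dagster/priority` run tag described on the priorities page linked above. The op name `process_file`, the `urgent_` filename prefix, and the helper name are hypothetical. The helper is plain Python so it can be checked without a Dagster install; inside a sensor its result would be passed as `RunRequest(**kwargs)`.

```python
import os


def run_request_kwargs(filepath, op_name="process_file"):
    """Build keyword arguments for a RunRequest for one file.

    Assumptions (not from the thread): the op is called `process_file`,
    and files whose names start with "urgent_" should run first.
    """
    filename = os.path.basename(filepath)
    # Runs with a higher priority are launched first; tag values are strings.
    priority = "10" if filename.startswith("urgent_") else "0"
    return {
        # run_key de-duplicates: one run per distinct file name.
        "run_key": filename,
        "run_config": {"ops": {op_name: {"config": {"filepath": filepath}}}},
        "tags": {"dagster/priority": priority},
    }


# Inside a sensor body this would be used roughly as:
#     yield RunRequest(**run_request_kwargs(filepath))
```

Because each file becomes its own run, stopping one B means terminating that single run, and the queue decides the launch order from the tag.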

Vlad Efanov

01/26/2022, 4:21 PM
Thank you. I will try this.
@daniel Hello. Do you have a tutorial for this? I tried to create a sensor that runs a job that prints the file path. When I run
dagster-daemon run
I get
c:\users\vladi\appdata\local\programs\python\python38\lib\site-packages\dagster\core\instance\config.py:31: UserWarning: No dagster instance configuration file (dagster.yaml) found at D:\Python_Projects\DAGSTER\project. Defaulting to loading and storing all metadata with D:\Python_Projects\DAGSTER\project. If this is the desired behavior, create an empty dagster.yaml file in D:\Python_Projects\DAGSTER\project.
  warnings.warn(
I didn't find any dagster.yaml example. What should this file include?

daniel

01/27/2022, 3:58 PM
These docs explain how to configure your dagster.yaml file: https://docs.dagster.io/deployment/dagster-instance#overview
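As the warning itself says, an empty dagster.yaml is valid and keeps the defaults. Below is a hedged sketch of a file that additionally enables the queued run coordinator (needed for the run priorities mentioned earlier), written from Python so it lands in whatever directory DAGSTER_HOME points to. The `max_concurrent_runs` value of 5 and the helper name are illustrative assumptions; check the instance docs linked above for the keys your Dagster version supports.

```python
import os

# Module path as documented for Dagster releases from early 2022;
# newer releases may use a different path.
DAGSTER_YAML = """\
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 5
"""


def write_dagster_yaml(dagster_home):
    """Write dagster.yaml into the given DAGSTER_HOME directory."""
    os.makedirs(dagster_home, exist_ok=True)
    path = os.path.join(dagster_home, "dagster.yaml")
    with open(path, "w", encoding="utf-8") as f:
        f.write(DAGSTER_YAML)
    return path
```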

Vlad Efanov

01/29/2022, 12:11 PM
@daniel I tried to set up the sensor but it didn't work. I followed this documentation: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors Maybe I configured it wrong. Here are my steps: 1. Created a project:
dagster new-project test_project
cd test_project
pip install --editable .
dagit
2. Created an empty file D:\Python_Projects\DAGSTER\test_project\dagster.yaml
3. Created an empty file D:\Python_Projects\DAGSTER\test_project\logs\event.log
4. In another terminal, ran dagster-daemon:
$env:DAGSTER_HOME = "D:\Python_Projects\DAGSTER\test_project"
dagster-daemon run
Here is my code. Op file print_path.py:
from dagster import op


@op(config_schema={"filepath": str})
def print_path(context):
    filepath = context.op_config["filepath"]
    print(filepath)
Job file run_print_path.py:
from dagster import job
from test_project.ops.print_path import print_path


@job
def run_print_path():
    print("Job started")
    print_path()
Sensor file print_sensor.py:
import os
from dagster import sensor, RunRequest
from test_project.jobs.run_print_path import run_print_path

MY_DIRECTORY = r'D:\Python_Projects\DAGSTER\python_project\my_folder'


@sensor(job=run_print_path)
def print_sensor(_context):
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={"ops": {"process_file": {"config": {"filepath": filepath}}}},
            )
Repository file repository.py:
from dagster import repository
from test_project.jobs.run_print_path import run_print_path
from test_project.sensors.print_sensor import print_sensor


@repository
def test_project():
    jobs = [run_print_path]
    sensors = [print_sensor]
    return jobs + sensors
In the browser I see this: [screenshot not preserved]

daniel

01/29/2022, 12:46 PM
dagit and the daemon process both need to have the same value of DAGSTER_HOME. Do they? If they do, does the daemon look like it's running correctly from the process output? The end goal is to make the message in dagit that says the daemon isn't running go away.

Vlad Efanov

01/29/2022, 12:49 PM
I didn't configure DAGSTER_HOME for dagit. I run
dagit
and
dagster-daemon run
in the same folder D:\Python_Projects\DAGSTER\test_project

daniel

01/29/2022, 1:00 PM
Can you try configuring it for dagit as well?

Vlad Efanov

01/29/2022, 1:12 PM
Just run
$env:DAGSTER_HOME = "D:\Python_Projects\DAGSTER\test_project"
before I run
dagit
?

daniel

01/29/2022, 1:27 PM
Yeah
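One way to confirm that both terminals agree is to run the same small check in each before starting dagit and the daemon. This is a sketch using only the standard library; the helper name is hypothetical, and it only inspects the environment variable, not any running process.

```python
import os


def check_dagster_home(env=None):
    """Return the DAGSTER_HOME path and whether dagster.yaml exists there."""
    env = os.environ if env is None else env
    home = env.get("DAGSTER_HOME")
    if not home:
        # Variable unset or empty in this terminal.
        return None, False
    return home, os.path.isfile(os.path.join(home, "dagster.yaml"))


if __name__ == "__main__":
    home, has_yaml = check_dagster_home()
    print(f"DAGSTER_HOME={home!r}, dagster.yaml present: {has_yaml}")
```

If the two terminals print different paths, or one prints `None`, the two processes are not sharing an instance.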

Vlad Efanov

01/29/2022, 2:20 PM
Now I get this error
2022-01-29 16:18:27 +0200 - dagster.daemon.SensorDaemon - ERROR - Sensor daemon caught an error for sensor print_sensor : dagster.core.errors.DagsterInvalidConfigError: Error in config for job
    Error 1: Received unexpected config entry "process_file" at path root:ops. Expected: "{ print_path: { config: { filepath: String } outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".
    Error 2: Missing required config entry "print_path" at path root:ops. Sample config for missing entry: {'print_path': {'config': {'filepath': '...'}}}

Stack Trace:
  File "c:\users\vladi\appdata\local\programs\python\python38\lib\site-packages\dagster\grpc\impl.py", line 354, in get_external_execution_plan_snapshot
    create_execution_plan(
  File "c:\users\vladi\appdata\local\programs\python\python38\lib\site-packages\dagster\core\execution\api.py", line 743, in create_execution_plan
    resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
  File "c:\users\vladi\appdata\local\programs\python\python38\lib\site-packages\dagster\core\system_config\objects.py", line 159, in build
    raise DagsterInvalidConfigError(

daniel

01/29/2022, 2:28 PM
It looks like your sensor isn't returning run config in the format that dagster is expecting. The text of the error message includes the format that it is expecting
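Reading the expected shape off the error message: the key under `ops` must be the op's name, `print_path`, not `process_file`. A minimal sketch of the corrected run config as a plain dictionary; the helper name is hypothetical.

```python
def build_run_config(filepath):
    """Run config matching the shape named in the error message."""
    # The key under "ops" is the op name from @op: print_path.
    return {"ops": {"print_path": {"config": {"filepath": filepath}}}}


# In the sensor, this replaces the hard-coded dictionary:
#     yield RunRequest(run_key=filename, run_config=build_run_config(filepath))
```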

Vlad Efanov

01/29/2022, 2:37 PM
Now it works. Thank you!