alex
01/24/2020, 6:31 PM
Amanda Crawford
01/24/2020, 6:31 PM
Amanda Crawford
01/24/2020, 7:40 PM
Amanda Crawford
01/24/2020, 7:40 PM
Failure condition: Couldn't import module dagster_postgres.run_storage when attempting to rehydrate the configurable class dagster_postgres.run_storage.PostgresRunStorage
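A common cause of this error is that the dagster_postgres package (published on PyPI as dagster-postgres) cannot be imported by the Python interpreter that loads the instance configuration, for example because it was installed into a different environment. A quick stdlib check, assuming it is run with that same interpreter (module_is_importable is a hypothetical helper name, not a Dagster API):

```python
import importlib.util


def module_is_importable(dotted_name: str) -> bool:
    """Return True if `dotted_name` can be located by the current interpreter.

    Hypothetical diagnostic helper. Parent packages are imported (running
    their __init__), but the final module itself is only located, not executed.
    """
    try:
        return importlib.util.find_spec(dotted_name) is not None
    except ModuleNotFoundError:
        # find_spec imports parent packages first, so a missing parent
        # (e.g. dagster_postgres itself) surfaces as ModuleNotFoundError.
        return False


# Usage with the module named in the failure message:
# module_is_importable("dagster_postgres.run_storage")
```

If this returns False, installing dagster-postgres into the environment that runs the failing process is the first thing to try.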
alex
01/24/2020, 8:10 PM
Amanda Crawford
01/24/2020, 8:34 PM
Amanda Crawford
01/27/2020, 2:50 PM
Marwan
03/04/2020, 9:18 PM
max
03/04/2020, 9:54 PM
max
03/04/2020, 9:54 PM
Marwan
03/05/2020, 1:20 AM
max
03/05/2020, 1:25 AM
max
03/05/2020, 1:25 AM
dwall
08/07/2020, 12:17 PM
max
10/20/2020, 4:17 PM
Florian Polster
11/14/2020, 12:21 PM
Makoto
01/29/2021, 7:58 PM
Amisha Singla
03/10/2021, 5:49 PM
make_airflow_dag method for deployment purposes. We have some more jobs in that Airflow instance; however, those are not Dagster-based. I am wondering whether there is a way to pass information between a Dagster DAG and an Airflow DAG.
Example:
task 1 (Dagster EMR step operator; this yields an output variable) -> task 2 (Airflow ECS operator, which needs the output from task 1)
Amisha Singla
03/10/2021, 5:51 PM
rupert
11/15/2021, 10:27 PM
to_job on a graph, the variable name you assign the result to needs to be the same as the name argument to to_job.
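A simplified model of why the two names have to agree, assuming the loader first resolves the module attribute named by job_name and then looks the job up again by that same string among jobs keyed by their declared names. Without an explicit name, to_job() names the job after the graph (say_hello), so the second lookup for say_hello_job misses; with say_hello.to_job(name="say_hello_job") it succeeds. FakeJob and load_job are hypothetical, and this sketch does not reproduce dagster-airflow's actual internals.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Dict


@dataclass
class FakeJob:
    """Hypothetical stand-in for a job; only its declared name matters here."""
    name: str


def load_job(module: object, job_name: str) -> FakeJob:
    """Simplified two-step lookup (not dagster-airflow's real code)."""
    # Step 1: find the object by its module attribute name.
    # A missing attribute raises AttributeError, like the "not found in module" case.
    target = getattr(module, job_name)
    # Step 2: register it under its *declared* name.
    registry: Dict[str, FakeJob] = {target.name: target}
    # Step 3: look it up again by the attribute name; fails if the names differ.
    if job_name not in registry:
        raise KeyError(f"Could not find job {job_name!r}. Found: {sorted(registry)}")
    return registry[job_name]


# Like say_hello.to_job(): the job's name defaults to the graph's name.
broken_module = SimpleNamespace(say_hello_job=FakeJob(name="say_hello"))
# Like say_hello.to_job(name="say_hello_job"): the names line up.
fixed_module = SimpleNamespace(say_hello_job=FakeJob(name="say_hello_job"))
```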
Howdy folks! I’m new to Dagster, and seeing if we can find a path to migrate to Dagster from our current Airflow deployment. Currently, I’m running into an interesting error from make_airflow_dag. The tl;dr is that make_airflow_dag behaves as if it can’t find my job, even though the error output makes it clear that the job is available in the module under the correct name. Read on for details…
I have the sample graph and job like so:
from dagster import graph
from myproject.ops.hello import hello

@graph
def say_hello():
    """
    A graph definition. This example graph has a single op.
    For more hints on writing Dagster graphs, see our documentation overview on Graphs:
    <https://docs.dagster.io/concepts/ops_graphs/graphs>
    """
    hello()

say_hello_job = say_hello.to_job()
And in a separate file (let’s say it’s dag.py), I have:
from dagster_airflow import make_airflow_dag

dag, tasks = make_airflow_dag(
    module_name="myproject.graphs.say_hello",
    job_name="say_hello_job",
)
When I run python dag.py, I get the following:
Traceback (most recent call last):
  File "repo/dags/dag.py", line 34, in <module>
    job_name="say_hello_job",
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 311, in make_airflow_dag
    op_kwargs=op_kwargs,
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 199, in _make_airflow_dag
    pipeline = recon_repo.get_definition().get_pipeline(job_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 971, in get_pipeline
    return self._repository_data.get_pipeline(name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 732, in get_pipeline
    return self._pipelines.get_definition(pipeline_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 141, in get_definition
    for found_name in self.get_definition_names()
dagster.core.errors.DagsterInvariantViolationError: Could not find pipeline 'say_hello_job'. Found: .
This makes it look like dagster-airflow can’t find the members of this module. But by chance, I found that if I change the job_name argument to make_airflow_dag to a name that doesn’t exist in the module (e.g. my_nonexistent_job) and rerun python dag.py, I get:
Traceback (most recent call last):
  File "repo/dags/dag.py", line 34, in <module>
    job_name="my_nonexistent_job",
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 311, in make_airflow_dag
    op_kwargs=op_kwargs,
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 199, in _make_airflow_dag
    pipeline = recon_repo.get_definition().get_pipeline(job_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 48, in get_definition
    return repository_def_from_pointer(self.pointer)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 560, in repository_def_from_pointer
    target = def_from_pointer(pointer)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 502, in def_from_pointer
    target = pointer.load_target()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/code_pointer.py", line 282, in load_target
    name=self.fn_name, module=self.module, dir=dir(module)
dagster.core.errors.DagsterInvariantViolationError: my_nonexistent_job not found in module holograph.graphs.say_hello. dir: ['__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'graph', 'hello', 'say_hello', 'say_hello_job']
It’s clear from the output that dagster-airflow has successfully loaded the module and that say_hello_job is there. Similarly, if I set job_name to say_hello, which is the name of the graph, I get a helpful error message telling me that my job is a graph and that I should call .to_job on it.
I don’t know what else I can do to fix this! Any thoughts?
Kyle Hamlin
01/19/2022, 5:24 PM
make_dagster_repo_from_airflow_dags_path trying to convert my Airflow DAGs to Dagster pipelines. I'm getting the following import exception; it seems like dagster-airflow may only support an older version of Airflow. Either way, it looks to be broken. Any advice on how to resolve this?
Stack Trace:
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster/grpc/server.py", line 216, in __init__
    self._repository_symbols_and_code_pointers.load()
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster/grpc/server.py", line 92, in load
    self._loadable_repository_symbols = load_loadable_repository_symbols(
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster/grpc/server.py", line 110, in load_loadable_repository_symbols
    loadable_targets = get_loadable_targets(
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster/grpc/utils.py", line 27, in get_loadable_targets
    else loadable_targets_from_python_file(python_file, working_directory)
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster/core/workspace/autodiscovery.py", line 17, in loadable_targets_from_python_file
    loaded_module = load_python_file(python_file, working_directory)
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster/core/code_pointer.py", line 93, in load_python_file
    raise DagsterImportError(

The above exception was caused by the following exception:
ImportError: cannot import name 'BaseOperator' from 'airflow.operators' (/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/airflow/operators/__init__.py)
Stack Trace:
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster/core/code_pointer.py", line 79, in load_python_file
    return import_module_from_path(module_name, python_file)
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster/seven/__init__.py", line 50, in import_module_from_path
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 783, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "data_eng/jobs/make_dagster_pipeline.py", line 1, in <module>
    from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_dags_path
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster_airflow/__init__.py", line 10, in <module>
    from .factory import make_airflow_dag, make_airflow_dag_containerized, make_airflow_dag_for_operator
  File "/Users/khamlin/.pyenv/versions/3.8.8/lib/python3.8/site-packages/dagster_airflow/factory.py", line 7, in <module>
    from airflow.operators import BaseOperator
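The last frame shows the cause: this dagster_airflow release runs from airflow.operators import BaseOperator, and the installed Airflow no longer exposes BaseOperator there. In Airflow 2, BaseOperator is defined in airflow.models.baseoperator and is also exported from airflow.models. Code that has to tolerate more than one Airflow layout often tries candidate import paths in order; the generic stdlib sketch below shows that pattern (import_first is a hypothetical helper, not part of dagster_airflow or Airflow).

```python
import importlib
from typing import Any, Iterable, List, Tuple


def import_first(candidates: Iterable[Tuple[str, str]]) -> Any:
    """Return the first (module, attribute) pair that imports successfully.

    `candidates` is an ordered sequence of (module_name, attribute_name)
    pairs; every failure is collected so the final error lists all attempts.
    """
    failures: List[str] = []
    for module_name, attribute_name in candidates:
        try:
            module = importlib.import_module(module_name)
        except ImportError as exc:  # also covers ModuleNotFoundError
            failures.append(f"{module_name}: {exc}")
            continue
        if hasattr(module, attribute_name):
            return getattr(module, attribute_name)
        failures.append(f"{module_name}: no attribute {attribute_name!r}")
    raise ImportError("; ".join(failures) or "no candidates given")


# Hypothetical usage (requires Airflow to be installed):
# BaseOperator = import_first([
#     ("airflow.models.baseoperator", "BaseOperator"),  # where Airflow 2 defines it
#     ("airflow.models", "BaseOperator"),               # re-export in airflow.models
# ])
```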
Dagster Jarred
10/14/2022, 4:05 PM
David Stern
10/26/2022, 4:55 AM
rex
10/26/2022, 3:15 PM
Joe
11/04/2022, 3:40 PM
make_dagster_job_from_airflow_dag
• It now works with DAGs using Apache Airflow 2.0+
• It has a new mock_xcom parameter, which fakes all calls to XCom
Dagster Jarred
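The announcement does not describe how mock_xcom fakes XCom. As an illustration of the general idea only, and not of dagster-airflow's implementation, an XCom fake can be a dictionary keyed by task id and key, so push and pull calls succeed without a real Airflow metadata database. InMemoryXCom and its methods are hypothetical names; the "return_value" default mirrors the key Airflow uses for a task's returned value.

```python
from typing import Any, Dict, Optional, Tuple

# Airflow stores a task's return value under this XCom key by default.
RETURN_VALUE_KEY = "return_value"


class InMemoryXCom:
    """Hypothetical XCom stand-in: values live in a dict keyed by (task_id, key)."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, value: Any, key: str = RETURN_VALUE_KEY) -> None:
        # A later push with the same (task_id, key) overwrites the earlier value.
        self._store[(task_id, key)] = value

    def pull(self, task_id: str, key: str = RETURN_VALUE_KEY) -> Optional[Any]:
        # Missing entries return None instead of raising.
        return self._store.get((task_id, key))


xcom = InMemoryXCom()
# "task 1" publishes a placeholder output; "task 2" reads it back by task id.
xcom.push("task_1", {"output": 42})
```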
12/07/2022, 6:45 PM
Joe
12/22/2022, 8:54 PM
Dean Morin
01/03/2023, 11:29 PM
depends_on_past option in DAGs where a scheduled DAG run won’t kick off until the DAG run preceding it finishes successfully. What’s the recommended way to achieve this in Dagster?
Alexandre Guitton
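The behavior described amounts to a gate: run N starts only after run N-1 has finished successfully. A minimal stdlib model of that rule, assuming each scheduled tick's outcome is known in advance for the simulation; RunStatus, should_launch, and launched_ticks are hypothetical names, not Dagster or Airflow APIs.

```python
from enum import Enum
from typing import List, Optional


class RunStatus(Enum):
    """Hypothetical run states; only SUCCESS unblocks the next run."""
    STARTED = "started"
    SUCCESS = "success"
    FAILURE = "failure"


def should_launch(previous: Optional[RunStatus]) -> bool:
    # The first run has no predecessor, so it may always start.
    # A predecessor that is still running or has failed blocks the next run.
    return previous is None or previous is RunStatus.SUCCESS


def launched_ticks(outcomes: List[RunStatus]) -> List[int]:
    """Return the indices of scheduled ticks whose runs actually start.

    `outcomes[i]` is how tick i's run would end if it ran. Once a run does
    not succeed, every later tick stays blocked; clearing or rerunning the
    failed run is not modeled here.
    """
    launched: List[int] = []
    previous: Optional[RunStatus] = None
    for tick, outcome in enumerate(outcomes):
        if not should_launch(previous):
            break
        launched.append(tick)
        previous = outcome
    return launched
```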
02/06/2023, 6:00 PM
make_dagster_definitions_from_airflow_dags_path to move some Airflow DAGs to Dagster. But I can't find a way to set the executor config for the returned Definitions object. Is it possible?
Joe
02/08/2023, 4:47 PM