to_job: on a graph, the variable name you assign the result to needs to be the same as the name argument to to_job.

rupert
11/15/2021, 10:27 PM
Howdy folks! I’m new to Dagster, and seeing if we can find a path to migrate to Dagster from our current Airflow deployment. Currently, I’m running across an interesting error from make_airflow_dag. The tl;dr is that make_airflow_dag is behaving like it can’t find my job, even though it’s clear from the error output that the job is available in the module with the correct name. Read on for details…
I have the sample graph and job like so:
from dagster import graph
from myproject.ops.hello import hello


@graph
def say_hello():
    """
    A graph definition. This example graph has a single op.

    For more hints on writing Dagster graphs, see our documentation overview on Graphs:
    https://docs.dagster.io/concepts/ops_graphs/graphs
    """
    hello()


say_hello_job = say_hello.to_job()
And in a separate file (let’s say it’s dag.py), I have:

from dagster_airflow import make_airflow_dag

dag, tasks = make_airflow_dag(
    module_name="myproject.graphs.say_hello",
    job_name="say_hello_job",
)

When I run python dag.py, I get the following:
Traceback (most recent call last):
  File "repo/dags/dag.py", line 34, in <module>
    job_name="say_hello_job",
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 311, in make_airflow_dag
    op_kwargs=op_kwargs,
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 199, in _make_airflow_dag
    pipeline = recon_repo.get_definition().get_pipeline(job_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 971, in get_pipeline
    return self._repository_data.get_pipeline(name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 732, in get_pipeline
    return self._pipelines.get_definition(pipeline_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 141, in get_definition
    for found_name in self.get_definition_names()
dagster.core.errors.DagsterInvariantViolationError: Could not find pipeline 'say_hello_job'. Found: .
This makes it look like dagster-airflow can’t find the members of this module. But by chance, I found that if I change the job_name argument to make_airflow_dag to a name that doesn’t exist in the module (e.g. my_nonexistent_job) and rerun python dag.py, I get:
Traceback (most recent call last):
  File "repo/dags/dag.py", line 34, in <module>
    job_name="my_nonexistent_job",
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 311, in make_airflow_dag
    op_kwargs=op_kwargs,
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 199, in _make_airflow_dag
    pipeline = recon_repo.get_definition().get_pipeline(job_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 48, in get_definition
    return repository_def_from_pointer(self.pointer)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 560, in repository_def_from_pointer
    target = def_from_pointer(pointer)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 502, in def_from_pointer
    target = pointer.load_target()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/code_pointer.py", line 282, in load_target
    name=self.fn_name, module=self.module, dir=dir(module)
dagster.core.errors.DagsterInvariantViolationError: my_nonexistent_job not found in module myproject.graphs.say_hello. dir: ['__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'graph', 'hello', 'say_hello', 'say_hello_job']
It’s clear from the output that dagster-airflow has successfully loaded the module and that say_hello_job is there. Similarly, if I set the job_name to say_hello, which is the name of the graph, I get a helpful error message telling me that my job is a graph, and to run .to_job on it.
I don’t know what I can do to fix this! Any thoughts?

max
11/15/2021, 10:28 PM

rupert
11/15/2021, 10:30 PM

owen
11/15/2021, 10:51 PM
When you point make_airflow_dag at that file, it actually only allows you to load say_hello, not say_hello_job (because say_hello_job is not created using a decorator). All of this behavior is basically impossible to decipher based on the error messages you were seeing, so apologies.

rupert
11/15/2021, 10:53 PM

owen
11/15/2021, 10:53 PM
You could replace @graph with @job, and get rid of the to_job() call if you don't plan on making multiple jobs out of the same graph. Or, for a more general solution, you could create an explicit repository that contains all of the jobs you want to be able to load from that file.

rupert
11/15/2021, 10:54 PM
make_airflow_dag?

owen
11/15/2021, 10:55 PM

rupert
11/15/2021, 11:06 PM

rupert
11/15/2021, 11:10 PM
Here is dir(myproject.repository), which is: ['__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'myproject', 'my_hourly_schedule', 'my_sensor', 'repository', 'say_hello_job']
rupert
11/15/2021, 11:14 PM

rupert
11/15/2021, 11:18 PM

rupert
11/15/2021, 11:21 PM
from myproject.graphs.say_hello import say_hello

say_hello_job = say_hello.to_job()

which is interesting, since in the above file there are no @graph or @job annotations. The error message that gives the dir output shows that say_hello and say_hello_job are both accessible within the module. So it seems like there’s no difference between this and my first way of trying it.

owen
11/15/2021, 11:32 PM
I just tried this out with the following file (subdir/test_repo.py):

from dagster import job, repository


@job
def foo():
    ...


@job
def bar():
    ...


@repository
def repo():
    return [foo, bar]

and I was able to successfully load both of these jobs as dags:
from dagster_airflow import make_airflow_dag

dag, tasks = make_airflow_dag(
    module_name="subdir.test_repo",
    job_name="foo",
)
print(dag, tasks)

dag, tasks = make_airflow_dag(
    module_name="subdir.test_repo",
    job_name="bar",
)
print(dag, tasks)
rupert
11/15/2021, 11:32 PM

rupert
11/15/2021, 11:33 PM
What happens if you change the @job to a @graph, and make a job with to_job?

owen
11/15/2021, 11:33 PM
from dagster import graph, repository


@graph
def foo():
    ...


@graph
def bar():
    ...


@repository
def repo():
    return [foo.to_job(), bar.to_job()]

owen
11/15/2021, 11:34 PM

rupert
11/15/2021, 11:34 PM

rupert
11/15/2021, 11:34 PM

owen
11/15/2021, 11:34 PM

rupert
11/15/2021, 11:35 PM
"dagster~=0.13.5",
"dagster_aws~=0.13.5",
"dagster_airflow~=0.13.5",
"dagit~=0.13.5",

rupert
11/15/2021, 11:35 PM

owen
11/15/2021, 11:35 PM

owen
11/15/2021, 11:36 PM

rupert
11/15/2021, 11:37 PM

rupert
11/15/2021, 11:52 PM

owen
11/15/2021, 11:56 PM

rupert
11/15/2021, 11:56 PM

rupert
11/15/2021, 11:56 PM
I tried removing the repository section (@repository and below) from your minified example and it runs fine.

rupert
11/15/2021, 11:58 PM
from dagster import graph
from myproject.ops.hello import hello
from dagster_aws.redshift import redshift_resource


@graph
def say_hello():
    """
    A graph definition. This example graph has a single op.

    For more hints on writing Dagster graphs, see our documentation overview on Graphs:
    https://docs.dagster.io/concepts/ops_graphs/graphs
    """
    hello()


friendship = say_hello.to_job(
    name="foobar",
    resource_defs={"foo": redshift_resource},
)
rupert
11/15/2021, 11:59 PM
It shows up as say_hello, which is confusing to me since it’s not named that, unless it inherits its name from the graph used to make it?

owen
11/16/2021, 12:01 AM
So this works:

make_airflow_dag(
    module_name="myproject.x.y.z",
    job_name="say_hello",
)

but not:

make_airflow_dag(
    module_name="myproject.x.y.z",
    job_name="friendship",
)

?

rupert
11/16/2021, 12:03 AM
Right, with "friendship" I get the error (Could not find pipeline 'friendship'. Found: .) that indicates the module member exists, but doesn’t work for whatever reason

rupert
11/16/2021, 12:04 AM
If I change the job_name to "friendship2" or something similar that is not an export of that file, I get the other error from my original post, the one with dir output.

rupert
11/16/2021, 12:07 AM
dagster.core.errors.DagsterInvalidDefinitionError: Failed attempting to coerce Graph calculate_postpaid_billing in to a Job. Use to_job instead, passing the required information.

The bit that I didn’t notice earlier is the "failed attempting to coerce" part.

rupert
11/16/2021, 12:08 AM
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/graph.py", line 521, in coerce_to_job
    return self.to_job()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/graph.py", line 515, in to_job
    version_strategy=version_strategy,
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/job.py", line 52, in __init__
    version_strategy=version_strategy,
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/pipeline.py", line 241, in __init__
    for mode_def in self._mode_definitions
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/pipeline.py", line 241, in <dictcomp>
    for mode_def in self._mode_definitions
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/pipeline.py", line 730, in _checked_resource_reqs_for_mode
    raise DagsterInvalidDefinitionError(error_msg)
dagster.core.errors.DagsterInvalidDefinitionError: resource key 'data_warehouse' is required by op 'get_opted_in_orgs', but is not provided. Provide a resource for key 'data_warehouse', or change 'data_warehouse' to one of the provided resource keys: ['io_manager'].

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "repo/dags/dag.py", line 62, in <module>
    dag_kwargs={"default_args": DEFAULT_ARGS, "max_active_runs": 1},
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 311, in make_airflow_dag
    op_kwargs=op_kwargs,
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 199, in _make_airflow_dag
    pipeline = recon_repo.get_definition().get_pipeline(job_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 48, in get_definition
    return repository_def_from_pointer(self.pointer)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 561, in repository_def_from_pointer
    repo_def = repository_def_from_target_def(target)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 551, in repository_def_from_target_def
    repository_data=CachingRepositoryData.from_list([target]),
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 610, in from_list
    coerced = definition.coerce_to_job()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/graph.py", line 526, in coerce_to_job
    ) from err
dagster.core.errors.DagsterInvalidDefinitionError: Failed attempting to coerce Graph mygraph in to a Job. Use to_job instead, passing the required information.
rupert
11/16/2021, 12:11 AM
So it looks like make_airflow_dag is finding the graph object, and coercing it into a job if possible. With our minimal example, the coercion succeeds, so it looks like the job was found as expected. In my slightly larger example, I have an op with a resource requirement. The job passes the resource in via resource_defs, but the graph does not. As a result the graph->job coercion fails.

owen
11/16/2021, 12:12 AM

rupert
11/16/2021, 12:14 AM

owen
11/16/2021, 12:14 AM

rupert
11/16/2021, 12:16 AM

rupert
11/16/2021, 12:17 AM
To reproduce, try setting required_resource_keys on one of the ops in your job

owen
11/16/2021, 12:23 AM
from dagster import op, graph, repository, ResourceDefinition


@op(required_resource_keys={"abc"})
def x():
    ...


@graph
def foo():
    x()


foo_job = foo.to_job(
    name="foo_job", resource_defs={"abc": ResourceDefinition.hardcoded_resource("xyz")}
)

loading "foo_job" works fine for me -- I'll need to dig into this more to figure out what exactly is happening

rupert
11/16/2021, 12:23 AM

owen
11/16/2021, 12:24 AM

rupert
11/16/2021, 12:24 AM

owen
11/16/2021, 12:25 AM

owen
11/16/2021, 12:26 AM

rupert
11/16/2021, 12:26 AM

rupert
11/16/2021, 12:30 AM
Try changing the name (but not the variable name) of your foo_job so that the name and variable name don’t match.

owen
11/16/2021, 12:48 AM

owen
11/16/2021, 12:49 AM

rupert
11/16/2021, 12:50 AM

max
11/16/2021, 4:37 PM

owen
11/16/2021, 5:18 PM

owen
11/16/2021, 5:21 PM