# integration-airflow
r
Solved! New tl;dr: when calling to_job on a graph, the variable name you assign the result to needs to be the same as the name argument to to_job.
Howdy folks! I’m new to Dagster, and seeing if we can find a path to migrate to Dagster from our current Airflow deployment. Currently, I’m running across an interesting error from make_airflow_dag. The tl;dr is that make_airflow_dag is behaving like it can’t find my job, even though it’s clear from the error output that the job is available in the module with the correct name. Read on for details… I have the sample graph and job like so:
Copy code
from dagster import graph

from myproject.ops.hello import hello

@graph
def say_hello():
    """
    A graph definition. This example graph has a single op.

    For more hints on writing Dagster graphs, see our documentation overview on Graphs:
    https://docs.dagster.io/concepts/ops_graphs/graphs
    """
    hello()

say_hello_job = say_hello.to_job()
And in a separate file (let’s say it’s dag.py), I have:
Copy code
from dagster_airflow import make_airflow_dag

dag, tasks = make_airflow_dag(
    module_name="myproject.graphs.say_hello",
    job_name="say_hello_job",
)
When I run python dag.py, I get the following:
Copy code
Traceback (most recent call last):
  File "repo/dags/dag.py", line 34, in <module>
    job_name="say_hello_job",
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 311, in make_airflow_dag
    op_kwargs=op_kwargs,
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 199, in _make_airflow_dag
    pipeline = recon_repo.get_definition().get_pipeline(job_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 971, in get_pipeline
    return self._repository_data.get_pipeline(name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 732, in get_pipeline
    return self._pipelines.get_definition(pipeline_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 141, in get_definition
    for found_name in self.get_definition_names()
dagster.core.errors.DagsterInvariantViolationError: Could not find pipeline 'say_hello_job'. Found: .
This makes it look like dagster-airflow can’t find the members of this module. But by chance, I found that if I change the job_name argument to make_airflow_dag to a name that doesn’t exist in the module (e.g. my_nonexistent_job) and rerun python dag.py, I get:
Copy code
Traceback (most recent call last):
  File "repo/dags/dag.py", line 34, in <module>
    job_name="my_nonexistent_job",
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 311, in make_airflow_dag
    op_kwargs=op_kwargs,
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 199, in _make_airflow_dag
    pipeline = recon_repo.get_definition().get_pipeline(job_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 48, in get_definition
    return repository_def_from_pointer(self.pointer)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 560, in repository_def_from_pointer
    target = def_from_pointer(pointer)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 502, in def_from_pointer
    target = pointer.load_target()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/code_pointer.py", line 282, in load_target
    name=self.fn_name, module=self.module, dir=dir(module)
dagster.core.errors.DagsterInvariantViolationError: my_nonexistent_job not found in module myproject.graphs.say_hello. dir: ['__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'graph', 'hello', 'say_hello', 'say_hello_job']
It’s clear from the output that dagster-airflow has successfully loaded the module and that say_hello_job is there. Similarly, if I set the job_name to say_hello, which is the name of the graph, I get a helpful error message telling me that my job is a graph, and to run .to_job on it. I don’t know what I can do to fix this! Any thoughts?
m
@claire @owen
r
ty thankyou
o
hi @rupert! that's a pretty unfortunate / difficult-to-debug way to run into this problem. When you point to a file that doesn't explicitly define which graphs/jobs/etc. it's exporting, the default behavior is to a) assume that there is only one such object defined at the top level of the file and b) export that element as the only loadable item from the file. Only things directly annotated with @job or @graph will be picked up in this way, and so when you point make_airflow_dag at that file, it actually only allows you to load say_hello, not say_hello_job (because say_hello_job is not created using a decorator). All of this behavior is basically impossible to decipher based on the error messages you were seeing, so apologies.
r
thanks owen, that’s very helpful!
1
o
as for solutions, you could replace @graph with @job, and get rid of the to_job() call if you don't plan on making multiple jobs out of the same graph, or for a more general solution, you could create an explicit repository that contains all of the jobs you want to be able to load from that file
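for example, the @job version of the same file would look something like this (a minimal sketch, reusing the hello op from the example above):
Copy code
from dagster import job

from myproject.ops.hello import hello


# Decorated directly with @job, so the default loader picks it up
@job
def say_hello_job():
    hello()
then job_name="say_hello_job" matches the decorated function's name, and make_airflow_dag can find it without a repository.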
r
ah gotcha. assuming I make a repository (I already have one), how would I point at it with make_airflow_dag?
o
I think you should be able to point make_airflow_dag at the file that contains the repository, and then any job that's in that repository should be accessible
r
interesting, I’ll give that a shot
no dice! I’m seeing the same behavior as before. the repository and all the sample jobs are available in the error output that shows dir(myproject.repository), which is:
['__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'myproject', 'my_hourly_schedule', 'my_sensor', 'repository', 'say_hello_job']
I’ll presently try the standalone-job-file approach that you suggested and see how that works
👍 the standalone job approach works.
the following also does not work:
Copy code
from myproject.graphs.say_hello import say_hello

say_hello_job = say_hello.to_job()
which is interesting, since in the above file there are no @graph or @job annotations. The error message that gives the dir output shows that say_hello and say_hello_job are both accessible within the module. So it seems like there’s no difference between this and my first way of trying it.
o
hm I couldn't reproduce that behavior with repository -- I created a repository that looks like this (file: subdir/test_repo.py):
Copy code
from dagster import job, repository


@job
def foo():
    ...


@job
def bar():
    ...


@repository
def repo():
    return [foo, bar]
and I was able to successfully load both of these jobs as dags:
Copy code
from dagster_airflow import make_airflow_dag


dag, tasks = make_airflow_dag(
    module_name="subdir.test_repo",
    job_name="foo",
)
print(dag, tasks)

dag, tasks = make_airflow_dag(
    module_name="subdir.test_repo",
    job_name="bar",
)
print(dag, tasks)
r
ah, that makes sense to me actually.
what happens if you change your @job to a @graph, and make a job with to_job?
o
Copy code
from dagster import graph, repository


@graph
def foo():
    ...


@graph
def bar():
    ...


@repository
def repo():
    return [foo.to_job(), bar.to_job()]
this still works for me
r
oof
hmm
o
just to double check -- what version of dagster are you on? shouldn't make a huge difference but worth checking
r
any chance I’m outdated on any of these?
Copy code
"dagster~=0.13.5",
        "dagster_aws~=0.13.5",
        "dagster_airflow~=0.13.5",
        "dagit~=0.13.5",
hah same thought ^^
o
nope that all looks good (I'm on 0.13.5 as well)
are you able to share the file containing your repository? maybe it's something weird/subtle
r
I’ll see if I can reproduce. my total environment is not shareable because I’m running all this on a gross docker container within which our airflow deployment limps along
1
Ok, I can reproduce your success case inside my sad little container! 🥲
o
nice! so if I'm understanding correctly, the very minified example works fine, it's just that something about your actual repository is different and doesn't work?
r
yeah, it seems that way
fwiw, I can even delete the repository definition (@repository and below) from your minified example and it runs fine.
sorry that’s not quite clear. This is what I can reference successfully:
Copy code
from dagster import graph

from myproject.ops.hello import hello
from dagster_aws.redshift import redshift_resource


@graph
def say_hello():
    """
    A graph definition. This example graph has a single op.

    For more hints on writing Dagster graphs, see our documentation overview on Graphs:
    https://docs.dagster.io/concepts/ops_graphs/graphs
    """
    hello()


friendship = say_hello.to_job(
    name="foobar",
    resource_defs={"foo": redshift_resource}
)
I can reference the job with the name say_hello, which is confusing to me since it’s not named that -- unless it inherits its name from the graph used to make it?
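(for what it’s worth, to_job() with no name argument does default to the graph’s name -- a minimal sketch of that behavior:)
Copy code
from dagster import graph


@graph
def say_hello():
    ...


# With no explicit name, the job inherits the graph's name
job = say_hello.to_job()
assert job.name == "say_hello"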
o
so you're able to reference this with
Copy code
make_airflow_dag(
    module_name="myproject.x.y.z",
    job_name="say_hello",
)
but not:
Copy code
make_airflow_dag(
    module_name="myproject.x.y.z",
    job_name="friendship",
)
?
r
correct! and I get the error (Could not find pipeline 'friendship'. Found: .) that indicates the module member exists, but doesn’t work for whatever reason
If I change the job_name to "friendship2" or something similar that is not an export of that file, I get the other error from my original post, the one with dir output.
So, I’m revisiting the third error case from my original message, the one I didn’t go into detail about. If I reference the graph explicitly, I get the following:
Copy code
dagster.core.errors.DagsterInvalidDefinitionError: Failed attempting to coerce Graph mygraph in to a Job. Use to_job instead, passing the required information.
The bit that I didn’t notice earlier is the 'failed attempting to coerce' part.
Here’s the full error message, including the root cause of the error which I didn’t notice earlier:
Copy code
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/graph.py", line 521, in coerce_to_job
    return self.to_job()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/graph.py", line 515, in to_job
    version_strategy=version_strategy,
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/job.py", line 52, in __init__
    version_strategy=version_strategy,
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/pipeline.py", line 241, in __init__
    for mode_def in self._mode_definitions
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/pipeline.py", line 241, in <dictcomp>
    for mode_def in self._mode_definitions
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/pipeline.py", line 730, in _checked_resource_reqs_for_mode
    raise DagsterInvalidDefinitionError(error_msg)
dagster.core.errors.DagsterInvalidDefinitionError: resource key 'data_warehouse' is required by op 'get_opted_in_orgs', but is not provided. Provide a resource for key 'data_warehouse',  or change 'data_warehouse' to one of the provided resource keys: ['io_manager'].

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "repo/dags/dag.py", line 62, in <module>
    dag_kwargs={"default_args": DEFAULT_ARGS, "max_active_runs": 1},
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 311, in make_airflow_dag
    op_kwargs=op_kwargs,
  File "/usr/local/lib/python3.7/site-packages/dagster_airflow/factory.py", line 199, in _make_airflow_dag
    pipeline = recon_repo.get_definition().get_pipeline(job_name)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 48, in get_definition
    return repository_def_from_pointer(self.pointer)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 561, in repository_def_from_pointer
    repo_def = repository_def_from_target_def(target)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 551, in repository_def_from_target_def
    repository_data=CachingRepositoryData.from_list([target]),
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository.py", line 610, in from_list
    coerced = definition.coerce_to_job()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/graph.py", line 526, in coerce_to_job
    ) from err
dagster.core.errors.DagsterInvalidDefinitionError: Failed attempting to coerce Graph mygraph in to a Job. Use to_job instead
, passing the required information.
The way I read this is, make_airflow_dag is finding the graph object, and coercing it into a job if possible. With our minimal example, the coercion succeeds, so it looks like the job was found as expected. In my slightly larger example, I have an op with a resource requirement. The job passes the resource in via resource_defs, but the graph does not. As a result, the graph->job coercion fails.
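to illustrate (a sketch using the op and resource names from the traceback above; the hardcoded resource is just a stand-in):
Copy code
from dagster import ResourceDefinition, graph, op


@op(required_resource_keys={"data_warehouse"})
def get_opted_in_orgs():
    ...


@graph
def mygraph():
    get_opted_in_orgs()


# The coercion path effectively calls mygraph.to_job() with no
# resource_defs, which raises DagsterInvalidDefinitionError because
# nothing provides the "data_warehouse" key.

# The explicit job is valid because it supplies the resource:
mygraph_job = mygraph.to_job(
    name="mygraph_job",
    resource_defs={"data_warehouse": ResourceDefinition.hardcoded_resource("stub")},
)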
o
gotcha -- there does seem to be a bug in this specific path (sorry for the misplaced optimism!) we'll get out a fix for this, but in the meantime are you able to make do with the @job solution?
r
yeah, as far as I know I’ll be able to make that work. This was a helpful little exercise, thanks for helping me get to a minimal reproduction of the bug. Would you like me to file an issue?
o
that would be great, thanks!
r
cool. if you have a second to reproduce this in your toy implementation, that would help prove my hypothesis
you should be able to reproduce it by adding anything to required_resource_keys on one of the ops in your job
o
weird...
Copy code
from dagster import op, graph, ResourceDefinition


@op(required_resource_keys={"abc"})
def x():
    ...


@graph
def foo():
    x()


foo_job = foo.to_job(
    name="foo_job", resource_defs={"abc": ResourceDefinition.hardcoded_resource("xyz")}
)
loading "foo_job" works fine for me -- I'll need to dig into this more to figure out what exactly is happening
r
ok that’s strange
o
but yes certainly what I was saying about dagster only loading top level @graphs and @jobs is incorrect (this is true for dagit, but we use a different system for this package)
r
I’m perplexed as to how I could be so close to the happy path, and yet so far… sad cowboy
o
sorry you're hitting the rough edges here, thanks for bearing with us blob grimace
I'll put aside some time tomorrow to dig into this
r
standby, it’s maybe working now? blob confused blob confused blob confused
ok, I think I figured it out. I’m not going to make any claims yet, but try changing the name (but not the variable name) of your foo_job so that the name and variable name don’t match.
o
ah yep that definitely would do it -- the code assumes that the variable name and the "name" field on the job will be identical (which they always will be if you use a decorator). tricky stuff, which only surfaces now that there is this .to_job() option
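in sketch form (hypothetical names, mirroring your example):
Copy code
from dagster import graph


@graph
def say_hello():
    ...


# loadable by make_airflow_dag: variable name matches the `name` argument
say_hello_job = say_hello.to_job(name="say_hello_job")

# hits the bug: variable name ("friendship") != job name ("foobar")
friendship = say_hello.to_job(name="foobar")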
1
thanks so much for digging into this!
r
hah no problem! I swear I wasn’t trying to figure out arcane ways to waste y’alls time. Have a good evening 🌆
blob joy 1
m
can we give a better error message in this case?
o
we definitely should -- this is a pretty rough discovery process. i'll create a ticket