# integration-airflow
a
I'm doing some live testing during the presentation 🙂 I'm trying to load all my DAGs as assets:
```python
from airflow.models import DagBag
from dagster import Definitions
from dagster_airflow import load_assets_from_airflow_dag


def build_definitions(dag_path, safe_mode, executor=None, loggers=None):  # name/signature inferred from the snippet
    dag_bag = DagBag(
        dag_folder=dag_path,
        include_examples=False,  # Exclude Airflow example dags
        safe_mode=safe_mode,
    )

    # Load every DAG in the bag as Dagster assets
    assets = []
    for dag in dag_bag.dags.values():
        assets = assets + load_assets_from_airflow_dag(dag)

    return Definitions(
        assets=assets,
        executor=executor,
        loggers=loggers,
    )
```
It seems that this is a bad idea because I have this mistake:
```
dagster._core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'airflow_db' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
```
But I don't get why 😕
j
@Alexandre Guitton thanks for the report, unfortunately multiple `load_assets_from_airflow_dag` calls in a repo/Definitions object create conflicting resource defs.
one of the reasons it's marked as experimental, we are working on a fix
If you're just testing out the tooling I'd recommend using the job-based APIs
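For reference, a minimal sketch of that job-based path using dagster-airflow's `make_dagster_job_from_airflow_dag` (the DAG folder path is a placeholder; adapt to the snippet above):
```python
from airflow.models import DagBag
from dagster import Definitions
from dagster_airflow import make_dagster_job_from_airflow_dag

# Build one Dagster job per Airflow DAG instead of loading them as assets
dag_bag = DagBag(dag_folder="/path/to/dags", include_examples=False)  # placeholder path
jobs = [make_dagster_job_from_airflow_dag(dag) for dag in dag_bag.dags.values()]

defs = Definitions(jobs=jobs)
```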
a
I also tested it, and I encountered some drawbacks:
• If an operator uses a pool, it "fails" (even if that step succeeds) because the pool is not defined (and since retry is not implemented, it stops at the first try).
```
[2023-02-08 18:11:00,419] {taskinstance.py:1025} INFO - Dependencies not met for <TaskInstance: wkf_github_api_rate.collect manual__2023-02-08T18:09:44.596652+00:00 [None]>, dependency 'Pool Slots Available' FAILED: ("Tasks using non-existent pool '%s' will not be scheduled", 'process_in_redshift')
[2023-02-08 18:11:00,424] {taskinstance.py:1226} WARNING - 
--------------------------------------------------------------------------------
[2023-02-08 18:11:00,424] {taskinstance.py:1227} WARNING - Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 7. State set to NONE.
```
• If my job fails at a step, I can't restart from the failed step, because:
```
[2023-02-08 18:12:31,901] {taskinstance.py:1025} INFO - Dependencies not met for <TaskInstance: wkf_github_api_rate.collect manual__2023-02-08T18:09:44.596652+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'is_hourly'}
```
That's why I wanted to check the behavior of the asset in these cases.
j
the asset APIs will still have those same issues atm
retry-from-failure doesn't work atm because the Airflow state is thrown away after each run; we are working on releasing a change that will allow it to be persisted. For pools, currently you can either define them manually using `Pool` models in your definitions file, or modify the migrated DAGs manually to use Dagster-native concurrency limits: https://docs.dagster.io/deployment/run-coordinator#limiting-run-concurrency
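For the pool option, a minimal sketch of registering the missing pool through Airflow's ORM (an assumption on my part, not a dagster-airflow API: it presumes Airflow 2.x and that this code runs against the same Airflow metadata database the migrated tasks use; the pool name comes from the log above and the slot count is illustrative):
```python
from airflow.models import Pool
from airflow.utils.session import create_session

# Create the pool the migrated tasks reference so the
# 'Pool Slots Available' dependency check can pass.
with create_session() as session:
    if session.query(Pool).filter(Pool.pool == "process_in_redshift").one_or_none() is None:
        session.add(Pool(pool="process_in_redshift", slots=5, description="pool for migrated DAGs"))
```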
šŸ‘ 1
a
And there's no equivalent of clearing tasks while keeping upstream successes either?
j
in Dagster there isn't really a concept of clearing tasks, but our retry system accomplishes a similar thing: https://docs.dagster.io/guides/dagster/re-execution. But like I mentioned, until dagster-airflow finds a way to persist its Airflow DB state it doesn't work (you'll need to rerun the full job if a failure occurs)
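For reference, a minimal sketch of that re-execution API from the linked guide (the job and run id here are placeholders, not part of the thread):
```python
from dagster import DagsterInstance, ReexecutionOptions, execute_job, job, op, reconstructable


@op
def flaky_op():
    ...


@job
def my_job():
    flaky_op()


if __name__ == "__main__":
    instance = DagsterInstance.get()
    # Re-run only the steps that failed (and their downstream steps) in a previous run
    options = ReexecutionOptions.from_failure(run_id="<failed-run-id>", instance=instance)
    execute_job(reconstructable(my_job), instance=instance, reexecution_options=options)
```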
a
Super clear šŸ™‚ Thanks!
j
@Alexandre Guitton 👋 1.2.0 shipped with a way to persist Airflow DB state: https://dagster.slack.com/archives/CH2SCAV19/p1678471767802909
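A minimal sketch of wiring that in, assuming the 1.2+ APIs `make_persistent_airflow_db_resource` and `make_dagster_definitions_from_airflow_dags_path` (the connection string and path are placeholders; check the announcement and docs for the exact options):
```python
from dagster_airflow import (
    make_dagster_definitions_from_airflow_dags_path,
    make_persistent_airflow_db_resource,
)

# Back the migrated DAGs with a persistent Airflow metadata DB instead of an
# ephemeral one, so task/DAG-run state survives across runs.
airflow_db = make_persistent_airflow_db_resource(
    uri="postgresql+psycopg2://airflow:airflow@localhost:5432/airflow",  # placeholder connection string
)

defs = make_dagster_definitions_from_airflow_dags_path(
    dag_path="/path/to/dags",  # placeholder path
    resource_defs={"airflow_db": airflow_db},
)
```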
a
I saw 🎉 I plan to test it this week 😎
@Joe a little bug for you
Another point: when I run an Airflow task with a pool but there aren't enough slots available in the pool, there is no wait or even a retry; the task is marked as success without actually being executed. Is that intentional, or is there a way to manage it?
```
[2023-03-24 14:03:47,310] {configuration.py:854} INFO - Reading the config from /root/airflow/airflow.cfg
[2023-03-24 14:03:47,323] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
[2023-03-24 14:03:48,837] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: wkf_airflow_db_clean.clean_log_table manual__2023-03-24T14:03:27.339365+00:00 [None]>
[2023-03-24 14:03:48,917] {taskinstance.py:1025} INFO - Dependencies not met for <TaskInstance: wkf_airflow_db_clean.clean_log_table manual__2023-03-24T14:03:27.339365+00:00 [None]>, dependency 'Pool Slots Available' FAILED: ('Not scheduling since there are %s open slots in pool %s and require %s pool slots', 0, 'process_in_redshift', 1)
[2023-03-24 14:03:49,037] {taskinstance.py:1226} WARNING - 
--------------------------------------------------------------------------------
[2023-03-24 14:03:49,037] {taskinstance.py:1227} WARNING - Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 7. State set to NONE.
[2023-03-24 14:03:49,037] {taskinstance.py:1234} WARNING - 
--------------------------------------------------------------------------------
```
By the way, this is a very nice improvement, which opens up more interesting scenarios for a potential migration from Airflow 🙏