
Alexandre Guitton

02/08/2023, 5:58 PM
I'm doing some live testing during the presentation šŸ™‚ I'm trying to load all my DAGs as assets:
from airflow.models import DagBag
from dagster import Definitions
from dagster_airflow import load_assets_from_airflow_dag


def build_definitions(dag_path, safe_mode, executor, loggers):
    dag_bag = DagBag(
        dag_folder=dag_path,
        include_examples=False,  # Exclude Airflow example dags
        safe_mode=safe_mode,
    )

    # Load every DAG in the bag as Dagster assets
    assets = []
    for dag in dag_bag.dags.values():
        assets = assets + load_assets_from_airflow_dag(dag)

    return Definitions(
        assets=assets,
        executor=executor,
        loggers=loggers,
    )
It seems that this is a bad idea, because I get this error:
dagster._core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'airflow_db' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
But I don't get why šŸ˜•

Joe

02/08/2023, 5:59 PM
@Alexandre Guitton thanks for the report, unfortunately multiple `load_assets_from_airflow_dag` calls in a repo/definitions object create conflicting resource defs.
one of the reasons it's marked as experimental, we are working on a fix
If you're just testing out the tooling I'd recommend using the job-based APIs
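Roughly, the job-based route looks like this (a minimal sketch reusing the dag_path/safe_mode values from the snippet above; make_dagster_job_from_airflow_dag is the assumed entry point here):
from airflow.models import DagBag
from dagster import Definitions
from dagster_airflow import make_dagster_job_from_airflow_dag

dag_bag = DagBag(dag_folder=dag_path, include_examples=False, safe_mode=safe_mode)

# Build one Dagster job per Airflow DAG instead of loading everything as assets
jobs = [make_dagster_job_from_airflow_dag(dag) for dag in dag_bag.dags.values()]

defs = Definitions(jobs=jobs)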

Alexandre Guitton

02/08/2023, 6:15 PM
I also tested it, and I encountered some drawbacks:
• If an operator uses a pool, it "fails" (even if that step succeeds) because the pool is not defined (and since retry is not implemented, it stops at the first try).
[2023-02-08 18:11:00,419] {taskinstance.py:1025} INFO - Dependencies not met for <TaskInstance: wkf_github_api_rate.collect manual__2023-02-08T18:09:44.596652+00:00 [None]>, dependency 'Pool Slots Available' FAILED: ("Tasks using non-existent pool '%s' will not be scheduled", 'process_in_redshift')
[2023-02-08 18:11:00,424] {taskinstance.py:1226} WARNING - 
--------------------------------------------------------------------------------
[2023-02-08 18:11:00,424] {taskinstance.py:1227} WARNING - Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 7. State set to NONE.
• If my job fails at a step, I can't restart from the step that failed, because:
[2023-02-08 18:12:31,901] {taskinstance.py:1025} INFO - Dependencies not met for <TaskInstance: wkf_github_api_rate.collect manual__2023-02-08T18:09:44.596652+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'is_hourly'}
That's why I wanted to check the behavior of the asset in these cases.

Joe

02/08/2023, 6:16 PM
the asset APIs will still have those same issues atm
retry-from-failure doesn't work atm because the Airflow state is thrown away after each run; we are working on releasing a change that will allow this to be persisted. For pools, currently you can either define them manually using Pool models in your definitions file, or modify the migrated DAGs manually to use Dagster-native concurrency limits: https://docs.dagster.io/deployment/run-coordinator#limiting-run-concurrency
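For the first option, a rough sketch using Airflow's Pool model directly (pool name taken from the log above, slot count illustrative; this assumes the session points at the same Airflow metadata DB the migrated DAGs use):
from airflow.models import Pool
from airflow.utils.session import create_session

# Create the pool referenced by the operators if it doesn't exist yet,
# so the 'Pool Slots Available' dependency check can pass.
with create_session() as session:
    if not session.query(Pool).filter(Pool.pool == "process_in_redshift").first():
        session.add(Pool(pool="process_in_redshift", slots=4))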
šŸ‘ 1

Alexandre Guitton

02/08/2023, 6:29 PM
And there's no equivalent of clearing tasks while keeping upstream successes either?

Joe

02/08/2023, 6:31 PM
in Dagster there isn't really a concept of clearing tasks, but our retry system accomplishes a similar thing (https://docs.dagster.io/guides/dagster/re-execution). Like I mentioned though, until dagster-airflow finds a way to persist its Airflow DB state this doesn't work (you'll need to rerun the full job if a failure occurs)
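For reference, the retry flow from that guide looks roughly like this (my_migrated_job and its module are hypothetical names, and as noted this only helps migrated DAGs once the Airflow DB state persists across runs):
from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable

from my_project.jobs import my_migrated_job  # hypothetical job built from an Airflow DAG

instance = DagsterInstance.get()

# Re-run only the failed steps (and anything downstream of them) from a previous run
options = ReexecutionOptions.from_failure(run_id="<failed_run_id>", instance=instance)
result = execute_job(
    reconstructable(my_migrated_job),
    instance=instance,
    reexecution_options=options,
)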

Alexandre Guitton

02/08/2023, 6:33 PM
Super clear šŸ™‚ Thanks!

Joe

03/10/2023, 6:10 PM
@Alexandre Guitton šŸ‘‹ 1.2.0 shipped with a way to persist airflow db state https://dagster.slack.com/archives/CH2SCAV19/p1678471767802909
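A rough sketch of the persistence hook (the factory name, uri value, and resource_defs wiring here are assumptions to verify against the 1.2.0 dagster-airflow docs):
from dagster_airflow import (
    load_assets_from_airflow_dag,
    make_persistent_airflow_db_resource,
)

# Back the migrated DAG with a persistent Airflow metadata DB instead of a
# throwaway one, so task/run state survives across Dagster runs (assumed API).
airflow_db = make_persistent_airflow_db_resource(uri="postgresql://airflow:...@host/airflow")

# dag loaded from a DagBag as in the earlier snippets
assets = load_assets_from_airflow_dag(dag, resource_defs={"airflow_db": airflow_db})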

Alexandre Guitton

03/11/2023, 10:24 AM
I saw šŸŽ‰ I plan to test it this week šŸ˜Ž
@Joe a little bug for you :dagster-bot-responded-by-community:
Another point: when I run an Airflow task with a pool but there are not enough slots available in the pool, there is no wait or even a retry; the task is marked as success without actually being executed. Is this intentional, or is it something that can be managed?
[2023-03-24 14:03:47,310] {configuration.py:854} INFO - Reading the config from /root/airflow/airflow.cfg
[2023-03-24 14:03:47,323] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
[2023-03-24 14:03:48,837] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: wkf_airflow_db_clean.clean_log_table manual__2023-03-24T14:03:27.339365+00:00 [None]>
[2023-03-24 14:03:48,917] {taskinstance.py:1025} INFO - Dependencies not met for <TaskInstance: wkf_airflow_db_clean.clean_log_table manual__2023-03-24T14:03:27.339365+00:00 [None]>, dependency 'Pool Slots Available' FAILED: ('Not scheduling since there are %s open slots in pool %s and require %s pool slots', 0, 'process_in_redshift', 1)
[2023-03-24 14:03:49,037] {taskinstance.py:1226} WARNING - 
--------------------------------------------------------------------------------
[2023-03-24 14:03:49,037] {taskinstance.py:1227} WARNING - Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 7. State set to NONE.
[2023-03-24 14:03:49,037] {taskinstance.py:1234} WARNING - 
--------------------------------------------------------------------------------
By the way, this is a very nice improvement, which opens up more interesting scenarios for a potential migration from Airflow šŸ™