Alexandre Guitton
02/08/2023, 5:58 PM
from airflow.models import DagBag
from dagster import Definitions
from dagster_airflow import load_assets_from_airflow_dag

def build_definitions(dag_path, safe_mode, executor, loggers):  # signature assumed from context
    dag_bag = DagBag(
        dag_folder=dag_path,
        include_examples=False,  # Exclude Airflow example dags
        safe_mode=safe_mode,
    )
    assets = []
    for dag in dag_bag.dags.values():
        assets.extend(load_assets_from_airflow_dag(dag))
    return Definitions(
        assets=assets,
        executor=executor,
        loggers=loggers,
    )
It seems that this is a bad idea, because I get this error:
dagster._core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'airflow_db' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
But I don't get why.
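The error is about reference equality: each call to load_assets_from_airflow_dag appears to build its own airflow_db resource object, so two assets end up bound to two distinct definitions under the same key. A minimal sketch of the same failure in plain Dagster (asset and variable names here are illustrative, not from the thread):

from dagster import Definitions, ResourceDefinition, asset

# Two separate resource objects bound under the same key "airflow_db".
db_one = ResourceDefinition.hardcoded_resource("conn")
db_two = ResourceDefinition.hardcoded_resource("conn")

@asset(resource_defs={"airflow_db": db_one})
def asset_a():
    ...

@asset(resource_defs={"airflow_db": db_two})
def asset_b():
    ...

# Loading these Definitions fails with DagsterInvalidDefinitionError:
# the two "airflow_db" definitions have identical contents, but the
# check is by reference equality, and they are not the same object.
defs = Definitions(assets=[asset_a, asset_b])

# Sharing one resource object across all assets avoids the conflict.
shared_db = ResourceDefinition.hardcoded_resource("conn")

@asset(resource_defs={"airflow_db": shared_db})
def asset_c():
    ...

@asset(resource_defs={"airflow_db": shared_db})
def asset_d():
    ...

defs_ok = Definitions(assets=[asset_c, asset_d])

If the same rule applies to the loader, the assets from every DAG would need to share a single airflow_db definition rather than each load call creating its own.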
Joe
02/08/2023, 5:59 PM
Alexandre Guitton
02/08/2023, 6:15 PM
[2023-02-08 18:11:00,419] {taskinstance.py:1025} INFO - Dependencies not met for <TaskInstance: wkf_github_api_rate.collect manual__2023-02-08T18:09:44.596652+00:00 [None]>, dependency 'Pool Slots Available' FAILED: ("Tasks using non-existent pool '%s' will not be scheduled", 'process_in_redshift')
[2023-02-08 18:11:00,424] {taskinstance.py:1226} WARNING -
--------------------------------------------------------------------------------
[2023-02-08 18:11:00,424] {taskinstance.py:1227} WARNING - Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 7. State set to NONE.
• If my job fails at a step, I can't restart from the failed step, because:
[2023-02-08 18:12:31,901] {taskinstance.py:1025} INFO - Dependencies not met for <TaskInstance: wkf_github_api_rate.collect manual__2023-02-08T18:09:44.596652+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'is_hourly'}
That's why I wanted to check the behavior of the assets in these cases.
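On the restart point: plain Dagster does support re-executing a run from its failed steps, in the UI and via the Python API. A sketch with placeholders (the job here is a stand-in, and the run id must come from the actual failed run):

from dagster import DagsterInstance, ReexecutionOptions, execute_job, job, op, reconstructable

@op
def flaky_step():
    ...  # stand-in for the step that failed in the original run

@job
def my_job():
    flaky_step()

if __name__ == "__main__":
    instance = DagsterInstance.get()
    # Re-runs only the steps that failed (and their downstream steps),
    # reusing the successful results of the original run.
    result = execute_job(
        reconstructable(my_job),
        instance=instance,
        reexecution_options=ReexecutionOptions.from_failure(
            run_id="<failed-run-id>",  # id of the failed run to resume
            instance=instance,
        ),
    )

Whether the jobs produced by the Airflow migration behave the same way under re-execution is exactly what the question above is probing.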
02/08/2023, 6:16 PM
You can either define the Pools models in your definitions file or modify the migrated dags manually to use Dagster-native concurrency limits: https://docs.dagster.io/deployment/run-coordinator#limiting-run-concurrency
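To make that concrete, one possible shape (a sketch, not from the thread: the "pool" tag key is arbitrary, dag stands for the Airflow DAG that used the process_in_redshift pool, and the tags parameter of make_dagster_job_from_airflow_dag is assumed from the dagster-airflow API):

from dagster_airflow import make_dagster_job_from_airflow_dag

# `dag` is an airflow.models.DAG loaded elsewhere (assumption).
migrated_job = make_dagster_job_from_airflow_dag(
    dag=dag,
    tags={"pool": "process_in_redshift"},  # tag the job so the coordinator can match it
)

# In dagster.yaml, the QueuedRunCoordinator can then cap concurrent runs
# carrying that tag (see the docs link above):
#
# run_coordinator:
#   module: dagster.core.run_coordinator
#   class: QueuedRunCoordinator
#   config:
#     tag_concurrency_limits:
#       - key: "pool"
#         value: "process_in_redshift"
#         limit: 1

Note that run-level limits throttle whole runs, not individual tasks inside a run, so they approximate an Airflow pool rather than replicate it.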
Alexandre Guitton
02/08/2023, 6:29 PM
Joe
02/08/2023, 6:31 PM
Alexandre Guitton
02/08/2023, 6:33 PM
Joe
03/10/2023, 6:10 PM
Alexandre Guitton
03/11/2023, 10:24 AM
[2023-03-24 14:03:47,310] {configuration.py:854} INFO - Reading the config from /root/airflow/airflow.cfg
[2023-03-24 14:03:47,323] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
[2023-03-24 14:03:48,837] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: wkf_airflow_db_clean.clean_log_table manual__2023-03-24T14:03:27.339365+00:00 [None]>
[2023-03-24 14:03:48,917] {taskinstance.py:1025} INFO - Dependencies not met for <TaskInstance: wkf_airflow_db_clean.clean_log_table manual__2023-03-24T14:03:27.339365+00:00 [None]>, dependency 'Pool Slots Available' FAILED: ('Not scheduling since there are %s open slots in pool %s and require %s pool slots', 0, 'process_in_redshift', 1)
[2023-03-24 14:03:49,037] {taskinstance.py:1226} WARNING -
--------------------------------------------------------------------------------
[2023-03-24 14:03:49,037] {taskinstance.py:1227} WARNING - Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 7. State set to NONE.
[2023-03-24 14:03:49,037] {taskinstance.py:1234} WARNING -
--------------------------------------------------------------------------------
By the way, this is a very nice improvement, which opens up more interesting scenarios for a potential migration from Airflow.