Cameron Cyr
07/27/2023, 1:03 PM
I am using the new dbt_assets decorator. It has been working great so far to run the dbt build command. However, I also want to run dbt source freshness before I run dbt build. Has anyone tried running more than one dbt command in a single function with the new dbt_assets decorator?
Here is the code that I thought would have worked, but it errors out whenever I materialize my dbt assets:
@dbt_assets(manifest=MANIFEST_PATH, dagster_dbt_translator=CustomDagsterDbtTranslator())
def brzw_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    dbt.cli(["source freshness"]).stream()
    yield from dbt.cli(["build"], context=context).stream()
Error is in the thread
Cameron Cyr
07/27/2023, 1:04 PM
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "brzw_dbt_assets":
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 369, in core_dagster_event_sequence_for_step
for user_event in check.generator(
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 90, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 192, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 161, in _yield_compute_results
for event in iterate_with_context(
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 443, in iterate_with_context
with context_fn():
File "/opt/homebrew/Cellar/python@3.11/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
FileExistsError: [Errno 17] File exists: '/Users/ccyr/brzwy/data-warehouse/dbt/target/brzw_dbt_assets-7a4f9c3-1690462329'
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
yield
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 445, in iterate_with_context
next_output = next(iterator)
^^^^^^^^^^^^^^
File "/Users/ccyr/brzwy/data-warehouse/brzw_de/assets/dbt.py", line 19, in brzw_dbt_assets
yield from dbt.cli(["build"], context=context).stream()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster_dbt/core/resources_v2.py", line 505, in cli
return DbtCliInvocation.run(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster_dbt/core/resources_v2.py", line 176, in run
partial_parse_destination_target_path.parent.mkdir(parents=True)
File "/opt/homebrew/Cellar/python@3.11/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/pathlib.py", line 1116, in mkdir
os.mkdir(self, mode)
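Editorial note: the traceback above ends in a mkdir(parents=True) call that raises FileExistsError. As a general Python point, Path.mkdir raises that error when the directory already exists unless exist_ok=True is passed. The sketch below only reproduces that behavior with the standard library. The directory names and the helper second_mkdir_error are hypothetical, and whether two dbt invocations resolved to the same target directory in this run is an assumption, not something the thread confirms.

```python
import tempfile
from pathlib import Path
from typing import Optional


def second_mkdir_error(exist_ok: bool) -> Optional[str]:
    """Create the same directory twice and report the error name, if any."""
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical stand-in for a per-invocation dbt target directory.
        target = Path(tmp) / "target" / "invocation-dir"
        target.mkdir(parents=True)  # first call succeeds
        try:
            # Second call on the same path: fails unless exist_ok=True.
            target.mkdir(parents=True, exist_ok=exist_ok)
        except FileExistsError as exc:
            return type(exc).__name__
        return None


if __name__ == "__main__":
    print(second_mkdir_error(exist_ok=False))
    print(second_mkdir_error(exist_ok=True))
```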
Stephen Bailey
07/27/2023, 1:34 PM
Each entry in args should be a single token with no spaces, so change
dbt.cli(["source freshness"]).stream()
to
dbt.cli(["source", "freshness"]).stream()
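Editorial note: one way to avoid hand-splitting a command is to tokenize it with the standard library's shlex.split, which follows shell-style quoting rules. This is a minimal sketch, not part of dagster-dbt. The helper name dbt_args is hypothetical, and the resulting list is what you would pass as the first argument to dbt.cli.

```python
import shlex
from typing import List


def dbt_args(command: str) -> List[str]:
    """Split a dbt command string into one list entry per token."""
    # shlex.split honors quotes, so a quoted value stays a single token.
    return shlex.split(command)


if __name__ == "__main__":
    print(dbt_args("source freshness"))
    print(dbt_args("build --select tag:nightly"))
```

With this helper, dbt_args("source freshness") gives the two-token list suggested above, rather than a single entry containing a space.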
Stephen Bailey
07/27/2023, 1:36 PM
Cameron Cyr
07/27/2023, 2:21 PM
@dbt_assets(manifest=MANIFEST_PATH, dagster_dbt_translator=CustomDagsterDbtTranslator())
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    for raw_event in dbt.cli(["source", "freshness"], manifest=MANIFEST_PATH).stream_raw_events():
        context.log.info(raw_event)
    yield from dbt.cli(["build"], context=context).stream()