Hey everyone! :dagsir: I'm using the most recent ...
# integration-dbt
c
Hey everyone! I'm using the most recent release of Dagster so that I could migrate all of our dbt assets to use the `dbt_assets` decorator. It has been working great so far to run the `dbt build` command. However, I also want to run `dbt source freshness` before I run `dbt build`. Has anyone tried this approach of running more than one dbt command as part of a function with the new `dbt_assets` decorator? Here is the code that I thought would have worked, but it errors out whenever I materialize my dbt assets:
@dbt_assets(manifest=MANIFEST_PATH, dagster_dbt_translator=CustomDagsterDbtTranslator())
def brzw_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    dbt.cli(["source freshness"]).stream()
    yield from dbt.cli(["build"], context=context).stream()
Error is in the thread
Error:
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "brzw_dbt_assets":

  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 369, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 90, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 192, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 161, in _yield_compute_results
    for event in iterate_with_context(
  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 443, in iterate_with_context
    with context_fn():
  File "/opt/homebrew/Cellar/python@3.11/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
FileExistsError: [Errno 17] File exists: '/Users/ccyr/brzwy/data-warehouse/dbt/target/brzw_dbt_assets-7a4f9c3-1690462329'

  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 445, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/Users/ccyr/brzwy/data-warehouse/brzw_de/assets/dbt.py", line 19, in brzw_dbt_assets
    yield from dbt.cli(["build"], context=context).stream()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster_dbt/core/resources_v2.py", line 505, in cli
    return DbtCliInvocation.run(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ccyr/.local/share/virtualenvs/data-warehouse-iy-pu8AS/lib/python3.11/site-packages/dagster_dbt/core/resources_v2.py", line 176, in run
    partial_parse_destination_target_path.parent.mkdir(parents=True)
  File "/opt/homebrew/Cellar/python@3.11/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/pathlib.py", line 1116, in mkdir
    os.mkdir(self, mode)
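The last frames of the traceback show `Path.mkdir(parents=True)` raising `FileExistsError` because the directory it was asked to create was already there. A minimal standalone sketch of that `pathlib` behaviour follows. It uses a temporary directory and a made-up directory name, not the real dbt target path, and it does not establish why the directory already existed in this run:

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    # Made-up path for illustration; not the real dbt target directory.
    target = Path(tmp) / "target" / "example-run-dir"

    # First call creates the directory and any missing parents.
    target.mkdir(parents=True)

    # Second call without exist_ok fails because the directory is already there.
    try:
        target.mkdir(parents=True)
        second_call = "ok"
    except FileExistsError:
        second_call = "FileExistsError"

    # With exist_ok=True an existing directory is tolerated.
    target.mkdir(parents=True, exist_ok=True)
    tolerant_call = "ok"

print(second_call, tolerant_call)
```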
s
First thing I see is that the individual values in `args` should not have spaces, so change
dbt.cli(["source freshness"]).stream()
to
dbt.cli(["source", "freshness"]).stream()
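To illustrate the point about spaces: each list element is treated as one command-line token, so `"source freshness"` is a single token containing a space, not two. A small sketch using the standard library's `shlex.split` to turn a command string into separate tokens. The helper name `to_dbt_args` is hypothetical and not part of dagster-dbt:

```python
import shlex
from typing import List


def to_dbt_args(command: str) -> List[str]:
    """Hypothetical helper: split a command string into separate CLI tokens."""
    return shlex.split(command)


wrong = ["source freshness"]             # one token that contains a space
right = to_dbt_args("source freshness")  # two separate tokens

print(len(wrong), wrong)
print(len(right), right)
```

The resulting list could then be passed as the `args` list in place of a hand-written one.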
Second thing is that you may not get any events from this (not sure -- maybe this will yield AssetObservation events), and you might need to log or handle the results individually, like here: https://dagster.slack.com/archives/C04CW71AGBW/p1690165244256119?thread_ts=1690162823.644269&cid=C04CW71AGBW
c
@Stephen Bailey -- Amazing, thank you. I was able to get what I needed with your examples 🚀 For anyone else who might be interested in running source freshness checks before your build/run commands, here is what worked for me:
@dbt_assets(manifest=MANIFEST_PATH, dagster_dbt_translator=CustomDagsterDbtTranslator())
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    # Run the freshness check first; its raw events are only logged, not yielded to Dagster.
    for raw_event in dbt.cli(["source", "freshness"], manifest=MANIFEST_PATH).stream_raw_events():
        context.log.info(raw_event)
    yield from dbt.cli(["build"], context=context).stream()
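The working version loops over the first command's events and only uses `yield from` for the second. The sketch below shows the plain Python generator behaviour behind that shape: a generator that is created but never iterated does not run, a `for` loop drains one, and `yield from` passes events through to the caller. `fake_stream` and `assets_fn` are hypothetical stand-ins, not dagster-dbt APIs, and how `dbt.cli` launches the underlying dbt process is not modelled here:

```python
from typing import Iterator, List

log: List[str] = []  # records side effects so the ordering is visible


def fake_stream(command: str) -> Iterator[str]:
    """Hypothetical stand-in for a dbt event stream; not part of dagster-dbt."""
    log.append(f"started {command}")
    yield f"{command}: event 1"
    yield f"{command}: event 2"
    log.append(f"finished {command}")


def assets_fn() -> Iterator[str]:
    # Creating a generator without iterating it runs none of its body.
    fake_stream("ignored")
    # Looping drains the first stream; its events are logged, not yielded.
    for event in fake_stream("source freshness"):
        log.append(f"logged {event}")
    # The second stream's events are passed through to the caller.
    yield from fake_stream("build")


yielded = list(assets_fn())
print(yielded)
print(log)
```

Only the `build` events reach the caller, and nothing from the `ignored` stream ever shows up in `log`.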