# ask-community

Konrad Schlatte

07/18/2022, 3:15 PM
Hi, I am looking to model a consecutive run of a sync job and a dbt run. Has anyone got an example of such a process? I tried to model it with a Nothing dependency like this, but it doesn't execute the dbt run. The dbt run op works as part of a job but not as part of the graph.
@op(ins={"start": In(Nothing)})
def dbt_rpc_run_op():
    dbt_rpc_run

sync_salesforce = airbyte_sync_op.configured(
    {"connection_id": "c1a5f"},
    name="salesforce_stronger_nudge"
)
@graph
def run_dbt_after_airbyte():
    dbt_rpc_run_op(start=sync_salesforce())

@job(resource_defs={"airbyte":new_airbyte_resource, "dbt_rpc": test_dbt_rpc_resource})
def run_airbyte_and_then_dbt():
    run_dbt_after_airbyte()
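Side note on the wrapper op above: its body only names dbt_rpc_run and never calls it, so one likely reason nothing runs is that Python evaluates the bare name and discards it. A plain-Python sketch of that difference (run_models and both wrapper functions are hypothetical stand-ins, not Dagster or dagster-dbt APIs):

```python
# Tracks whether the stand-in "dbt run" was actually invoked.
calls = []


def run_models():
    """Hypothetical stand-in for the work a dbt run would do."""
    calls.append("run")
    return "done"


def op_body_without_call():
    # A bare name is an expression statement: it is evaluated and
    # discarded, so run_models is never invoked here.
    run_models


def op_body_with_call():
    # The parentheses are what actually invoke the function.
    return run_models()


if __name__ == "__main__":
    op_body_without_call()
    print("after bare reference:", calls)
    op_body_with_call()
    print("after real call:", calls)
```

This only illustrates the Python behaviour; it does not show how to wrap a legacy solid inside a new op.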

jamie

07/18/2022, 4:08 PM
cc @owen

owen

07/18/2022, 5:21 PM
hi @Konrad Schlatte -- I think what you want here would be the built-in dbt_run_op, which works with either the dbt CLI or dbt RPC (depending on what kind of resource you pass in). This op has a start_after Nothing-type argument, so that would look like:
@graph
def run_dbt_after_airbyte():
    dbt_run_op(start_after=sync_salesforce())
also, if these are the only two ops in your job, it might be easier to just skip creating the graph, and have:
@job(resource_defs={"airbyte":new_airbyte_resource, "dbt_rpc": test_dbt_rpc_resource})
def run_airbyte_and_then_dbt():
    dbt_run_op(start_after=sync_salesforce())

Konrad Schlatte

07/18/2022, 5:43 PM
ok thanks @owen I suppose this works also with the dbt_rpc_run? I had problems with running dbt with dbt_run_op

owen

07/18/2022, 5:47 PM
hm what sort of issues were you running into? in theory, dbt_rpc_run and dbt_run_op do essentially the same thing if they're provided with a dbt_rpc_sync_resource. I would expect the following to work as well:
@job(resource_defs={"airbyte":new_airbyte_resource, "dbt_rpc": test_dbt_rpc_resource})
def run_airbyte_and_then_dbt():
    dbt_rpc_run(start_after=sync_salesforce())
but would recommend the former
ah and I noticed I mis-named the resource in the first example
it should be:
@job(resource_defs={"airbyte":new_airbyte_resource, "dbt": test_dbt_rpc_resource})
(in the example, it was "dbt_rpc": ..., not "dbt": ...)

Konrad Schlatte

07/18/2022, 6:04 PM
yes, the problem is that dbt_run_op fails silently without creating the transformation, and dbt_rpc_run fails with:
File "/home/ec2-user/dagster/dagster-test/orchestrator_poc/.venv/lib64/python3.7/site-packages/dagster/core/execution/plan/compute_generator.py", line 66, in _coerce_solid_compute_fn_to_iterator

owen

07/18/2022, 6:14 PM
hm yeah I'm not sure exactly what would be causing that second error, but dbt_rpc_run is a Solid (the legacy version of the current Op), so I could imagine there being some compatibility issues when you try to use it with the newer APIs. as for the silent failing issue, does the step itself fail inside Dagster with no error message, or does it succeed and just not do anything? and how is test_dbt_rpc_resource being constructed? is it a configured dbt_rpc_sync_resource, or something else?

Konrad Schlatte

07/18/2022, 6:23 PM
dbt_run_op runs green in dagster but doesn't do anything in snowflake
test_dbt_rpc_resource = dbt_rpc_resource.configured({
    "host": { "env": "DBT_HOST" },
    "port": { "env": "DBT_PORT" }
})

owen

07/18/2022, 6:31 PM
gotcha -- if you do dbt_rpc_sync_resource (instead of dbt_rpc_resource), what happens? the sync resource fires off commands and polls until they complete, whereas the regular one just initiates the command
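The difference between "just initiates the command" and "polls until complete" can be sketched in plain Python. This is not the dagster-dbt implementation: FakeRpcServer, run_async, and run_sync are hypothetical names, and a real client would also sleep between polls. It may help explain how a step can go green without the work having finished:

```python
import itertools


class FakeRpcServer:
    """Hypothetical in-memory stand-in for an RPC server running commands."""

    def __init__(self, polls_until_done=3):
        self._polls_until_done = polls_until_done
        self._poll_counts = {}
        self._ids = itertools.count(1)

    def submit(self, command):
        # Accept the command and hand back a token to poll with.
        token = f"token-{next(self._ids)}"
        self._poll_counts[token] = 0
        return token

    def poll(self, token):
        # Report "running" until the command has been polled enough times.
        self._poll_counts[token] += 1
        done = self._poll_counts[token] >= self._polls_until_done
        return "success" if done else "running"

    def poll_count(self, token):
        return self._poll_counts[token]


def run_async(server, command):
    """Fire and forget: return as soon as the server accepts the command."""
    return server.submit(command)


def run_sync(server, command, max_polls=10):
    """Submit, then poll until the command leaves the running state."""
    token = server.submit(command)
    for _ in range(max_polls):
        state = server.poll(token)
        if state != "running":
            return state
    raise TimeoutError(f"{command!r} did not finish after {max_polls} polls")
```

With run_async the caller gets a token back immediately and never learns the final state; with run_sync the caller only returns once the server reports a terminal state or the poll budget runs out.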

Konrad Schlatte

07/18/2022, 6:53 PM
dagster._check.CheckError: Expected non-None value: None

owen

07/18/2022, 6:55 PM
gotcha -- would you mind sharing the trace as well?

Konrad Schlatte

07/18/2022, 6:56 PM
run_airbyte_and_then_dbt - 77fa0f23-2924-4ce3-9a09-0b78280a5a01 - 13996 - dbt_rpc_run - STEP_FAILURE - Execution of step "dbt_rpc_run" failed.

dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing solid "dbt_rpc_run"::

dagster._check.CheckError: Expected non-None value: None

Stack Trace:
  File "/home/ec2-user/dagster/dagster-test/orchestrator_poc/.venv/lib64/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/home/ec2-user/dagster/dagster-test/orchestrator_poc/.venv/lib64/python3.7/site-packages/dagster/utils/__init__.py", line 406, in iterate_with_context
    next_output = next(iterator)
  File "/home/ec2-user/dagster/dagster-test/orchestrator_poc/.venv/lib64/python3.7/site-packages/dagster/core/execution/plan/compute_generator.py", line 66, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "/home/ec2-user/dagster/dagster-test/orchestrator_poc/.venv/lib64/python3.7/site-packages/dagster_dbt/rpc/solids.py", line 154, in dbt_rpc_run
    models=context.solid_config["models"], exclude=context.solid_config["exclude"]
  File "/home/ec2-user/dagster/dagster-test/orchestrator_poc/.venv/lib64/python3.7/site-packages/dagster_dbt/rpc/resources.py", line 292, in run
    return self._get_result(data=json.dumps(data))
  File "/home/ec2-user/dagster/dagster-test/orchestrator_poc/.venv/lib64/python3.7/site-packages/dagster_dbt/rpc/resources.py", line 526, in _get_result
    request_token: str = check.not_none(out.result.get("request_token"))
  File "/home/ec2-user/dagster/dagster-test/orchestrator_poc/.venv/lib64/python3.7/site-packages/dagster/_check/__init__.py", line 955, in not_none
    raise CheckError(f"Expected non-None value: {additional_message}")
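Reading the trace bottom-up: the failing line expects the RPC response's result to contain a request_token, and the check raises when that key is absent. A plain-Python sketch of that pattern follows. CheckError and not_none are re-implemented here as stand-ins rather than imported from Dagster, and extract_request_token is a hypothetical helper; the sketch says nothing about why the token was missing in this case:

```python
class CheckError(Exception):
    """Stand-in for the error type seen in the trace."""


def not_none(value, additional_message=None):
    # Mirrors the message format shown in the trace above.
    if value is None:
        raise CheckError(f"Expected non-None value: {additional_message}")
    return value


def extract_request_token(result):
    """Hypothetical helper: pull the token out of a response's result dict."""
    # dict.get returns None for a missing key, which trips the check.
    return not_none(result.get("request_token"))


if __name__ == "__main__":
    print(extract_request_token({"request_token": "abc123"}))
    try:
        extract_request_token({})
    except CheckError as err:
        print(err)
```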

owen

07/18/2022, 6:58 PM
what version of dbt-rpc are you running on? it's definitely possible that we have a compatibility issue with newer versions of dbt-rpc

Konrad Schlatte

07/18/2022, 7:00 PM
ok just downloaded the most recent one 0.1.2
what version do you recommend?

owen

07/18/2022, 7:05 PM
I'm going to check to see if i can replicate this issue and get back to you (I believe this library was created when the rpc commands were built into the dbt package itself, so something might have changed then)

Konrad Schlatte

07/18/2022, 7:19 PM
ok great - thanks!

owen

07/18/2022, 7:25 PM
ok that version of the dbt rpc library should work as far as I can see, and I think what's happening might be a result of the legacy dbt_rpc_run invoking the proper method incorrectly. If you use the standard dbt_run_op with the dbt_rpc_sync_resource, i.e.:
test_dbt_rpc_resource = dbt_rpc_sync_resource.configured({
    "host": { "env": "DBT_HOST" },
    "port": { "env": "DBT_PORT" }
})

@job(resource_defs={"airbyte":new_airbyte_resource, "dbt": test_dbt_rpc_resource})
def run_airbyte_and_then_dbt():
    dbt_run_op(start_after=sync_salesforce())
then I think you should be able to avoid that error, but let me know if that's not the case

Konrad Schlatte

07/18/2022, 8:34 PM
unfortunately gives me the same error as above
oh it worked now after restarting the server 👍