Makoto

05/18/2021, 6:39 PM
Hi. I’m stumped on something that seems simple, or maybe I’m just confused. I have the following 2 dbt solids: one runs only changed models and the other runs all models:
state_modified_config_in_test = {"project-dir": PROJECT_DIR, "models": ["state:modified"], "target": "test"}
run_only_changed_models_in_test = dbt_cli_run.configured(state_modified_config_in_test,
                                                        name="run_only_changed_models_in_test")

run_all_config_in_test = {"project-dir": PROJECT_DIR, "target": "test"}
run_all_models_in_test = dbt_cli_run.configured(run_all_config_in_test, name="run_all_models_in_test")
I want to conditionally run one or the other depending on whether the `run` directory exists under the target directory. I read up on conditional branching, but I’m not sure how to conditionally invoke one of the dbt solids. I tried using a composite solid to pass in the yielded output, but since the dbt solid does not take any input argument, I get the unused input error for the composite solid. Is there a way to work around it, or a better way to achieve it?
Here’s what I thought I could do:
...
@solid(
    input_defs=[InputDefinition("start", Nothing)],
    output_defs=[OutputDefinition(name="only_modified", is_required=False),
                 OutputDefinition(name="all_models", is_required=False)]
)
def determine_test_dbt_solid_run(context):
    context.log.info("Verifying test target directory exists...")
    target_dir = f"{PROJECT_DIR}/target_test/run"
    if os.path.isdir(target_dir):
        yield Output(True, "only_modified")
    else:
        yield Output(True, "all_models")


@composite_solid
def run_modified_only(_):
    return run_only_changed_models_in_test()


@composite_solid()
def run_all_models(_):
    return run_all_models_in_test()


@pipeline
def deploy_dbt_model_pipeline():
    only_modified, all_models = determine_test_dbt_solid_run(
                                    copy_target_db_for_testing(
                                        drop_test_db()
                                    )
                                )
    # conditional branching
    run_modified_only(only_modified)
    run_all_models(all_models)
I hope what I am trying to achieve makes sense 🙏

alex

05/18/2021, 7:35 PM
`dbt_cli_run` does have a `start_after` `Nothing` input. If you hook up the dependency to that, you should get the behavior you desire.

Makoto

05/18/2021, 8:09 PM
@alex Ah thanks! I should’ve done more spelunking. I got a little further now. I have a custom target directory specified in my dbt project config, so it fails at `parse_run_results()` in `dagster_dbt/cli/utils.py`, since the target directory name is hard-coded. I’ll try to think of a way to work around it until the implementation is changed.

alex

05/18/2021, 8:11 PM
cc @owen who has been working a bit on our dbt lib

owen

05/18/2021, 8:28 PM
Hey @Makoto! Would love to learn a bit more about your use case (I am indeed looking into making some changes to the dbt integration, and you're not the only one to run into some of these edges). Two specific questions: are you planning on using the outputs of `run_modified_only` / `run_all_models` in any downstream solids in the future, and do you care about the asset materializations that are produced from this solid? My first thought is that you might be better off not using the `dbt_cli_run` solid at all in this case (I don’t see a clean way of getting around that `parse_run_results()` issue you noted). For now, you could instead just create your own solid that invokes `execute_cli` directly (https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-dbt/dagster_dbt/cli/utils.py#L16).
For reference, the implementation of `dbt_cli_run` (https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-dbt/dagster_dbt/cli/solids.py#L179) is fairly simple: it mostly just calls `execute_cli` and then some helper methods to parse the results and generate materializations. If you don't care about outputting the results or generating those materializations, these bits can be omitted. Reimplementing the solid is definitely not the ideal user experience (working on making it nicer 😄), but it might unblock you for the time being.
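As a rough illustration of the "write your own solid" route, the sketch below builds the dbt command in plain Python and picks the model selection based on whether a previous `run` directory exists. It deliberately does not call `execute_cli` from dagster_dbt, since its signature is not shown in this thread. `choose_models` and `build_dbt_run_args` are hypothetical helper names, and the `target_<name>/run` layout is taken from the code earlier in the thread.

```python
import os
from typing import List, Optional


def choose_models(project_dir: str, target: str) -> Optional[List[str]]:
    """Return ["state:modified"] if a previous run directory exists, else None (all models)."""
    # Assumed layout: <project_dir>/target_<target>/run, as in the question above.
    run_dir = os.path.join(project_dir, f"target_{target}", "run")
    return ["state:modified"] if os.path.isdir(run_dir) else None


def build_dbt_run_args(project_dir: str, target: str,
                       models: Optional[List[str]] = None) -> List[str]:
    """Assemble the argument list for `dbt run`; flags mirror the config keys used above."""
    args = ["dbt", "run", "--project-dir", project_dir, "--target", target]
    if models:
        # Only restrict the selection when a model list was chosen.
        args += ["--models", *models]
    return args
```

Inside a solid body, the resulting list could be handed to `subprocess.run(args, check=True)`, so that a non-zero dbt exit code fails the step. Note that dbt's `state:modified` selector also needs a comparison manifest, typically supplied with `--state`, which this sketch leaves out.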

Makoto

05/18/2021, 9:13 PM
Hi @owen. Nice to meet you virtually. 😃 Thanks for the tips. Aside from CLI outputs and the exit code to see whether it failed or succeeded, I don’t think I’ll be using the outputs downstream. To give you some context, I am trying to create a blue/green-ish deployment pipeline where I run dbt and Great Expectations against a cloned copy of the target database, and if they all pass, run the same steps against the target database. In addition, I’m using dbt’s `state` method (link to doc), which led me to create separate directories for the default and test targets so that I can keep things separate. In my dbt project yml, I have target-path set to:
target-path: target_{{ target.name }}
I’m still somewhat new to dagster and not super familiar with asset materializations, but I read up on them quickly and I don’t think I will be using them for now. I think you are right that I should go with `execute_cli` for now. Thanks for the link. Just thinking out loud… It seems like we could add another key/value pair to the dbt solid’s config dictionary for overriding the target path. 🤔 That seems like a simple/safe change. Happy to collaborate with you to author a PR.
So basically
def parse_run_results(path: str)
would take another argument, so it looks something like:
def parse_run_results(path: str, target_dir="target")
and in solids.py, we can call it like:
# Fall back to dbt's default "target" directory when "target-dir" is unset.
target_dir = context.solid_config.get("target-dir") or "target"

run_results = parse_run_results(context.solid_config["project-dir"], target_dir)
maybe?
Also, while thinking about the above, I noticed that `parse_run_results` is also used in the `dbt_cli_test` solid, which I use, so I’m motivated to get the changes in 😄 . Let me know what you think.
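A minimal sketch of the proposed signature change, written as a standalone stand-in rather than the actual dagster_dbt implementation. It assumes only that dbt writes `run_results.json` into the configured target directory; the function body here is illustrative.

```python
import json
import os
from typing import Any, Dict


def parse_run_results(path: str, target_dir: str = "target") -> Dict[str, Any]:
    """Load run_results.json from <path>/<target_dir>.

    Illustrative stand-in only; not the dagster_dbt implementation.
    """
    run_results_path = os.path.join(path, target_dir, "run_results.json")
    with open(run_results_path, encoding="utf-8") as handle:
        return json.load(handle)
```

With a `target-path` of `target_{{ target.name }}` and the `test` target, the call would be `parse_run_results(PROJECT_DIR, "target_test")`. Callers that pass only the project directory keep reading from `target`, so the default preserves the current behavior.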

owen

05/18/2021, 10:30 PM
Nice to meet you as well! Your proposal definitely makes sense, would work, and would be a clear improvement (plus we always appreciate community contributions 🙂). If you were to go down this route, I would recommend adding the option to the CLI_COMMON_OPTIONS_CONFIG_SCHEMA (https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-dbt/dagster_dbt/cli/solids.py#L74) to avoid double-specifying it for dbt_cli_run/dbt_cli_test. For more context on the things I’m thinking about, ideally, I want to work towards a world where it’s easier to construct dbt solids that have the exact functionality that you need (something like a solid factory where you could pass in functions to substitute certain behaviors). Right now, you’re sort of forced down this route of specifying the target directory, when it’s actually not necessary for what you’re trying to accomplish (I don't think the contents of run_results.json are required to see if your tests passed, so you'd really only be specifying that option to avoid the error you're seeing).
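One possible reading of the "solid factory" idea, sketched in plain Python rather than with Dagster APIs: result parsing becomes an optional function passed in, so a step that only needs the exit code never touches `run_results.json`. All names here (`make_dbt_step`, `Runner`, `ResultParser`) are hypothetical and do not come from dagster_dbt.

```python
from typing import Any, Callable, Dict, List, Optional

Runner = Callable[[List[str]], int]             # runs a command, returns its exit code
ResultParser = Callable[[str], Dict[str, Any]]  # project dir -> parsed results


def make_dbt_step(command: List[str], runner: Runner,
                  parse_results: Optional[ResultParser] = None):
    """Build a step function; results are parsed only if a parser is supplied."""

    def step(project_dir: str) -> Dict[str, Any]:
        exit_code = runner(command + ["--project-dir", project_dir])
        if exit_code != 0:
            raise RuntimeError(f"dbt exited with code {exit_code}")
        # Skip parsing entirely when the caller did not ask for it.
        results = parse_results(project_dir) if parse_results else None
        return {"exit_code": exit_code, "results": results}

    return step
```

A caller who only cares about pass/fail would omit `parse_results`, which avoids having to specify a target directory at all.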