# ask-community
o
Hi all! I have a shell script that assigns Select permissions to the tables in my destination. It's supposed to execute immediately after the Airtable sync is complete. Currently, the Airbyte graph asset is connected directly to the dbt model, which fails after the sync due to a lack of permissions. How should I link the Airbyte sources to the shell script, and then connect the dbt model to it?
Hi @Dagster Jarred and @Tim Castillo, any chance you could look into this question, please? I could not find any examples for this scenario. Thanks.
y
cc @ben mind taking a look?
👍 1
🙏 1
b
Hi Ohad, currently I think the best approach is to create a Python asset which runs your shell script, and locate it in between your airbyte and dbt assets.
Such an asset could look something like this:
Copy code
import subprocess

from dagster import AssetKey, asset

@asset(non_argument_deps={
    AssetKey(["src_airbyte_table", "foobar"])
})
def transform_airbyte_tables():
    subprocess.check_output(["sh", "my_file.sh"])
🙏 1
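[Editor's note, not part of ben's message: one property of `subprocess.check_output` worth knowing here is that it raises `CalledProcessError` on a nonzero exit code, so a failing permissions script will fail the asset step rather than pass silently. A minimal stdlib sketch:]

```python
import subprocess

# check_output raises CalledProcessError on a nonzero exit code, so a failing
# permissions script surfaces as a failed Dagster step instead of being ignored.
try:
    out = subprocess.check_output(["sh", "-c", "echo granted"])
except subprocess.CalledProcessError as err:
    raise RuntimeError(f"permission script failed with exit code {err.returncode}")

print(out.decode().strip())  # → granted
```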
One other approach would be to subclass either the DBT resource or Airbyte resource to include your transformation logic in between the two runs
o
Thank you @ben I will give it a go.
b
Happy to elaborate more on either approach, let me know if I can help!
o
Hi @ben, I tried to add the following code, but I could not see any new link created in the UI. After executing these assets, I inspected the logs, and it does not appear that the shell script was executed.
Copy code
@asset(non_argument_deps={
    AssetKey(["src_airtable__absence_reasons"])
})
def transform_airbyte_tables(airbyte_airtable_test_assets):
    subprocess.check_output(["sh", "grant_table_permissions.sh"])
b
Hi Ohad, you will need to include `transform_airbyte_tables` in your Dagster repository to have it appear in the UI and be executed, alongside the Airbyte/dbt assets - just want to make sure, is it in the repository?
If so, you should be able to see it on the Assets page in the UI
o
Sorry, but I'm still learning Dagster. How can I add it to my repository? I've tried adding it to my assets definition, but I got an error.
b
What error are you seeing? That looks like the right place to put it to me (in the list after `airbyte_airtable_test_assets`)
o
I'm getting this error:
b
I see, it looks like the `AssetKey` we’re supplying to the asset doesn’t match the key of the Airbyte asset. If you click on the Airbyte asset in the UI (the top one in your screenshot), you should be able to see the asset key in the sidebar at the top. Here, for example, the asset key `airbyte / cloud_prod / onboarding_checklist` corresponds to `AssetKey(["airbyte", "cloud_prod", "onboarding_checklist"])`
Oh, on second glance, I also see that you have a parameter for your function - try removing that (`airbyte_airtable_test_assets`)
The `non_argument_deps` here is doing the work of telling Dagster that your new asset depends on the Airbyte asset - no need to also specify it as a parameter
👍 1
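[Editor's note: to illustrate the mapping ben describes, the slash-separated key shown in the UI sidebar corresponds position-for-position to the list passed to `AssetKey`. The helper below is hypothetical, not a Dagster API - just a sketch of the correspondence:]

```python
# Hypothetical helper: split the "a / b / c" asset key shown in the Dagster UI
# sidebar into the list of parts you would pass to AssetKey([...]).
def ui_key_to_parts(ui_key: str) -> list[str]:
    return [part.strip() for part in ui_key.split("/")]

parts = ui_key_to_parts("airbyte / cloud_prod / onboarding_checklist")
print(parts)  # → ['airbyte', 'cloud_prod', 'onboarding_checklist']
```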
o
Thank you Ben 🙏 I think we are getting closer 🙂. This time, it didn't link the shell asset to the dbt asset.
b
Great! This might be a little tricky, so bear with me - dbt assets determine their upstream assets by parsing the model SQL files. What we’ll want to do is add another dependency to your model through a commented-out line in `mymodel.sql`:
Copy code
-- {{ source("transform_airbyte_tables", "transform_airbyte_tables") }}
Then, we’ll add a dummy entry to our sources file `sources.yml`:
Copy code
- name: transform_airbyte_tables
  tables:
    - name: transform_airbyte_tables
Finally, we’ll need to tweak our asset to add a “key prefix”:
Copy code
@asset(
    key_prefix="transform_airbyte_tables",
    non_argument_deps={
        AssetKey(["src_airbyte_table", "foobar"])
    }
)
def transform_airbyte_tables():
    ...
What this will do is tell dbt to add a dependency on the asset `transform_airbyte_tables / transform_airbyte_tables` - it won’t affect the dbt compilation/run. The change to the asset will just add a prefix, so that it’s at `transform_airbyte_tables / transform_airbyte_tables` and matches up with what we told dbt.
o
Yeah, that makes perfect sense. I had to do something similar a few times before. I'll try to set it up. But there's one more thing I need your help with: I tried to execute the `transform_airbyte_tables` script, and I got the following error. I know the shell script works because I got it working using a graph asset called `grantTableAccess`
Sorry, one more thing. This is just a toy example. In my production use case, I'd like to run the shell script immediately after the Airbyte sync finishes and before all the other downstream dbt models start. In this scenario, would I still need to go and update all my staging models (more than 100) and add the comment for the shell script?
Copy code
-- {{ source("transform_airbyte_tables", "transform_airbyte_tables") }}
My dagster project looks like this
I've updated my model as you explained above, and it looks like this now
b
That looks good for this toy case
o
Yes, that's right, thanks for that! How should I go about resolving this issue for the other use case? Should I add the comment to all my staging scripts?
b
For the production use case there’s probably a better solution (though that would certainly work)
Let me see if I can put something together
o
Thank you Ben! Re the error that I am having with executing the shell script, I believe it has to do with the file path, because when I change `check_output` to `run` I get a path name error. I have tried a few variations of the path names, but I haven't found the correct one yet.
b
You might find the `file_relative_path` utility useful for that, e.g.
Copy code
from dagster import file_relative_path
shell_script_location = file_relative_path(__file__, "../my_script.sh")
which resolves a path relative to the source file
👍 1
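[Editor's note: for context, `file_relative_path` essentially joins the given relative path onto the directory containing the source file. A stdlib sketch of that behavior (not Dagster's exact source):]

```python
import os

# Roughly what dagster.file_relative_path does: resolve relative_path against
# the directory that contains dunderfile (typically passed as __file__).
def file_relative_path_sketch(dunderfile: str, relative_path: str) -> str:
    return os.path.join(os.path.dirname(dunderfile), relative_path)

path = file_relative_path_sketch("/app/assets/definitions.py", "../my_script.sh")
print(path)                    # → /app/assets/../my_script.sh
print(os.path.normpath(path))  # → /app/my_script.sh
```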
o
That's a great idea. I also found that the script is working if I use the full path name.
Thanks again Ben for the file path suggestion, I managed to get it working.
b
Great!
Here is a different approach which might be more useful for your production use case. It relies on modifying the dbt resource to run your shell script before each execution (meaning you don’t need that Python asset). It’s a bit more boilerplate, but it lets you avoid modifying all your model files.
Copy code
from dagster import Permissive
from dagster import resource
from dagster_dbt.cli.constants import CLI_COMMON_FLAGS_CONFIG_SCHEMA, CLI_COMMON_OPTIONS_CONFIG_SCHEMA
from dagster_dbt.cli.resources import DbtCliResource
from dagster_dbt.cli.types import DbtCliOutput

class UpdatePermissionsDBTCliResource(DbtCliResource):
    def run(
        self,
        *args,
        **kwargs,
    ) -> DbtCliOutput:
        # Execute shell script here
        return super().run(*args, **kwargs)


@resource(
    config_schema=Permissive(
        {
            k.replace("-", "_"): v
            for k, v in dict(
                **CLI_COMMON_FLAGS_CONFIG_SCHEMA, **CLI_COMMON_OPTIONS_CONFIG_SCHEMA
            ).items()
        }
    )
)
def update_permissions_dbt_cli_resource(context) -> UpdatePermissionsDBTCliResource:
    """This resource issues dbt CLI commands against a configured dbt project."""
    # set of options in the config schema that are not flags
    non_flag_options = {k.replace("-", "_") for k in CLI_COMMON_OPTIONS_CONFIG_SCHEMA}
    # all config options that are intended to be used as flags for dbt commands
    default_flags = {k: v for k, v in context.resource_config.items() if k not in non_flag_options}
    return UpdatePermissionsDBTCliResource(
        executable=context.resource_config["dbt_executable"],
        default_flags=default_flags,
        warn_error=context.resource_config["warn_error"],
        ignore_handled_error=context.resource_config["ignore_handled_error"],
        target_path=context.resource_config["target_path"],
        logger=context.log,
        docs_url=context.resource_config.get("docs_url"),
        capture_logs=context.resource_config["capture_logs"],
        json_log_format=context.resource_config["json_log_format"],
    )



...

Definitions(
    resources={
        "dbt": update_permissions_dbt_cli_resource.configured(...)
    }
)
✅ 1
In the custom resource class at the top, you can see there’s a place to insert some code to run before each DBT invocation - in this case, to execute a shell script
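[Editor's note: the hook pattern ben describes, reduced to a stdlib sketch. `BaseRunner` stands in for `DbtCliResource`; the key points are running the script before delegating, and returning the parent's result so the output still propagates to callers:]

```python
import subprocess

# Stand-in for DbtCliResource in this sketch.
class BaseRunner:
    def run(self, *args, **kwargs):
        return "dbt output"

class PermissionsRunner(BaseRunner):
    def run(self, *args, **kwargs):
        # Run the permissions script first; check_call raises on a nonzero exit.
        subprocess.check_call(["sh", "-c", "true"])  # stand-in for grant_table_permissions.sh
        # Return the parent's result so callers still receive the dbt output.
        return super().run(*args, **kwargs)

print(PermissionsRunner().run())  # → dbt output
```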
o
Thank you @ben for putting this together. I have removed all the dbt model changes and the previous Python function, and I've implemented the code you provided, but now it seems we are back to the Airbyte node being connected directly to the dbt node.
Nice! It did work! I executed the run and I can see the shell script is running before the dbt model!! 🙌
b
fantastic! Yup, this will not show up in the asset graph (it’s baked into the dbt steps), but it should scale up a lot better with more models (e.g. in your production case)
o
Awesome! Thank you so much again for your help!!
b
Of course! Happy to help in case you run into anything else