Ohad
01/27/2023, 4:27 AM
yuhan
01/30/2023, 7:28 PM
ben
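01/30/2023, 10:03 PM
Such an asset could look something like this:
import subprocess

from dagster import AssetKey, asset

@asset(non_argument_deps={
    AssetKey(["src_airbyte_table", "foobar"])
})
def transform_airbyte_tables():
    # Runs once the upstream Airbyte table has been materialized.
    subprocess.check_output(["sh", "my_file.sh"])
Ohad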
01/30/2023, 10:13 PM
ben
01/30/2023, 10:13 PM
Ohad
01/30/2023, 10:43 PM
@asset(non_argument_deps={
    AssetKey(["src_airtable__absence_reasons"])
})
def transform_airbyte_tables(airbyte_airtable_test_assets):
    subprocess.check_output(["sh", "grant_table_permissions.sh"])
ben
01/30/2023, 10:55 PM
You’ll need to include transform_airbyte_tables in your Dagster repository to have it appear in the UI and be executed alongside the Airbyte/DBT assets. Just want to make sure: is it in the repository?
Ohad
01/30/2023, 11:10 PM
ben
01/30/2023, 11:11 PM
(airbyte_airtable_test_assets)
Ohad
01/30/2023, 11:15 PM
ben
01/30/2023, 11:20 PM
The AssetKey we’re supplying to the asset doesn’t match the key of the Airbyte asset. If you click on the Airbyte asset in the UI (the top one in your screenshot), you should be able to see the asset key in the sidebar at the top. Here, for example, the asset key airbyte / cloud_prod / onboarding_checklist corresponds to AssetKey(["airbyte", "cloud_prod", "onboarding_checklist"]).
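To make the mapping concrete, here is a minimal sketch in plain Python (no Dagster import needed) that splits a slash-separated path as shown in the UI sidebar into the list you would pass to AssetKey([...]). The helper name ui_path_to_key_parts is made up for illustration and is not part of Dagster.

```python
from typing import List


def ui_path_to_key_parts(ui_path: str) -> List[str]:
    """Split a UI-style asset path such as "a / b / c" into its components."""
    # Hypothetical helper: trims the spaces around each "/" separator.
    return [part.strip() for part in ui_path.split("/") if part.strip()]


parts = ui_path_to_key_parts("airbyte / cloud_prod / onboarding_checklist")
print(parts)
```

The resulting list is what goes inside AssetKey([...]) in non_argument_deps, so every component has to match the sidebar exactly, including any prefix.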
You can also drop the function parameter (airbyte_airtable_test_assets): non_argument_deps here is doing the work of telling Dagster that your new asset depends on the Airbyte asset, so there is no need to also specify it as a parameter.
Ohad
01/30/2023, 11:27 PM
ben
01/30/2023, 11:34 PM
First, we’ll add a commented-out source reference to a DBT model file, e.g. mymodel.sql:
-- {{ source("transform_airbyte_tables", "transform_airbyte_tables") }}
Then, we’ll add a dummy entry to our sources file `sources.yml`:
- name: transform_airbyte_tables
  tables:
    - name: transform_airbyte_tables
Finally, we’ll need to tweak our asset to add a “key prefix”:
@asset(
    key_prefix="transform_airbyte_tables",
    non_argument_deps={
        AssetKey(["src_airbyte_table", "foobar"])
    }
)
def transform_airbyte_tables():
    ...
The comment and the dummy source entry only declare a dependency on transform_airbyte_tables / transform_airbyte_tables - they won’t affect the DBT compilation/run.
The change to the asset will just add a prefix, so that it’s at transform_airbyte_tables / transform_airbyte_tables and matches up with what we told DBT.
Ohad
01/30/2023, 11:44 PM
... transform_airbyte_tables script, and I got the following error. I know that the shell script is working because I got it working using a graph asset called grantTableAccess.
-- {{ source("transform_airbyte_tables", "transform_airbyte_tables") }}
ben
01/30/2023, 11:57 PM
Ohad
01/30/2023, 11:58 PM
ben
01/31/2023, 12:04 AM
Ohad
01/31/2023, 12:20 AM
... run, I get a path name error. I have tried a few variations of the path names, but I haven't found the correct one yet.
ben
01/31/2023, 12:21 AM
You may find the file_relative_path utility useful for that, e.g.
from dagster import file_relative_path

shell_script_location = file_relative_path(__file__, "../my_script.sh")
which resolves a path relative to the source file
Ohad
01/31/2023, 12:23 AM
ben
01/31/2023, 12:29 AM
from dagster import Definitions, Permissive, resource
from dagster_dbt.cli.constants import (
    CLI_COMMON_FLAGS_CONFIG_SCHEMA,
    CLI_COMMON_OPTIONS_CONFIG_SCHEMA,
)
from dagster_dbt.cli.resources import DbtCliResource
from dagster_dbt.cli.types import DbtCliOutput


class UpdatePermissionsDBTCliResource(DbtCliResource):
    def run(
        self,
        *args,
        **kwargs,
    ) -> DbtCliOutput:
        # Execute shell script here
        # Return the result so callers still receive the DbtCliOutput.
        return super().run(*args, **kwargs)


@resource(
    config_schema=Permissive(
        {
            k.replace("-", "_"): v
            for k, v in dict(
                **CLI_COMMON_FLAGS_CONFIG_SCHEMA, **CLI_COMMON_OPTIONS_CONFIG_SCHEMA
            ).items()
        }
    )
)
def update_permissions_dbt_cli_resource(context) -> UpdatePermissionsDBTCliResource:
    """This resource issues dbt CLI commands against a configured dbt project."""
    # set of options in the config schema that are not flags
    non_flag_options = {k.replace("-", "_") for k in CLI_COMMON_OPTIONS_CONFIG_SCHEMA}
    # all config options that are intended to be used as flags for dbt commands
    default_flags = {k: v for k, v in context.resource_config.items() if k not in non_flag_options}
    return UpdatePermissionsDBTCliResource(
        executable=context.resource_config["dbt_executable"],
        default_flags=default_flags,
        warn_error=context.resource_config["warn_error"],
        ignore_handled_error=context.resource_config["ignore_handled_error"],
        target_path=context.resource_config["target_path"],
        logger=context.log,
        docs_url=context.resource_config.get("docs_url"),
        capture_logs=context.resource_config["capture_logs"],
        json_log_format=context.resource_config["json_log_format"],
    )

...

Definitions(
    resources={
        "dbt": update_permissions_dbt_cli_resource.configured(...)
    }
)
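For the "Execute shell script here" placeholder, something along these lines might work. It is a rough sketch using only the standard library: the two helper names are made up for illustration, and the path resolution is a stand-in for file_relative_path rather than Dagster's own implementation.

```python
import os
import subprocess


def script_path_relative_to(source_file: str, relative_path: str) -> str:
    """Resolve relative_path against the directory containing source_file."""
    # Hypothetical helper: anchors on the source file, not the working directory.
    base_dir = os.path.dirname(os.path.abspath(source_file))
    return os.path.normpath(os.path.join(base_dir, relative_path))


def run_permissions_script(source_file: str, relative_path: str) -> str:
    """Run the shell script with sh and return its standard output as text."""
    script = script_path_relative_to(source_file, relative_path)
    # check=True raises CalledProcessError if the script exits non-zero.
    completed = subprocess.run(
        ["sh", script], check=True, capture_output=True, text=True
    )
    return completed.stdout
```

Inside run(), you could then call run_permissions_script(__file__, "../grant_table_permissions.sh") before the super().run(...) line. The "../" location is only a guess at where the script sits relative to the Python file, so adjust it to your layout.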
Ohad
01/31/2023, 12:52 AM
ben
01/31/2023, 1:07 AM
Ohad
01/31/2023, 1:08 AM
ben
01/31/2023, 5:59 PM