Joram Sbrisny
02/27/2024, 3:42 PMScout
02/27/2024, 3:42 PMdbt_assets
in Dagster. This is typically done by defining a custom configuration schema for your dbt_assets
and then using that configuration within the asset function to pass variables to dbt commands. Here's an example from the documentation that demonstrates how to add a custom configuration to do a `--full-refresh`: python from pathlib import Path from dagster import AssetExecutionContext, Config from dagster_dbt import DbtCliResource, dbt_assets class MyDbtConfig(Config): full_refresh: bool @dbt_assets(manifest=Path("target", "manifest.json")) def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource, config: MyDbtConfig): dbt_build_args = ["build"] if config.full_refresh: dbt_build_args += ["--full-refresh"] yield from dbt.cli(dbt_build_args, context=context).stream()
In this example, MyDbtConfig
is a custom configuration class that includes a full_refresh
boolean variable. This variable is then used within the my_dbt_assets
function to determine whether to add the --full-refresh
flag to the dbt CLI command. You can extend this pattern to include other variables that you might want to pass to dbt, such as variables for dbt's --vars
flag. Remember to handle the configuration within your dbt_assets
function accordingly.Joram Sbrisny
02/27/2024, 3:54 PMScout
02/27/2024, 3:54 PMdbt_assets
or any other Dagster asset or op is through the use of configuration classes or dictionaries that define the configuration schema. This is because Dagster's asset and op definitions are designed to be explicit about their dependencies and configurations, which allows for better tracking, validation, and management of the execution environment. However, if you want to pass simple Python variables to dbt_assets
without defining a configuration class, you would typically do this by setting up resources or using the context object to pass parameters. Here's an example of how you might use resources to pass variables to `dbt_assets`: python from dagster import resource, AssetExecutionContext from dagster_dbt import dbt_cli_resource, dbt_assets # Define a resource that holds the variables you want to pass @resource def dbt_vars_resource(_): return {"my_variable": "value"} # Set up the dbt CLI resource with your custom resource @dbt_assets( dbt_cli_resource=dbt_cli_resource.configured({"vars": {"my_variable": "value"}}) ) def my_dbt_assets(context: AssetExecutionContext): # Access the resource within your asset dbt_vars = context.resources.dbt_vars_resource # Use the variables as needed in your dbt commands yield from context.resources.dbt.cli(["run", "--vars", dbt_vars]).stream()
In this example, the dbt_vars_resource
is a simple resource that returns a dictionary of variables. These variables are then passed to the dbt_cli_resource
through the .configured
method, which allows you to inject configuration at runtime. Please note that while it's possible to pass variables without a configuration class, using explicit configuration classes or dictionaries is the preferred and more robust method in Dagster, as it provides clear contracts for what configuration is expected and allows for validation and introspection of the configuration values.Joram Sbrisny
02/27/2024, 3:58 PMparameter_value = "tt"
@dbt_assets(manifest=dbt_manifest_path,name=op_name, select=select,
dagster_dbt_translator=CustomDagsterDbtTranslator())
def _assets_def(context: OpExecutionContext,dbt: DbtCliResource, parameter_value):
dbt_vars = {"V_STICHTAG": f"{parameter_value}"}
yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)],target_path=Path("target"), context=context).stream()
Scout
02/27/2024, 3:58 PMparameter_value
variable is defined outside the scope of the _assets_def
function and should not be included as a parameter in the function definition. Instead, you should use it directly within the function body. 2. The OpExecutionContext
class is not the correct context type for an asset definition. You should use AssetExecutionContext
instead. 3. The manifest
and select
parameters in the @dbt_assets
decorator should be provided with actual values or variables that hold the path to the manifest file and the selection criteria for dbt, respectively. 4. The name
parameter in the @dbt_assets
decorator should be provided with a string that represents the name of the operation, but op_name
is not defined in the provided snippet. 5. The json.dumps(dbt_vars)
should be converted to a string before being passed to the dbt.cli
command. 6. The target_path
parameter is not necessary when invoking dbt.cli
as it is already specified in the dbt project configuration. Here's a revised version of the code snippet that addresses these issues: python import json from pathlib import Path from dagster import AssetExecutionContext from dagster_dbt import dbt_assets, DbtCliResource # Assuming 'dbt_manifest_path' and 'select' are defined elsewhere in your code # and 'CustomDagsterDbtTranslator' is a properly defined class parameter_value = "tt" @dbt_assets( manifest=dbt_manifest_path, name="op_name", # Replace "op_name" with the actual name of the operation select=select, dagster_dbt_translator=CustomDagsterDbtTranslator() ) def _assets_def(context: AssetExecutionContext, dbt: DbtCliResource): dbt_vars = {"V_STICHTAG": parameter_value} yield from dbt.cli( ["build", "--vars", json.dumps(dbt_vars)], context=context ).stream()
Make sure to replace "op_name"
with the actual name you want to give to the operation and ensure that dbt_manifest_path
and select
are defined and hold the correct values before using them in the decorator. Additionally, ensure that CustomDagsterDbtTranslator
is defined correctly and imported if it's in a different module.