https://dagster.io/ logo
#ask-ai
Title
# ask-ai
j

Joram Sbrisny

02/27/2024, 4:48 PM
@Scout: When ja load asset definitions from an yaml file, the asset only take the last values from the yaml. what could be wrong
daggy love 1
s

Scout

02/27/2024, 4:48 PM
When you're experiencing an issue where asset definitions loaded from a YAML file only take the last values, it could be due to a variety of reasons. Here are some steps you can take to debug the issue, based on the documentation: 1. Check the YAML File Structure: Ensure that the YAML file is structured correctly. YAML is sensitive to indentation, and incorrect indentation can lead to unexpected behavior. Each asset definition should be properly indented under the appropriate key. 2. Verify Asset Definitions: Make sure that the asset definitions in the YAML file are correctly specified. Each asset should have a unique key and the properties should be set according to the expected structure. 3. Review the Loading Logic: Look at the code that loads the YAML file and instantiates the asset definitions. Ensure that it's correctly parsing the YAML file and creating asset definitions for each entry rather than overwriting previous entries with the last one. 4. Check for Duplicate Keys: In YAML, having duplicate keys will result in the last value overriding the previous ones. Ensure that each asset has a unique key and that there are no duplicates that could cause earlier values to be overwritten. 5. Validate with Documentation: Cross-reference your YAML structure and loading logic with examples and guidelines provided in the documentation to ensure that you're following the recommended practices for defining and loading assets. 6. Logging and Error Messages: Add logging to your loading process to capture the values as they are read from the YAML file. This can help identify if and where values are being overwritten. Also, pay attention to any error messages or warnings that might indicate a problem with the YAML parsing or asset instantiation. If you've gone through these steps and are still facing issues, it might be helpful to provide a more detailed description of the problem, including the structure of your YAML file and the code you're using to load the asset definitions. This additional context could help in diagnosing the specific issue you're encountering.
j

Joram Sbrisny

02/27/2024, 4:52 PM
@Scout: here is the struktur of my yml:
Copy code
assets:
  - select: "tag:tag__vertriebsstruktur_ba"               #select
    op_name: "pba"                                        #prefix - pba_dbt
    dbt_parameter_key: ""
    dbt_parameter_value: ""
  - select: "tag:LV_HST"
    op_name: "lv_hst"
    dbt_parameter_key: ""
    dbt_parameter_value: ""
  - select: "tag:LV_ST"
    op_name: "lv_st"
    dbt_parameter_key: "V_STICHTAG"
    dbt_parameter_value: "20221231"
  - select: "tag:KOLUMBUS"
    op_name: "kol"
    dbt_parameter_key: ""
    dbt_parameter_value: "20221231"
and here is my code:
Copy code
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader

from dagster import AssetKey, asset
from dagster._core.pipes.subprocess import PipesSubprocessClient


def load_yaml(relative_path) -> Dict[str, Any]:
    path = os.path.join(os.path.dirname(__file__), relative_path)
    with open(path, "r", encoding="utf8") as ff:
        return yaml.load(ff, Loader=Loader)


def from_asset_entries(asset_entries: Dict[str, Any]) -> List[AssetsDefinition]:
    assets_defs = []

    for asset_entry in asset_entries["assets"]:
        select = asset_entry["select"]
        op_name = asset_entry["op_name"] + "_dbt"
        parameter_key = asset_entry["dbt_parameter_key"]
        parameter_value = str(asset_entry["dbt_parameter_value"])

        #dep_entries = asset_entry.get("deps", [])
        #description = asset_entry.get("description")
        #target_table = asset_entry.get("target_table")
        #prozess = asset_entry.get("prozess")
        #asset_key = AssetKey.from_user_string(asset_key_str)
        #deps = [AssetKey.from_user_string(dep_entry) for dep_entry in dep_entries]

        #sql = asset_entry["sql"]  # this is required
        ##parameter_value = "tt"
        @dbt_assets(manifest=dbt_manifest_path,name=op_name, select=select,
                    dagster_dbt_translator=CustomDagsterDbtTranslator())
        def _assets_def(context: OpExecutionContext,dbt: DbtCliResource):
            dbt_vars = {"V_STICHTAG": parameter_value}
            yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)],target_path=Path("target"), context=context).stream()
#run_results_json = dbt_build_invocation.get_artifact("run_results.json")
#AssetExecutionContext
        assets_defs.append(_assets_def)

    return assets_defs

def get_asset_dsl_example_defs() -> List[AssetsDefinition]:
    asset_entries = load_yaml("dbt_assets.yml")
    return from_asset_entries(asset_entries)
the problem is that when i materialized one asset he allway take the parameter_value from the last entry in the yml
s

Scout

02/27/2024, 4:52 PM
Based on the YAML structure and the code you've provided, it seems that the issue might be related to the way the
@dbt_assets
decorator is being used within a loop. When you define the
_assets_def
function inside the loop, it captures the
parameter_value
from the last iteration because of the way closures work in Python. This is a common pitfall known as a "late binding closure." To resolve this issue, you can modify the function definition to accept
parameter_value
as an argument, ensuring that each asset definition gets its own distinct value. Here's how you can adjust your code:
Copy code
python from functools import partial  @dbt_assets(manifest=dbt_manifest_path, name=op_name, select=select,             dagster_dbt_translator=CustomDagsterDbtTranslator()) def _assets_def(context: OpExecutionContext, dbt: DbtCliResource, parameter_value):     dbt_vars = {"V_STICHTAG": parameter_value}     yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], target_path=Path("target"), context=context).stream()  for asset_entry in asset_entries["assets"]:     # ... (other code remains the same)     parameter_value = str(asset_entry["dbt_parameter_value"])     assets_defs.append(partial(_assets_def, parameter_value=parameter_value))
In this adjusted code,
partial
is used from the
functools
module to create a new function that has
parameter_value
pre-filled with the correct value for each asset. This way, each asset will use the correct
parameter_value
when it's materialized. Make sure to import
partial
at the beginning of your script:
Copy code
python from functools import partial
After making these changes, each asset should correctly use the
parameter_value
specified for it in the YAML file when it's materialized.
j

Joram Sbrisny

02/27/2024, 4:56 PM
@Scout: I get the following error: dagster._check.CheckError: Member of iterable mismatches type. Expected (<class 'dagster._core.definitions.assets.AssetsDefinition'>, <class 'dagster._core.definitions.source_asset.SourceAsset'>, <class 'dagster._core.definitions.cacheable_assets.CacheableAssetsDefinition'>). Got functools.partial(<dagster._core.definitions.assets.AssetsDefinition object at 0x00000214E51E5970>, parameter_value='') of type <class 'functools.partial'>.
s

Scout

02/27/2024, 4:56 PM
The error message you're encountering indicates that Dagster is expecting an
AssetsDefinition
,
SourceAsset
, or
CacheableAssetsDefinition
object, but it's receiving a
functools.partial
object instead. This is because the
partial
function does not return an
AssetsDefinition
object directly; it returns a callable that, when invoked, will return the desired
AssetsDefinition
. To resolve this issue, you need to ensure that the list returned by
get_asset_dsl_example_defs
contains instances of
AssetsDefinition
rather than
partial
objects. One way to do this is to invoke the
partial
objects to create the
AssetsDefinition
instances before adding them to the list. Here's how you can adjust your code:
Copy code
python # ... (other code remains the same)  def from_asset_entries(asset_entries: Dict[str, Any]) -> List[AssetsDefinition]:     assets_defs = []      for asset_entry in asset_entries["assets"]:         # ... (other code remains the same)         parameter_value = str(asset_entry["dbt_parameter_value"])          # Create a new function with parameter_value bound to it         def make_assets_def(parameter_value):             @dbt_assets(manifest=dbt_manifest_path, name=op_name, select=select,                         dagster_dbt_translator=CustomDagsterDbtTranslator())             def _assets_def(context: OpExecutionContext, dbt: DbtCliResource):                 dbt_vars = {"V_STICHTAG": parameter_value}                 yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], target_path=Path("target"), context=context).stream()             return _assets_def          # Invoke the function to create an AssetsDefinition instance         assets_defs.append(make_assets_def(parameter_value))      return assets_defs  # ... (other code remains the same)
In this adjusted code,
make_assets_def
is a function that creates a new
_assets_def
function with the
parameter_value
bound to it. This function is then invoked to create an
AssetsDefinition
instance, which is appended to the
assets_defs
list. This approach should resolve the error and ensure that each asset definition is created with the correct
parameter_value
.
j

Joram Sbrisny

02/27/2024, 5:07 PM
@Scout: i get TypeError: reduce() of empty sequence with no initial value
s

Scout

02/27/2024, 5:07 PM
The error message
TypeError: reduce() of empty sequence with no initial value
suggests that you are using the
reduce
function from the
functools
module on an empty sequence without providing an initial value. However, based on the code snippets you've provided, there is no explicit use of
reduce
in the code. This error might be indirectly related to the way assets are being defined or materialized within Dagster. If Dagster internally uses
reduce
during the process of asset materialization, and if the list of assets is empty or not properly constructed, it could lead to this error. To debug this issue, you should ensure that: 1. The YAML file is being read correctly and that
asset_entries
contains the expected data. 2. The
from_asset_entries
function is being called and that it returns a non-empty list of
AssetsDefinition
objects. 3. There are no other parts of your code or Dagster's internal code that might be using
reduce
on an empty sequence. Here are some steps you can take to debug the issue: - Add logging or print statements to verify that the YAML file is loaded correctly and that
asset_entries
contains the expected list of assets. - Check that the
from_asset_entries
function is indeed being called and that it returns a non-empty list. - If you are using
reduce
elsewhere in your code, ensure that you provide an initial value when working with sequences that could potentially be empty. If you are still encountering issues, please provide more context or any additional code that might be relevant to the problem, especially if there's a use of
reduce
that we're not seeing in the provided snippets. This will help in diagnosing the specific cause of the error you're experiencing. Type error when passing
Nothing
as `da... button backfills page shows error if asset targ... button returning empty dataframes from assets/o... button Dagster Docs button Thank you for your feedback!