Joram Sbrisny
02/27/2024, 4:48 PM

Scout
02/27/2024, 4:48 PM

Joram Sbrisny
02/27/2024, 4:52 PM

```yaml
assets:
  - select: "tag:tag__vertriebsstruktur_ba"  # select
    op_name: "pba"                           # prefix -> pba_dbt
    dbt_parameter_key: ""
    dbt_parameter_value: ""
  - select: "tag:LV_HST"
    op_name: "lv_hst"
    dbt_parameter_key: ""
    dbt_parameter_value: ""
  - select: "tag:LV_ST"
    op_name: "lv_st"
    dbt_parameter_key: "V_STICHTAG"
    dbt_parameter_value: "20221231"
  - select: "tag:KOLUMBUS"
    op_name: "kol"
    dbt_parameter_key: ""
    dbt_parameter_value: "20221231"
```
and here is my code:

```python
import json
import os
from pathlib import Path
from typing import Any, Dict, List

import yaml

try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader

from dagster import AssetsDefinition, OpExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


def load_yaml(relative_path: str) -> Dict[str, Any]:
    path = os.path.join(os.path.dirname(__file__), relative_path)
    with open(path, "r", encoding="utf8") as ff:
        return yaml.load(ff, Loader=Loader)


def from_asset_entries(asset_entries: Dict[str, Any]) -> List[AssetsDefinition]:
    assets_defs = []
    for asset_entry in asset_entries["assets"]:
        select = asset_entry["select"]
        op_name = asset_entry["op_name"] + "_dbt"
        parameter_key = asset_entry["dbt_parameter_key"]
        parameter_value = str(asset_entry["dbt_parameter_value"])

        @dbt_assets(manifest=dbt_manifest_path, name=op_name, select=select,
                    dagster_dbt_translator=CustomDagsterDbtTranslator())
        def _assets_def(context: OpExecutionContext, dbt: DbtCliResource):
            dbt_vars = {"V_STICHTAG": parameter_value}
            yield from dbt.cli(
                ["build", "--vars", json.dumps(dbt_vars)],
                target_path=Path("target"),
                context=context,
            ).stream()

        assets_defs.append(_assets_def)
    return assets_defs


def get_asset_dsl_example_defs() -> List[AssetsDefinition]:
    asset_entries = load_yaml("dbt_assets.yml")
    return from_asset_entries(asset_entries)
```
The problem is that when I materialize one asset, it always takes the parameter_value from the last entry in the YAML.

Scout
02/27/2024, 4:52 PM

The @dbt_assets decorator is being used within a loop. When you define the _assets_def function inside the loop, it captures parameter_value from the last iteration because of the way closures work in Python. This is a common pitfall known as a "late-binding closure." To resolve this issue, you can modify the function definition to accept parameter_value as an argument, ensuring that each asset definition gets its own distinct value. Here's how you can adjust your code:

```python
from functools import partial

@dbt_assets(manifest=dbt_manifest_path, name=op_name, select=select,
            dagster_dbt_translator=CustomDagsterDbtTranslator())
def _assets_def(context: OpExecutionContext, dbt: DbtCliResource, parameter_value):
    dbt_vars = {"V_STICHTAG": parameter_value}
    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)],
        target_path=Path("target"),
        context=context,
    ).stream()

for asset_entry in asset_entries["assets"]:
    # ... (other code remains the same)
    parameter_value = str(asset_entry["dbt_parameter_value"])
    assets_defs.append(partial(_assets_def, parameter_value=parameter_value))
```
In the adjusted code, partial from the functools module creates a new callable with parameter_value pre-filled with the correct value for each asset, so each asset uses its own parameter_value when it's materialized. Make sure to import partial at the beginning of your script:

```python
from functools import partial
```

After making these changes, each asset should correctly use the parameter_value specified for it in the YAML file when it's materialized.

Joram Sbrisny
02/27/2024, 4:56 PM
Scout
02/27/2024, 4:56 PM

Dagster is expecting an AssetsDefinition, SourceAsset, or CacheableAssetsDefinition object, but it's receiving a functools.partial object instead. This is because partial does not return an AssetsDefinition object directly; it returns a callable that, when invoked, will return the desired AssetsDefinition. To resolve this issue, you need to ensure that the list returned by get_asset_dsl_example_defs contains instances of AssetsDefinition rather than partial objects. One way to do this is to use a factory function that creates each AssetsDefinition before adding it to the list. Here's how you can adjust your code:

```python
# ... (other code remains the same)

def from_asset_entries(asset_entries: Dict[str, Any]) -> List[AssetsDefinition]:
    assets_defs = []
    for asset_entry in asset_entries["assets"]:
        # ... (other code remains the same)
        parameter_value = str(asset_entry["dbt_parameter_value"])

        # Factory: parameter_value becomes a local of this call, so each
        # _assets_def keeps its own copy instead of sharing the loop variable.
        def make_assets_def(parameter_value):
            @dbt_assets(manifest=dbt_manifest_path, name=op_name, select=select,
                        dagster_dbt_translator=CustomDagsterDbtTranslator())
            def _assets_def(context: OpExecutionContext, dbt: DbtCliResource):
                dbt_vars = {"V_STICHTAG": parameter_value}
                yield from dbt.cli(
                    ["build", "--vars", json.dumps(dbt_vars)],
                    target_path=Path("target"),
                    context=context,
                ).stream()
            return _assets_def

        # Invoke the factory immediately so an AssetsDefinition is appended
        assets_defs.append(make_assets_def(parameter_value))
    return assets_defs

# ... (other code remains the same)
```
In the adjusted code, make_assets_def creates a new _assets_def function with parameter_value bound to it. The factory is invoked immediately to create an AssetsDefinition instance, which is appended to the assets_defs list. This approach should resolve the error and ensure that each asset definition is created with the correct parameter_value.

Joram Sbrisny
02/27/2024, 5:07 PM

Scout
02/27/2024, 5:07 PM

The error TypeError: reduce() of empty sequence with no initial value suggests that the reduce function from the functools module is being called on an empty sequence without an initial value. However, based on the code snippets you've provided, there is no explicit use of reduce in the code.
This error might be indirectly related to the way assets are being defined or materialized within Dagster. If Dagster internally uses reduce during asset materialization, and the list of assets is empty or not properly constructed, that could trigger this error.

To debug this issue, you should ensure that:
1. The YAML file is being read correctly and asset_entries contains the expected data.
2. The from_asset_entries function is being called and returns a non-empty list of AssetsDefinition objects.
3. No other part of your code (or Dagster's internals) is calling reduce on an empty sequence.
Here are some steps you can take to debug the issue:
- Add logging or print statements to verify that the YAML file is loaded correctly and that asset_entries contains the expected list of assets.
- Check that from_asset_entries is indeed being called and returns a non-empty list.
- If you use reduce elsewhere in your code, provide an initial value when working with sequences that could be empty.
If you are still encountering issues, please provide more context or any additional code that might be relevant to the problem, especially any use of reduce we're not seeing in the provided snippets. This will help in diagnosing the specific cause of the error you're experiencing.