# ask-community
l
Hello everyone, I am just wondering if there is a methodology/best practice around creating parameterized assets and asset graphs? I have a scenario where I'm essentially having to call an API multiple times, but I want to declare the returned data individually for auditing. The data pipeline is the same for each, so I was wondering if I have to manually create each of these pipelines or if they can be autogenerated?
asset-a1 -> asset-a1-processing -> asset-combined
asset-a2 -> asset-a2-processing -> asset-combined
asset-a3 -> asset-a3-processing -> asset-combined
d
I think I have a similar issue: I have many very similar external API calls that materialize different assets. I went with an asset factory pattern:
```python
from typing import Any, Dict, List, Optional

from dagster import (
    AssetExecutionContext,
    AssetsDefinition,
    ConfigurableResource,
    asset,
)


class MyExternalApiResource(ConfigurableResource):
    def call_api(self, api_opts: Dict[str, Any]) -> None:
        ...


def generate_asset(
    asset_key: str, api_opts: Dict[str, Any], deps: Optional[List[str]] = None
) -> AssetsDefinition:
    @asset(name=asset_key, deps=deps)
    def _asset(context: AssetExecutionContext, external_api: MyExternalApiResource) -> None:
        external_api.call_api(api_opts)

    return _asset


assets: List[AssetsDefinition] = []

# populate the assets list with calls to the generate_asset function and use it in defs
```
s
Roughly how many of these do you have?
l
@sandy 10, each consisting of three assets: getting data from a URL -> pushing the data to S3 -> copying the data from S3 to Redshift (I'm thinking of moving this last step to Airbyte). The plan is then for these tables to seed the imported dbt project for transformation. Thanks for the help!
d
I think the asset factory pattern I described would be quite suitable for this, if all you have to change is the URL and the destinations. For your 3 substeps you could either make a factory for graph-backed assets, or create 3 different asset factories and connect the 10 flows appropriately via the deps arguments. Or, alternatively, if you don't care much about the data you get from the URL or the data on S3, you could also put all 3 steps directly into a single asset.
👍 1
l
Thank you @DB, this is a little over my head, but I do appreciate it! I'll have a go at implementing it! Do you perhaps have a more fleshed-out example, maybe a GitHub link to your project? Just so I can see the full asset.py? Thank you! 🙂
d
Unfortunately I cannot make my project public, but there is a discussion about this pattern here: https://github.com/dagster-io/dagster/discussions/11045
l
@DB just wanted to say thank you! Got this up and working in the end! Was a smart solution! Thanks! 👌🏼