Martin Carlsson
01/18/2023, 10:10 AM
asset names as parameters in the transform/multi_asset function.
This means that when I add a new company or endpoint, I have to hardcode it in multiple places in my code.
Is there any way to get around that?
@multi_asset(
    outs={
        transformed_asset_name(company, endpoint): AssetOut(
            dagster_type=pd.DataFrame,
            io_manager_key="default_io_manager",
            description=f"Transform {company} {endpoint} to dataframe",
            is_required=False,
        )
        for company, endpoint in product(COMPANIES, ENDPOINTS)
    },
    internal_asset_deps={
        transformed_asset_name(company, endpoint): {AssetKey(extract_asset_name(company, endpoint))}
        for company, endpoint in product(COMPANIES, ENDPOINTS)
    },
    can_subset=True,
)
def transform(
    context,
    extract_dk_glentry_get,
    extract_dk_dimensionsetentry_get,
    extract_usa_glentry_get,
    extract_usa_dimensionsetentry_get,
):
    ...
Daniel Gafni
01/18/2023, 12:07 PM
Martin Carlsson
01/18/2023, 2:00 PM
asset factory in the documentation. Could you point me in the right direction?
Daniel Gafni
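01/18/2023, 2:04 PM
from dagster import AssetsDefinition, asset

def get_asset_for_company(company: str) -> AssetsDefinition:
    # `company` is bound once per call, so each asset keeps its own value
    @asset(name=company)
    def company_asset():
        return get_data(company)  # get_data: your own loading function

    return company_asset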
2. Use it multiple times (in a for loop)
assets = []
for company in COMPANIES:
    assets.append(get_asset_for_company(company))
It's crucial to write the def get_asset_for_company OUTSIDE of the for loop; otherwise Python will bake the last iteration's value into every asset function (they will all be the same).
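The effect described above is ordinary Python closure behaviour and can be reproduced without Dagster. The sketch below is only an illustration: `make_loader`, `load`, and the two company names are made-up stand-ins, not part of the original code.

```python
COMPANIES = ["dk", "usa"]  # assumed example values

# Functions defined directly in the loop look up `company` when they are
# CALLED, not when they are defined, so they all see the final loop value.
looped = []
for company in COMPANIES:
    def load():
        return f"data for {company}"
    looped.append(load)


def make_loader(company: str):
    # Each call creates a new scope, so `company` is fixed per function.
    def load():
        return f"data for {company}"
    return load


factory = [make_loader(c) for c in COMPANIES]

looped_results = [f() for f in looped]
factory_results = [f() for f in factory]
print(looped_results)   # every entry refers to the last company
print(factory_results)  # one entry per company
```

The factory version is the same shape as get_asset_for_company: the outer function's argument is what keeps each inner function tied to its own company.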
Typed this in slack so may have some errors but you should get the idea
Casper Weiss Bang
01/18/2023, 2:31 PM
Martin Carlsson
01/18/2023, 2:35 PM
Martin Carlsson
01/18/2023, 2:37 PM
@multi_asset(
    ins={
        extract_asset_name(company, endpoint): AssetIn(key=extract_asset_name(company, endpoint))
        for company, endpoint in product(COMPANIES, ENDPOINTS)
    },
    outs={
        transformed_asset_name(company, endpoint): AssetOut(
            dagster_type=pd.DataFrame,
            io_manager_key="default_io_manager",
            description=f"Transform {company} {endpoint} to dataframe",
            is_required=False,
        )
        for company, endpoint in product(COMPANIES, ENDPOINTS)
    },
    internal_asset_deps={
        transformed_asset_name(company, endpoint): {AssetKey(extract_asset_name(company, endpoint))}
        for company, endpoint in product(COMPANIES, ENDPOINTS)
    },
    can_subset=True,
)
def transform(context, **extracts):
    transformed = {}
    for company, endpoint in product(COMPANIES, ENDPOINTS):
        if transformed_asset_name(company, endpoint) in context.selected_output_names:
            transformed[transformed_asset_name(company, endpoint)] = transform_to_dataframe(
                extracts.get(extract_asset_name(company, endpoint))
            )
            yield Output(
                value=transformed[transformed_asset_name(company, endpoint)],
                output_name=transformed_asset_name(company, endpoint),
                metadata={"Number of rows": len(transformed[transformed_asset_name(company, endpoint)].index)},
            )
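To see how the generated names line up with the `**extracts` keyword arguments, here is a plain-Python sketch of the same pattern without Dagster. The bodies of `extract_asset_name` and `transformed_asset_name` and the values of `COMPANIES` and `ENDPOINTS` are assumptions guessed from the parameter names in the first snippet; the real helpers may differ.

```python
from itertools import product

COMPANIES = ["dk", "usa"]                             # assumed values
ENDPOINTS = ["glentry_get", "dimensionsetentry_get"]  # assumed values


def extract_asset_name(company: str, endpoint: str) -> str:
    # Hypothetical helper: name of the upstream (extract) asset.
    return f"extract_{company}_{endpoint}"


def transformed_asset_name(company: str, endpoint: str) -> str:
    # Hypothetical helper: name of the downstream (transformed) output.
    return f"transformed_{company}_{endpoint}"


def transform(selected_output_names, **extracts):
    # Stand-in for the multi_asset body: inputs arrive as keyword arguments
    # keyed by the generated names, and only selected outputs are yielded.
    for company, endpoint in product(COMPANIES, ENDPOINTS):
        out_name = transformed_asset_name(company, endpoint)
        if out_name in selected_output_names:
            yield out_name, extracts[extract_asset_name(company, endpoint)]


# One input per (company, endpoint) pair, keyed by the same helper.
inputs = {
    extract_asset_name(c, e): f"raw {c} {e}"
    for c, e in product(COMPANIES, ENDPOINTS)
}
results = dict(transform({"transformed_dk_glentry_get"}, **inputs))
print(results)
```

Because every name comes from the two helpers, adding a company or endpoint only means extending `COMPANIES` or `ENDPOINTS`.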
Casper Weiss Bang
01/18/2023, 2:38 PM
Daniel Gafni
01/18/2023, 2:38 PM
@repository or Definitions or even load them from the module with the corresponding dagster function
Daniel Gafni
01/18/2023, 2:39 PM
Casper Weiss Bang
01/18/2023, 2:40 PM
Daniel Gafni
01/18/2023, 2:41 PM