Moataz Ghobashy
# 03/01/2024, 2:06 PM
def factory_asset(
    *,
    key: AssetKey,
    description: Optional[str] = None,
    auto_materialize_policy: Optional[AutoMaterializePolicy] = EAGER_AMP,
    deps: Optional[list[AssetsDefinition]] = None,
    freshness_policy: Optional[FreshnessPolicy] = DEFAULT_FP,
    partitions_def: Optional[PartitionsDefinition] = DEFAULT_DAILY_PARTITION_DEF,
    retry_policy: Optional[RetryPolicy] = RETRY_POLICY,
    group_name: Optional[str] = "default",
):
    """Return a decorator that wraps a function with ``@asset`` using shared defaults.

    All arguments are keyword-only and are forwarded unchanged to ``asset``
    (presumably Dagster's ``@asset`` decorator -- confirm against the file's
    imports), except ``deps``, which is normalised as described below.

    Args:
        key: Asset key forwarded to ``asset``.
        description: Optional description forwarded to ``asset``.
        auto_materialize_policy: Forwarded to ``asset``; defaults to the
            module-level ``EAGER_AMP``.
        deps: Upstream dependencies forwarded to ``asset``. ``None`` (the
            default) is replaced with a fresh empty list on every call, so
            no list object is shared between separate factory invocations.
        freshness_policy: Forwarded to ``asset``; defaults to the
            module-level ``DEFAULT_FP``.
        partitions_def: Forwarded to ``asset``; defaults to the module-level
            ``DEFAULT_DAILY_PARTITION_DEF``.
        retry_policy: Forwarded to ``asset``; defaults to the module-level
            ``RETRY_POLICY``.
        group_name: Forwarded to ``asset``; defaults to ``"default"``.

    Returns:
        A decorator. Applied to ``func``, it returns whatever ``asset``
        produces for an inner function that calls
        ``func(context=..., mongodb=..., datalake=...)`` and returns its result.
    """
    # A literal ``[]`` default would be created once at definition time and
    # shared by every call; build a new list per call instead.
    asset_deps = [] if deps is None else deps

    def wrapper(func: Callable):
        """Wrap ``func`` in an ``asset``-decorated function and return the result."""

        @asset(
            key=key,
            description=description,
            auto_materialize_policy=auto_materialize_policy,
            deps=asset_deps,
            freshness_policy=freshness_policy,
            partitions_def=partitions_def,
            retry_policy=retry_policy,
            group_name=group_name,
        )
        def inner_func(
            context: AssetExecutionContext,
            mongodb: MongoDB,
            datalake: DataLake,
        ):
            # NOTE(review): intentionally no docstring and no functools.wraps
            # here -- ``asset`` may inspect this function's signature and
            # docstring, so both are left exactly as in the original. Confirm
            # before changing.
            # ``func`` is always invoked with these three keyword arguments.
            return func(context=context, mongodb=mongodb, datalake=datalake)

        return inner_func

    return wrapper