Carl Almklev
02/20/2023, 12:11 PMdef partitioned_asset(partitions: List[str]) -> AssetsDefinition:
@asset(partitions_def=StaticPartitionsDefinition(partitions))
def some_asset():
logger = get_dagster_logger()
<http://logger.info|logger.info>("running asset")
return some_asset
class ReloadableRepository(RepositoryData):
def get_all_pipelines(self) -> Sequence[PipelineDefinition]:
some_asset = partitioned_asset(["partition1", "partition2"])
unresolved_asset_job = define_asset_job(name="asset_job", selection=[some_asset])
resovled_asset_job = unresolved_asset_job.resolve(asset_graph=AssetGraph.from_assets([some_asset]))
return [resovled_asset_job]
def get_top_level_resources(self) -> Mapping[str, ResourceDefinition]:
return {}
@repository(name="test")
def repo():
return ReloadableRepository()
Any help would be much appreciated 🙂def get_all_partition_sets(self) -> Sequence[PartitionSetDefinition]: