Carl Almklev
01/26/2023, 8:06 PM
`RepositoryData`, which seems to work the way I want. However, I can't find any way to plug assets into it. I have lots of them (some loaded from dbt, some created manually), and I use `with_resources` to return them from `@repository`. Any pointers would be much appreciated 🙂

yuhan
01/30/2023, 7:33 PM

Carl Almklev
01/31/2023, 10:20 AM
```python
from dagster import (
    AssetsDefinition,
    JobDefinition,
    PartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
    define_asset_job,
    repository,
    with_resources,
)


def load_partitions() -> PartitionsDefinition:
    # Load partitions from external source
    from_database = [1, 2, 3]
    # Partition keys must be a flat sequence of strings, not a nested list of ints
    return StaticPartitionsDefinition([str(key) for key in from_database])


def create_asset1(partitions: PartitionsDefinition) -> AssetsDefinition:
    @asset(partitions_def=partitions)
    def some_asset():
        return [1, 2]

    return some_asset


def create_asset2(partitions: PartitionsDefinition) -> AssetsDefinition:
    @asset(partitions_def=partitions)
    def another_asset():
        return [3]

    return another_asset


def create_job(partitions: PartitionsDefinition) -> JobDefinition:
    return define_asset_job(name="some_job", selection="some_asset", partitions_def=partitions)


@repository
def basic_repo():
    # load_partitions() runs when this repository function is evaluated
    partitions = load_partitions()
    return with_resources(
        [create_asset1(partitions), create_asset2(partitions)],
        resource_defs={"io_manager": some_io_manager},  # some_io_manager is defined elsewhere
    )
```
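`StaticPartitionsDefinition` expects a flat sequence of string keys, so values coming back from a database (integers in the snippet above) need converting first. Below is a minimal, dependency-free sketch of that step; `to_partition_keys` is a hypothetical helper, and dropping repeated values is an assumption, made because each key should name a single partition.

```python
from typing import Iterable, List


def to_partition_keys(values: Iterable[object]) -> List[str]:
    """Hypothetical helper: turn raw values into a flat list of unique string keys.

    Each value is converted with str(); repeats are dropped while the
    first-seen order is preserved.
    """
    seen = set()
    keys: List[str] = []
    for value in values:
        key = str(value)
        if key not in seen:
            seen.add(key)
            keys.append(key)
    return keys


if __name__ == "__main__":
    rows = [1, 2, 3, 2]  # stand-in for a database query result
    print(to_partition_keys(rows))  # ['1', '2', '3']
```

Inside `load_partitions()`, its result could then be passed as `StaticPartitionsDefinition(to_partition_keys(from_database))`.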
When the repository is reloaded, `load_partitions()` is only called again when running locally. When deployed to Kubernetes, the user code repository is connected to via gRPC, and the only way to reload it properly is to restart the container.
Changing the `@repository` to:
```python
from typing import Sequence

from dagster import JobDefinition, RepositoryData, repository


class ReloadableRepo(RepositoryData):
    def get_all_jobs(self) -> Sequence[JobDefinition]:
        partitions = load_partitions()
        return [create_job(partitions)]


@repository
def dynamic_repo():
    return ReloadableRepo()
```
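The difference between the two approaches can be modelled without Dagster. This is a simplified illustration of the behaviour reported in this thread, not Dagster's internals: `EagerRepo` stands in for a `@repository` function whose result is computed once and kept by a long-running process, while `LazyRepo` stands in for the `RepositoryData` subclass that calls the loader inside its method. `fake_database`, `fake_load_partitions`, `EagerRepo` and `LazyRepo` are all hypothetical names.

```python
from typing import Callable, List

# Hypothetical stand-in for the external source; appending simulates new rows.
fake_database: List[int] = [1, 2, 3]


def fake_load_partitions() -> List[str]:
    """Hypothetical loader: reads whatever rows exist at call time."""
    return [str(row) for row in fake_database]


class EagerRepo:
    """Models a repository evaluated once: the loader runs at construction only."""

    def __init__(self, loader: Callable[[], List[str]]) -> None:
        self._partitions = loader()  # captured once and reused afterwards

    def partitions(self) -> List[str]:
        return self._partitions


class LazyRepo:
    """Models the RepositoryData subclass: the loader runs on every call."""

    def __init__(self, loader: Callable[[], List[str]]) -> None:
        self._loader = loader

    def partitions(self) -> List[str]:
        return self._loader()


if __name__ == "__main__":
    eager = EagerRepo(fake_load_partitions)
    lazy = LazyRepo(fake_load_partitions)
    fake_database.append(4)  # new data arrives while the process keeps running
    print(eager.partitions())  # ['1', '2', '3']
    print(lazy.partitions())  # ['1', '2', '3', '4']
```

Only the lazy variant sees the appended row, which matches why the eager version needs a process (container) restart to pick up new partitions.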
This works great; however, I can't figure out how to fit assets into `RepositoryData`. Hope that makes sense, thanks! 🙂

Carl Almklev
02/03/2023, 12:15 PM

yuhan
02/09/2023, 11:30 PM

yuhan
02/09/2023, 11:38 PM
`assets_defs_by_key` attribute to `RepositoryData`, where the key is the asset key and the value is the asset definition.

yuhan
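02/09/2023, 11:39 PM
```python
from typing import Mapping, Sequence

from dagster import AssetKey, AssetsDefinition, JobDefinition, RepositoryData


class ReloadableRepo(RepositoryData):
    def get_all_jobs(self) -> Sequence[JobDefinition]:
        partitions = load_partitions()
        return [create_job(partitions)]

    def get_assets_defs_by_key(self) -> Mapping[AssetKey, AssetsDefinition]:
        ...
```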
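One possible way to fill in the `...` is to build the asset definitions from freshly loaded partitions inside the method and index them by key. This sketch uses hypothetical stand-ins (`FakeAssetsDefinition`, a tuple-based `AssetKey`, `load_partition_keys`, `create_some_asset`, `create_another_asset`) so it runs without Dagster; the `keys` field is modelled on the `keys` property of Dagster's `AssetsDefinition`. In real code the list would be the output of `with_resources(...)`, as in the first snippet.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List, Mapping, Sequence, Tuple

# Hypothetical stand-in for dagster.AssetKey.
AssetKey = Tuple[str, ...]


@dataclass(frozen=True)
class FakeAssetsDefinition:
    """Hypothetical stand-in for dagster.AssetsDefinition."""

    keys: FrozenSet[AssetKey]  # modelled on AssetsDefinition.keys
    partition_keys: Tuple[str, ...]


def load_partition_keys() -> List[str]:
    """Hypothetical loader; in practice this would query the external source."""
    return ["1", "2", "3"]


def create_some_asset(partitions: Sequence[str]) -> FakeAssetsDefinition:
    return FakeAssetsDefinition(frozenset({("some_asset",)}), tuple(partitions))


def create_another_asset(partitions: Sequence[str]) -> FakeAssetsDefinition:
    return FakeAssetsDefinition(frozenset({("another_asset",)}), tuple(partitions))


class ReloadableRepo:
    """Mirrors the RepositoryData subclass above; only the asset lookup is shown."""

    def get_assets_defs_by_key(self) -> Mapping[AssetKey, FakeAssetsDefinition]:
        partitions = load_partition_keys()  # loaded inside the method, on each call
        assets_defs = [create_some_asset(partitions), create_another_asset(partitions)]
        by_key: Dict[AssetKey, FakeAssetsDefinition] = {}
        for assets_def in assets_defs:
            # A definition owning several keys gets one entry per key.
            for key in assets_def.keys:
                by_key[key] = assets_def
        return by_key
```

For example, `sorted(ReloadableRepo().get_assets_defs_by_key())` gives `[('another_asset',), ('some_asset',)]`. A multi-asset definition would contribute one entry per key, all pointing at the same definition object.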
yuhan
02/09/2023, 11:40 PM

Carl Almklev
02/14/2023, 6:27 PM