#ask-community

Carl Almklev

01/26/2023, 8:06 PM
Hey! I need to periodically reload partitions for assets. I'm calling the GraphQL API to reload the repository in a sensor, which works great locally, but it doesn't work when deployed over gRPC. I've seen people ask this and get pointed to RepositoryData, which looks like it works the way I want. However, I can't find any way to plug assets into it. I have lots of them (some loaded from dbt, some created manually), and I'm using with_resources to return them from @repository. Any pointers would be much appreciated 🙂

yuhan

01/30/2023, 7:33 PM
Hi Carl, sorry about the delay. Mind sharing what you have now so we can better advise?

Carl Almklev

01/31/2023, 10:20 AM
Hey Yuhan, thanks for getting back to me. I've not made much progress with this since. Here's a more concrete example of the problem I'm having:
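from dagster import (
    AssetsDefinition,
    JobDefinition,
    PartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
    define_asset_job,
    repository,
    with_resources,
)


def load_partitions() -> PartitionsDefinition:
    # Load partitions from external source
    from_database = [1, 2, 3]
    # Partition keys must be a flat list of strings, not a nested list
    return StaticPartitionsDefinition(partition_keys=[str(key) for key in from_database])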


def create_asset1(partitions: PartitionsDefinition) -> AssetsDefinition:
    @asset(partitions_def=partitions)
    def some_asset():
        return [1, 2]

    return some_asset

def create_asset2(partitions: PartitionsDefinition) -> AssetsDefinition:
    @asset(partitions_def=partitions)
    def another_asset():
        return [3]

    return another_asset


def create_job(partitions: PartitionsDefinition) -> JobDefinition:
    return define_asset_job(name="some_job", selection="some_asset", partitions_def=partitions)


@repository
def basic_repo():
    partitions = load_partitions() 
    resourced_assets = with_resources([
        create_asset1(partitions),
        create_asset2(partitions)
    ], resource_defs={
        "io_manager": some_io_manager
    })
    return resourced_assets
When reloading the repository, load_partitions() is only called when running locally. When deployed to Kubernetes, the user repo is connected to via gRPC, and the only way to reload the repository properly is to restart the container. Changing @repository to:
from typing import Sequence

from dagster import RepositoryData


class ReloadableRepo(RepositoryData):
    def get_all_jobs(self) -> Sequence[JobDefinition]:
        partitions = load_partitions()
        return [create_job(partitions)]


@repository
def dynamic_repo():
    return ReloadableRepo()
That works great; however, I can't figure out how to fit assets into RepositoryData. Hope that makes sense, thanks! 🙂
Hey @yuhan any chance you've had some time to look at this? 😄

yuhan

02/09/2023, 11:30 PM
sorry sorry - this totally slipped!
there’s an assets_defs_by_key attribute on RepositoryData, where the key is the asset key and the value is the asset definition
in your case, it could be something like:
class ReloadableRepo(RepositoryData):
    def get_all_jobs(self) -> Sequence[JobDefinition]:
        partitions = load_partitions()
        return [create_job(partitions)]

    def get_assets_defs_by_key(self) -> Mapping[AssetKey, AssetsDefinition]:
        ...
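A minimal sketch of the shape that method's return value could take, assuming the assets are rebuilt from freshly loaded partitions on each call. To keep it runnable without Dagster installed, StubAssetsDefinition, EXTERNAL_PARTITIONS, load_partition_keys, build_assets and ReloadableRepoSketch are all hypothetical stand-ins, not Dagster classes or APIs; in real code the keys would be AssetKey objects and the values the AssetsDefinition objects returned from with_resources.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


# Hypothetical stand-in, NOT a Dagster class: it only mimics the shape of an
# asset definition (the asset keys it produces plus its partition keys).
@dataclass(frozen=True)
class StubAssetsDefinition:
    keys: Tuple[str, ...]
    partition_keys: Tuple[str, ...]


# Mutable list standing in for the external source of partitions (assumption).
EXTERNAL_PARTITIONS: List[str] = ["1", "2", "3"]


def load_partition_keys() -> Tuple[str, ...]:
    # Read the source on every call so newly added partitions are picked up.
    return tuple(EXTERNAL_PARTITIONS)


def build_assets() -> List[StubAssetsDefinition]:
    # Build every asset against the same, freshly loaded partition keys.
    partition_keys = load_partition_keys()
    return [
        StubAssetsDefinition(keys=("some_asset",), partition_keys=partition_keys),
        StubAssetsDefinition(keys=("another_asset",), partition_keys=partition_keys),
    ]


class ReloadableRepoSketch:
    def get_assets_defs_by_key(self) -> Dict[str, StubAssetsDefinition]:
        # Rebuild on each call and map every key a definition produces
        # to that definition.
        return {
            key: assets_def
            for assets_def in build_assets()
            for key in assets_def.keys
        }
```

The point of the sketch is only that the mapping is computed inside the method rather than once at import time, so each call sees the current partitions. Whether and when Dagster calls the real method again on a reload is not something this sketch verifies.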

Carl Almklev

02/14/2023, 6:27 PM
Thanks for getting back to me! Sorry, I missed the notification 😄 That looks like exactly what I need. I think I was slightly behind the version in which it was added (it was moved up from CachingRepositoryData), which explains why I couldn't see anything. Thanks Yuhan! 🙂