# ask-community
o
Hi all, We're testing out Dagster and are using it with a grpc server to serve a repository. We have a folder with different files containing asset definitions. We load these dynamically into a list of repository objects:
Copy code
@repository()
def test_repo():
    repository_objects = get_repository_objects()
    return repository_objects
So when the gRPC server is started these can all be fetched by dagit. We'd like to now copy new assets into the asset file directory and have the newly added files synced to dagit. It only seems to pick up the new assets when we restart the gRPC server. Is there any way to have the gRPC server pick up the changes to the repo without restarting? I could not find anything about this in the docs.
🌈 1
🤖 1
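For context, a minimal sketch of what a get_repository_objects helper like this could look like, assuming one definitions module per file under an assets/ directory; the directory layout and the helper body are illustrative assumptions, not from the original message:
Copy code
# Hypothetical sketch: collect Dagster definitions from every .py file in an assets/ folder.
import importlib.util
from pathlib import Path

from dagster import AssetsDefinition, JobDefinition, ScheduleDefinition, SourceAsset

ASSET_DIR = Path(__file__).parent / "assets"  # assumed location of the asset modules


def get_repository_objects():
    objects = []
    for path in sorted(ASSET_DIR.glob("*.py")):
        # Import each module by file path and keep any Dagster definitions it declares.
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for value in vars(module).values():
            if isinstance(value, (AssetsDefinition, SourceAsset, JobDefinition, ScheduleDefinition)):
                objects.append(value)
    return objects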
d
Hey Oscar - if your Python code hasn't changed, one possibility is to return a RepositoryData object from the repository instead - there's an example here: https://docs.dagster.io/_apidocs/repositories#dagster.repository. If your Python code has changed and needs to be re-imported, the only way to pick up those changes currently is to restart the gRPC server
o
Thanks Daniel. We use this pattern since we're moving away from Airflow, where we have a sidecar syncing DAGs. All our assets are generated from config. Do you know what the common CI/CD pattern is for Dagster? A git repo with the code that triggers a restart of the gRPC server on change?
d
Yeah, some of the details depend on how you're deploying Dagster (whether you're using Docker images or not) but I think that would be a common setup
o
@daniel What do you mean by 'python code changes' while using RepositoryData? Don't we still need to use the @repository decorator in the end? It seems this decorator caches the jobs etc., so I don't see how this would allow us to add and modify jobs without having to restart the gRPC server.
d
the decorator doesn't cache the jobs - it returns an instance of that RepositoryData object, which allows you to control whether it caches
you can use get_all_jobs instead of get_all_pipelines but you'll hit the same error - It looks like we're going to need to replicate this logic from the repository decorator and return that resolved job object instead: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/definitions/repository_definition.py#L847-L859
If you can share what you have now we could take a look - this may be the first time somebody has tried to use RepositoryData and define_asset_job together
but it looks like it also needs a list of AssetsDefinitions and SourceAssets and then those get passed into a resolve function that produces a job definition
o
@daniel Here we created a minimal example that reproduces the error.
Copy code
from dagster import (
    op, build_schedule_from_partitioned_job, Output,
    DefaultScheduleStatus, AssetsDefinition, graph, DailyPartitionsDefinition, define_asset_job,
    AssetSelection, Nothing, RepositoryData, repository,
)

from dagster_assets.ops import extract


@op
def etl_tnt_dp_skeleton_skeletontable_end(test):
    return Output(None, metadata={"row_count": 1})

@graph
def etl_tnt_dp_skeleton_skeletontable() -> Nothing:
    r_extract = extract()
    r_end = etl_tnt_dp_skeleton_skeletontable_end(r_extract)
    return r_end

partitions_def = DailyPartitionsDefinition(start_date="2022-04-01")

etl_tnt_dp_skeleton_skeletontable_asset = AssetsDefinition.from_graph(
    etl_tnt_dp_skeleton_skeletontable,
    partitions_def=partitions_def
)

etl_tnt_dp_skeleton_skeletontable_job = define_asset_job(
    "etl_tnt_dp_skeleton_skeletontable_job",
    selection=AssetSelection.assets(etl_tnt_dp_skeleton_skeletontable_asset),
    partitions_def=partitions_def)

etl_tnt_dp_skeleton_skeletontable_schedule = build_schedule_from_partitioned_job(
    etl_tnt_dp_skeleton_skeletontable_job,
    default_status=DefaultScheduleStatus.RUNNING)


class ComplexRepositoryData(RepositoryData):
    def __init__(self):
        pass

    def get_all_pipelines(self):
        return self.get_repository_objects()

    def get_repository_objects(self):
        return [etl_tnt_dp_skeleton_skeletontable_job, etl_tnt_dp_skeleton_skeletontable_schedule]

@repository
def tnt_dp():
    return ComplexRepositoryData()
We are not yet that comfortable with all the nomenclature of Dagster, so it might be that we are doing something obviously wrong 🙂
d
Try this:
Copy code
class ComplexRepositoryData(RepositoryData):
    def __init__(self):
        pass

    def get_all_pipelines(self):
        return [
            etl_tnt_dp_skeleton_skeletontable_job.resolve(
                assets=[etl_tnt_dp_skeleton_skeletontable_asset],
                source_assets=[],
            )
        ]

    def get_all_schedules(self):
        return [etl_tnt_dp_skeleton_skeletontable_schedule]
❤️ 2
bit odd that you have to specify the asset twice
You asked earlier what "python code changes" means - what that means is that if you had to make any changes to that .py file, you would need to restart the server for those changes to be applied. However, if those functions pulled from a file or something and the contents of that file changed (but not the Python code), then you would not need to restart the server.
👍 2
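To make that no-restart case concrete, here is a hedged sketch of a config-driven RepositoryData, reusing the resolve() pattern from the snippet above; the tables.json file, the dagster_assets.factories module, and the build_definitions_for_table factory are illustrative assumptions, not part of the original thread:
Copy code
import json
from pathlib import Path

from dagster import RepositoryData, repository

# Hypothetical factory: returns (unresolved asset job, AssetsDefinition, schedule)
# for one table, built the same way as the skeletontable example above.
from dagster_assets.factories import build_definitions_for_table

CONFIG_PATH = Path(__file__).parent / "tables.json"  # assumed config file listing table names


class ConfigDrivenRepositoryData(RepositoryData):
    def _load(self):
        # Re-read the config on every call so that edits to tables.json are picked up
        # on the next repository reload, without restarting the gRPC server.
        table_names = json.loads(CONFIG_PATH.read_text())
        return [build_definitions_for_table(name) for name in table_names]

    def get_all_pipelines(self):
        # Same resolve() pattern as in the snippet above.
        return [
            job.resolve(assets=[assets_def], source_assets=[])
            for job, assets_def, _schedule in self._load()
        ]

    def get_all_schedules(self):
        return [schedule for _job, _assets, schedule in self._load()]


@repository
def tnt_dp_config_driven():
    return ConfigDrivenRepositoryData()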
o
Great, that clarifies the 'python code changes', thanks. This example seems to work! Thanks a lot for the quick and helpful feedback. Will try to get this to work tomorrow since it's end of day here. Will get back to you
We got it to work sooner than expected 🙂 Does Dagit have a polling interval to pick up the changes coming from the repository server, or do you need to do a manual reload?
d
You need to do a manual reload currently
either from the UI or over GraphQL
👍 1
If you do restart the server, dagit will automatically pick up the changes
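For reference, a reload can also be triggered programmatically with a plain GraphQL request. The reloadRepositoryLocation mutation below should exist in Dagster's GraphQL schema, but verify the exact fields in dagit's GraphQL playground for your version; the host, port, and location name here are assumptions:
Copy code
# Hedged sketch: trigger a repository location reload over dagit's GraphQL endpoint.
import requests

DAGIT_GRAPHQL_URL = "http://localhost:3000/graphql"  # assumed dagit host/port

RELOAD_MUTATION = """
mutation ReloadLocation($name: String!) {
  reloadRepositoryLocation(repositoryLocationName: $name) {
    __typename
  }
}
"""


def reload_location(location_name: str) -> None:
    # POST the mutation; dagit will re-query the gRPC server for the repository snapshot.
    response = requests.post(
        DAGIT_GRAPHQL_URL,
        json={"query": RELOAD_MUTATION, "variables": {"name": location_name}},
        timeout=30,
    )
    response.raise_for_status()
    print(response.json())


if __name__ == "__main__":
    reload_location("my_grpc_location")  # location name as configured in workspace.yaml (assumed)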
d
yeah
🌈 1
y
Thanks a lot!
condagster 1