Oscar Delicaat
01/04/2023, 2:58 PM
@repository()
def test_repo():
    """Return whatever `get_repository_objects()` produces as the repository contents."""
    return get_repository_objects()
So when the gRPC server is started, these can all be fetched by Dagit. We would now like to copy new assets into the asset file directory and have the newly added files synced to Dagit. It seems to pick up the new assets only when we restart the gRPC server. Is there any way to have the gRPC server pick up the changes to the repo without restarting? I could not find anything about this in the docs.
daniel
01/04/2023, 3:16 PMOscar Delicaat
01/04/2023, 3:24 PMdaniel
01/04/2023, 3:25 PMOscar Delicaat
01/05/2023, 2:48 PMdaniel
01/05/2023, 2:49 PMdaniel
01/05/2023, 3:51 PMdaniel
01/05/2023, 3:52 PMdaniel
01/05/2023, 3:55 PMOscar Delicaat
01/05/2023, 4:07 PM
import os
from dagster import (
op, build_schedule_from_partitioned_job, Output,
DefaultScheduleStatus, AssetsDefinition, graph, DailyPartitionsDefinition, define_asset_job,
AssetSelection, Nothing, RepositoryData, repository,
)
from dagster_assets.ops import extract
@op
def etl_tnt_dp_skeleton_skeletontable_end(test):
    """Terminal op of the skeleton graph.

    The ``test`` input is accepted but never read; the op always emits an
    ``Output`` whose value is ``None`` with a fixed ``row_count`` metadata
    entry of 1.
    """
    row_metadata = {"row_count": 1}
    return Output(None, metadata=row_metadata)
@graph
def etl_tnt_dp_skeleton_skeletontable() -> Nothing:
    """Wire ``extract`` into the terminal op and return the terminal op's result."""
    return etl_tnt_dp_skeleton_skeletontable_end(extract())
# Daily partitions starting at 2022-04-01; shared by the asset and its job.
partitions_def = DailyPartitionsDefinition(start_date="2022-04-01")

# Graph-backed asset built from the skeleton graph.
etl_tnt_dp_skeleton_skeletontable_asset = AssetsDefinition.from_graph(
    etl_tnt_dp_skeleton_skeletontable, partitions_def=partitions_def
)

# Asset job whose selection contains only the asset defined above.
etl_tnt_dp_skeleton_skeletontable_job = define_asset_job(
    name="etl_tnt_dp_skeleton_skeletontable_job",
    selection=AssetSelection.assets(etl_tnt_dp_skeleton_skeletontable_asset),
    partitions_def=partitions_def,
)

# Schedule derived from the partitioned job; its default status is RUNNING.
etl_tnt_dp_skeleton_skeletontable_schedule = build_schedule_from_partitioned_job(
    etl_tnt_dp_skeleton_skeletontable_job,
    default_status=DefaultScheduleStatus.RUNNING,
)
class ComplexRepositoryData(RepositoryData):
    """Repository data exposing the skeleton asset job and its schedule.

    Jobs and schedules are served through separate accessors:
    ``get_all_pipelines`` returns only the resolved job, and
    ``get_all_schedules`` returns the schedule. Previously the schedule and
    the *unresolved* asset job were both returned from ``get_all_pipelines``,
    which is not what that accessor is meant to hold.
    """

    def __init__(self):
        """No instance state is needed; definitions are read from module scope."""

    def get_all_pipelines(self):
        """Return the asset job, resolved against the asset it selects."""
        # `define_asset_job` yields an unresolved job; it is resolved here
        # against the asset it targets before being handed to the repository.
        # NOTE(review): the `resolve(assets=..., source_assets=...)` signature
        # is version-dependent -- confirm against the installed Dagster.
        return [
            etl_tnt_dp_skeleton_skeletontable_job.resolve(
                assets=[etl_tnt_dp_skeleton_skeletontable_asset],
                source_assets=[],
            )
        ]

    def get_all_schedules(self):
        """Return the schedule built from the partitioned asset job."""
        return [etl_tnt_dp_skeleton_skeletontable_schedule]

    def get_repository_objects(self):
        """Return the raw (unresolved) job and the schedule.

        Kept for callers that used this helper directly; the repository
        accessors above no longer depend on it.
        """
        return [
            etl_tnt_dp_skeleton_skeletontable_job,
            etl_tnt_dp_skeleton_skeletontable_schedule,
        ]
@repository
def tnt_dp():
    """Build the repository from a fresh ``ComplexRepositoryData`` instance."""
    repo_data = ComplexRepositoryData()
    return repo_data
Oscar Delicaat
01/05/2023, 4:08 PMdaniel
01/05/2023, 4:11 PM
class ComplexRepositoryData(RepositoryData):
def __init__(self):
    """Initialize without setting any instance state."""
    return None
def get_all_pipelines(self):
    """Return a one-element list holding the asset job resolved against its asset."""
    # The job is resolved with the single asset it selects and no source assets.
    resolved_job = etl_tnt_dp_skeleton_skeletontable_job.resolve(
        assets=[etl_tnt_dp_skeleton_skeletontable_asset],
        source_assets=[],
    )
    return [resolved_job]
def get_all_schedules(self):
    """Return a one-element list holding the skeleton job's schedule."""
    schedules = [etl_tnt_dp_skeleton_skeletontable_schedule]
    return schedules
daniel
01/05/2023, 4:12 PMdaniel
01/05/2023, 4:14 PMOscar Delicaat
01/05/2023, 4:16 PMOscar Delicaat
01/05/2023, 4:27 PMdaniel
01/05/2023, 4:28 PMdaniel
01/05/2023, 4:28 PMdaniel
01/05/2023, 4:29 PMYeachan Park
01/05/2023, 4:31 PMdaniel
01/05/2023, 4:31 PMYeachan Park
01/05/2023, 4:32 PM