Alessandro Cantarelli
09/28/2022, 9:14 AM
@asset(
    partitions_def=daily,
    io_manager_key="parquet_io_manager",
    ins={'test': AssetIn(metadata={'allow_missing_partitions': True})}
)
def load_data(test: pandas.DataFrame) -> pandas.DataFrame:
    return test
@asset(
    partitions_def=daily,
    io_manager_key="parquet_io_manager",
    ins={"load_data": AssetIn(metadata={'allow_missing_partitions': True}, partition_mapping=NDaysPartitionMapping(days=1, offset=0))},
    # days - determines the number of extra days of data loaded (i.e. days=1 for partition 2018-01-03 will return data from 2018-01-03 and 2018-01-02)
    # offset - offsets the data retrieved by date (i.e. offset=1 for partition 2018-01-03 will return data from 2018-01-02)
    config_schema={'cellsSeries': int, 'cellsParallel': int}
)
def trailing_window(context, load_data: pandas.DataFrame) -> pandas.DataFrame:
    df = load_data
    context.log.info(f'Days of data loaded: {df.date.unique()}')
    tidied_raw_data = two_day_data_trial.tidy_raw_data(context, database=df)
    soc_ocv_lut = wr.s3.read_parquet("s3://base-infra-bostonmodeldevelopment-lq00f0lfzytq/dagster/single_event_data/ocv_soc_lut.parquet")
    return tidied_raw_data
test_invoked = with_resources(
    [test, load_data, trailing_window],
    RESOURCES_PROD,
)

@repository
def source_repo():
    return [test_invoked, schedule]
The first asset just loads data from an S3 bucket and returns a DataFrame. This DataFrame is then passed into the second asset, which partitions it and performs some operations. This second asset requires two config values ('cellsSeries': int, 'cellsParallel': int). How do I pass these?
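For reference, a minimal sketch of the run config shape those two fields expect; this is not from the thread, and it assumes the values are supplied inline when defining the job (the name main_with_config is made up for illustration):

# Hypothetical: supplying the op config inline instead of from a YAML file
run_config = {
    "ops": {
        "trailing_window": {
            "config": {"cellsSeries": 5, "cellsParallel": 5},
        }
    }
}
main_with_config = define_asset_job("main_with_config", partitions_def=daily, config=run_config)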
This is the source asset I use:
test = SourceAsset(
    key=AssetKey("test"),
    description="Example partitioned data",
    io_manager_key="parquet_io_manager",
    partitions_def=daily,
)
This is the job scheduler I use:
schedule = build_schedule_from_partitioned_job(
    define_asset_job("main", partitions_def=daily)
)
These are the resources I am using:
RESOURCES_PROD = {
    "s3_bucket": ResourceDefinition.hardcoded_resource(
        "some-url"
    ),
    "s3": s3_resource,
    "s3_prefix": ResourceDefinition.hardcoded_resource("dagster"),
    "parquet_io_manager": s3_partitioned_parquet_io_manager,
}
How can I pass a config file into this setup so that ops called inside the "trailing_window" asset can access values using, for example, context.op_config['cellsParallel']?
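A minimal sketch of the access pattern the question describes, i.e. reading the config_schema values inside the asset body via context.op_config (a stripped-down version of trailing_window, assuming the config has been supplied at launch):

@asset(
    partitions_def=daily,
    io_manager_key="parquet_io_manager",
    config_schema={'cellsSeries': int, 'cellsParallel': int}
)
def trailing_window(context, load_data: pandas.DataFrame) -> pandas.DataFrame:
    # Both values come from the run config provided for this asset's op
    cells_series = context.op_config['cellsSeries']
    cells_parallel = context.op_config['cellsParallel']
    context.log.info(f'cellsSeries={cells_series}, cellsParallel={cells_parallel}')
    return load_data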
Thanks!
claire
09/28/2022, 6:09 PM
from dagster import config_from_files, daily_partitioned_config, file_relative_path, define_asset_job, repository
@daily_partitioned_config(start_date="2022-09-01")
def my_config(start, _end):
    config = {
        "ops": {
            "trailing_window": {
                "config": config_from_files([file_relative_path(__file__, "config.yaml")])
            }
        }
    }
    return config
@repository
def source_repo():
    return [
        trailing_window,
        define_asset_job(
            "my_partitioned_job",
            config=my_config,
        ),
    ]
With the config.yaml file containing:
cellsSeries: 5
cellsParallel: 5
This way you don't have to specify configuration for your entire job within the yaml file.
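For illustration, config_from_files loads and merges the listed YAML files into a plain dict, so with that config.yaml the partitioned config above effectively resolves to something like this (a sketch, not output taken from the thread):

# What my_config returns for each partition, given the config.yaml above
resolved_config = {
    "ops": {
        "trailing_window": {
            "config": {"cellsSeries": 5, "cellsParallel": 5}
        }
    }
}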
Note that the partition config must match your partitions definition: the same partition type (daily, weekly, etc.) and the same start date.
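As a sketch of what a matching definition could look like (the daily partitions definition isn't shown in the thread, so this is an assumption; the start date has to line up with the @daily_partitioned_config above):

from dagster import DailyPartitionsDefinition

# Assumed definition of `daily`; the start_date must match my_config's start date
daily = DailyPartitionsDefinition(start_date="2022-09-01")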
Alessandro Cantarelli
09/29/2022, 9:00 AM