
Alessandro Cantarelli

09/28/2022, 9:14 AM
Hi all, I have an asset that I am trying to run with a config file:
@asset(
    partitions_def=daily, 
    io_manager_key="parquet_io_manager", 
    ins={'test': AssetIn(metadata={'allow_missing_partitions': True})}
)
def load_data(test: pandas.DataFrame) -> pandas.DataFrame:
    return test

@asset(
    partitions_def=daily,
    io_manager_key="parquet_io_manager",
    ins={"load_data": AssetIn(metadata={'allow_missing_partitions': True}, partition_mapping=NDaysPartitionMapping(days=1, offset=0))}, 
    # days - determines the number of extra days of data loaded (i.e. days=1 for partition 2018-01-03 will return data from 2018-01-03 and 2018-01-02)
    # offset - offsets the data retrieved by date (i.e. offset = 1 for partition 2018-01-03, will return data from 2018-01-02 )
    config_schema={'cellsSeries' : int, 'cellsParallel' : int}
)
def trailing_window(context, load_data: pandas.DataFrame) -> pandas.DataFrame:
    df = load_data
    context.log.info(f'Days of data loaded: {df.date.unique()}')
    tidied_raw_data = two_day_data_trial.tidy_raw_data(context, database=df)
    soc_ocv_lut = wr.s3.read_parquet("s3://base-infra-bostonmodeldevelopment-lq00f0lfzytq/dagster/single_event_data/ocv_soc_lut.parquet")
    return tidied_raw_data




test_invoked = with_resources(
    [test, load_data, trailing_window],
    RESOURCES_PROD, 
)


@repository
def source_repo():
    return [test_invoked, schedule]
The first asset just loads data from an S3 bucket and returns a DataFrame. This DataFrame is then passed into the second asset, which partitions it and does some operations. This second asset requires two config values ('cellsSeries': int, 'cellsParallel': int). How do I pass these? This is the source asset I use:
test = SourceAsset(
    key=AssetKey("test"),
    description="Example partitioned data",
    io_manager_key="parquet_io_manager",
    partitions_def=daily,
)
This is the job scheduler I use:
schedule = build_schedule_from_partitioned_job(
    define_asset_job("main", partitions_def=daily)
)
These are the resources I am using:
RESOURCES_PROD = {
    "s3_bucket": ResourceDefinition.hardcoded_resource(
        "some-url"
    ),
    "s3": s3_resource,
    "s3_prefix": ResourceDefinition.hardcoded_resource("dagster"),
    "parquet_io_manager": s3_partitioned_parquet_io_manager,
}
How can I pass a config file into this setup so that ops called inside the "trailing_window" asset can access values using, for example, context.op_config['cellsParallel']? Thanks!
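For reference, a minimal sketch of the access pattern described above (the helper name tidy_raw_data comes from the snippet earlier in this message; its body here is hypothetical):
# Hypothetical helper called from inside the trailing_window asset; it reads
# the values declared in config_schema from the op config on the context.
def tidy_raw_data(context, database):
    cells_series = context.op_config["cellsSeries"]
    cells_parallel = context.op_config["cellsParallel"]
    context.log.info(f"cellsSeries={cells_series}, cellsParallel={cells_parallel}")
    return database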

claire

09/28/2022, 6:09 PM
Hi Alessandro. Here is one possible way to do this:
from dagster import config_from_files, ....

@daily_partitioned_config(start_date="2022-09-01")
def my_config(start, _end):
    config = {
        "ops": {
            "trailing_window": {
                "config": config_from_files([file_relative_path(__file__, "config.yaml")])
            }
        }
    }
    return config


@repository
def source_repo():
    return [
        trailing_window,
        define_asset_job(
            "my_partitioned_job",
            config=my_config,
        ),
    ]
With the config.yaml file containing:
cellsSeries: 5
cellsParallel: 5
This approach works if you don't want to specify the configuration for your entire job within the yaml file. Note that the partition config must match your partitions definition, i.e. the same partition type (daily, weekly, etc.) and the same start date.
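To connect this back to the original setup, a sketch assuming the same daily partitions definition, assets, and resources from earlier in the thread (names taken from the question, not verified):
from dagster import build_schedule_from_partitioned_job, define_asset_job, repository, with_resources

# Sketch: give the asset job the partitioned config, then build the schedule
# from that job so each daily run picks up the values from config.yaml.
my_job = define_asset_job("main", config=my_config, partitions_def=daily)
schedule = build_schedule_from_partitioned_job(my_job)

@repository
def source_repo():
    return [
        with_resources([test, load_data, trailing_window], RESOURCES_PROD),
        my_job,
        schedule,
    ]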
❤️ 1

Alessandro Cantarelli

09/29/2022, 9:00 AM
@claire Thank you so much, very helpful