#ask-community

Peter Davidson

12/14/2022, 12:12 PM
I'm trying to set up a config mapping for ops. I'm reusing some ops and giving each invocation an alias:
@op
def generate_sample1() -> pd.DataFrame:
    # context.log.info("config_param: " + context.op_config["config_param"])
    return sample_data()

@graph
def graph_multi_sample():
    n_samples = 5
    samples = []
    for i in range(n_samples):
        samples.append(generate_sample1.alias(f"sample_{i}")())
    return concat_samples(samples)

job_from_graph = graph_multi_sample.to_job(resource_defs=resource_defs, config=ops_output_config)
where ops_output_config is a config mapping that generates a file path for each op:
@config_mapping(config_schema={"param_id": int})
def ops_output_config(val):

    conf = load_conf_from_csv(val["param_id"])
    workspace_root = os.path.join(conf.get('run_type'), conf.get('rep_date'), conf.get('nickname'))
    ops_output_config_schema = {}
    for output in ['concat_samples', 'generate_sample1', 'generate_sample2']:
        output_path = os.path.join(workspace_root, 'result', f"{output}.pkl")
        ops_output_config_schema[output] = {'outputs': {'result': {'output_path': output_path}}}

    return {"ops": ops_output_config_schema}
Now the config mapping is not very complex -> is there a way to pass the required ops to the config mapping, so it knows to create records for all of the sample aliases?

Zach

12/14/2022, 11:16 PM
If I'm understanding the goal, I think you could use a factory function to parameterize the config mapping. Something like:
import os
from typing import List

from dagster import config_mapping


def config_mapping_factory(outputs: List[str]):
    @config_mapping(config_schema={"param_id": int})
    def ops_output_config(val):

        conf = load_conf_from_csv(val["param_id"])
        workspace_root = os.path.join(conf.get('run_type'), conf.get('rep_date'), conf.get('nickname'))
        ops_output_config_schema = {}
        for output in outputs:
            output_path = os.path.join(workspace_root, 'result', f"{output}.pkl")
            ops_output_config_schema[output] = {'outputs': {'result': {'output_path': output_path}}}

        return {"ops": ops_output_config_schema}
    return ops_output_config
Then you just call the factory when configuring the job:
job_from_graph = graph_multi_sample.to_job(resource_defs=resource_defs, config=config_mapping_factory(['concat_samples', 'generate_sample1', 'generate_sample2']))
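A minimal sketch of the same factory idea in plain Python, assuming the aliased ops are named sample_0 through sample_4 as in the graph above. The @config_mapping decorator and load_conf_from_csv are left out so the snippet runs without Dagster. make_ops_config_builder, build, and the example workspace path are hypothetical names for illustration, not something from the thread:

```python
import os
from typing import Callable, Dict, List


def make_ops_config_builder(op_names: List[str]) -> Callable[[str], Dict]:
    """Return a function that builds the {"ops": ...} dict for the given op names."""

    def build(workspace_root: str) -> Dict:
        ops = {}
        for name in op_names:
            # One pickle path per op, mirroring the layout used in the thread.
            output_path = os.path.join(workspace_root, "result", f"{name}.pkl")
            ops[name] = {"outputs": {"result": {"output_path": output_path}}}
        return {"ops": ops}

    return build


# Assumption: alias names follow the f"sample_{i}" pattern from the graph.
alias_names = [f"sample_{i}" for i in range(5)]
build_config = make_ops_config_builder(["concat_samples", *alias_names])

# Hypothetical workspace root standing in for run_type/rep_date/nickname.
run_config = build_config(os.path.join("daily", "2022-12-14", "demo"))
```

Generating the alias names in one place and passing the same list to both the graph loop and the factory keeps the config keys and the node names from drifting apart.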
If you're trying to get all the ops in the graph dynamically, I think you could get them by querying the node_defs or node_dict properties on the graph, something like:
node_names = [n.name for n in graph_multi_sample.node_defs]
job_from_graph = graph_multi_sample.to_job(resource_defs=resource_defs, config=config_mapping_factory(node_names))
One thing I'm unsure of, though, is whether the node_defs attribute will have the alias value set as the name.

Peter Davidson

12/15/2022, 10:16 AM
Hey @Zach, this is awesome, thank you so much! I tested it with alias and it doesn't seem to work that way -> no nodes are generated for the aliases:
samples.append(generate_sample1.alias(f'generate_sample1_s{i}')())
But if I use configured on the op, specifying a new name, it seems to work as expected:
sample = configured(generate_sample1, name=f"generate_sample1_s{i}")({})()
samples.append(sample)
IDK the difference between alias and configured but I have a solution that works 🙂 Thanks again!
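One plausible reading of the difference, which nobody in the thread confirms: an alias gives a new invocation name to the same op definition, while configured with a name creates a separate definition. A list of definition names would then show only generate_sample1 for the aliased version. The toy model below illustrates that distinction in plain Python; Definition, Node, and definition_names are hypothetical stand-ins, not Dagster classes:

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Definition:
    name: str  # name of the reusable op definition


@dataclass(frozen=True)
class Node:
    name: str  # invocation name inside the graph (the alias, if any)
    definition: Definition


def definition_names(nodes: List[Node]) -> List[str]:
    """Collect unique definition names, in first-seen order."""
    seen: List[str] = []
    for node in nodes:
        if node.definition.name not in seen:
            seen.append(node.definition.name)
    return seen


base = Definition("generate_sample1")

# Alias-style: three invocations that all share one definition.
aliased = [Node(f"generate_sample1_s{i}", base) for i in range(3)]

# configured-style: each invocation gets its own renamed definition.
renamed = [
    Node(f"generate_sample1_s{i}", Definition(f"generate_sample1_s{i}"))
    for i in range(3)
]

alias_def_names = definition_names(aliased)
renamed_def_names = definition_names(renamed)
alias_node_names = [node.name for node in aliased]
```

In this model the alias names only show up when iterating over nodes rather than definitions, which is why a lookup keyed by invocation name might be the one to check if aliases are preferred.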

Zach

12/15/2022, 4:14 PM
interesting, I wouldn't have expected that. glad you found something, happy to help!