#ask-community

Oliver

12/05/2022, 5:46 AM
Hey all, I have an op with config schema like
{'a': str, 'b': str, 'partition': str}
I want to fix a and b, let partition be decided by the partition key, and present that setup as an asset. I'm having trouble figuring out how to combine partitioned configs and assets though. Should it be supported?
So far I have the following:
@static_partitioned_config(parameters)
def parameter_partition_config(keys: list[str]):
    assert len(keys) == 1, 'only support single partition'
    parameter, = keys
    return {
        "ops": {
            "real_bucketed_feature_analysis": {
                "config": {
                    "parameter": parameter,
                    'cohort_a': 'a', 
                    'cohort_b': 'b'
                }
            }
        }
    }

rbfa = dm.define_dagstermill_op(
    "real_bucketed_feature_analysis",
    "notebooks_rendered/real_bucketed_feature_analysis.ipynb", # TODO make work with packaging
    output_notebook_name="real_bucketed_feature_analysis",
    config_schema={
        'cohort_a': str,
        'cohort_b': str,
        'parameter': str
    },
    ins={
        'real_bucketed': In(Nothing)
    }
)

rbfa_asset = AssetsDefinition.from_op(
    rbfa.configured(parameter_partition_config, rbfa.name),
    partitions_def=params_partitions,
    keys_by_input_name={'real_bucketed': AssetKey('real_bucketed')},
    group_name=ASSET_GROUP
)
which throws
dagster._core.errors.DagsterConfigMappingFunctionError: The config mapping function on a `configured` OpDefinition has thrown an unexpected error during its execution.

owen

12/06/2022, 12:41 AM
hi @Oliver! This one is slightly tricky specifically in the dagstermill case, as the context object available to ops/assets usually has the partition_key available there (eliminating the need to pass it via config), but this is not the case for dagstermill. partitioned config is meant for job-level configuration rather than asset- or op-level configuration, which is (I believe) why you're seeing the error. Instead, I'd recommend writing a config mapping function to fix the two config values you care about while leaving the parameter option open for later configuration:
@configured(rbfa, config_schema={"parameter": str})
def configured_rbfa(config):
    return {"parameter": config["parameter"], "cohort_a": "a", "cohort_b": "b"}
from there, you can do:
rbfa_asset = AssetsDefinition.from_op(
    configured_rbfa,
    partitions_def=params_partitions,
    keys_by_input_name={'real_bucketed': AssetKey('real_bucketed')},
    group_name=ASSET_GROUP
)
this will require the parameter to be added to the run config when launching runs. if you imagine that you'll usually launch runs of this op from the context of an asset job (i.e. define_asset_job), this is where you could put in your partitioned config (which would just need to return something of the form {"ops": {"configured_rbfa": {"config": {"parameter": partition_key}}}})
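As a rough sketch of the shape described above, the function below builds the run config for a single partition key in plain Python. The helper name run_config_for_partition and the example key are made up for illustration, and the op name "configured_rbfa" is an assumption taken from the snippet above; it has to match whatever name the configured op actually ends up with.

```python
from typing import Any, Dict

# Assumed name of the configured op; adjust to match your definitions.
CONFIGURED_OP_NAME = "configured_rbfa"


def run_config_for_partition(partition_key: str) -> Dict[str, Any]:
    """Build run config that forwards the partition key as the op's `parameter`.

    cohort_a and cohort_b are not set here because the config mapping
    on the configured op already fixes them.
    """
    return {
        "ops": {
            CONFIGURED_OP_NAME: {
                "config": {"parameter": partition_key},
            }
        }
    }


if __name__ == "__main__":
    # Hypothetical partition key, for illustration only.
    print(run_config_for_partition("some_parameter"))
```

A function like this is what you would decorate with static_partitioned_config and then hand to the asset job as its config; it's worth checking the docs for your Dagster version to confirm how define_asset_job accepts partitioned config.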

Oliver

12/06/2022, 5:11 AM
ahh ok, yea thanks! that makes sense 🙂
hey 🙂 got this working today after finally coming back to it with a different notebook. I can't seem to find the notebook preview though, am I not looking hard enough? It's only missing for the partitioned asset; previously I worked around this with an asset factory, and for those assets I can find the preview.