Oliver
12/05/2022, 5:46 AM
{'a': str, 'b': str, 'partition': str}
I want to fix a and b, let partition be set by the partition key, and present that setup as an asset.
I'm having trouble figuring out how to combine partitioned_configs and assets though. Should it be supported?
@static_partitioned_config(parameters)
def parameter_partition_config(keys: list[str]):
    assert len(keys) == 1, 'only support single partition'
    parameter, = keys
    return {
        "ops": {
            "real_bucketed_feature_analysis": {
                "config": {
                    "parameter": parameter,
                    'cohort_a': 'a',
                    'cohort_b': 'b'
                }
            }
        }
    }
rbfa = dm.define_dagstermill_op(
    "real_bucketed_feature_analysis",
    "notebooks_rendered/real_bucketed_feature_analysis.ipynb",  # TODO make work with packaging
    output_notebook_name="real_bucketed_feature_analysis",
    config_schema={
        'cohort_a': str,
        'cohort_b': str,
        'parameter': str
    },
    ins={
        'real_bucketed': In(Nothing)
    }
)
rbfa_asset = AssetsDefinition.from_op(
    rbfa.configured(parameter_partition_config, rbfa.name),
    partitions_def=params_partitions,
    keys_by_input_name={'real_bucketed': AssetKey('real_bucketed')},
    group_name=ASSET_GROUP
)
which throws
dagster._core.errors.DagsterConfigMappingFunctionError: The config mapping function on a `configured` OpDefinition has thrown an unexpected error during its execution.
owen
12/06/2022, 12:41 AM
the context object available to ops/assets usually has the partition_key available there (eliminating the need to pass it via config), but this is not the case for dagstermill.
partitioned config is meant for job-level configuration rather than asset- or op-level configuration, which is (I believe) why you're seeing the error. Instead, I'd recommend writing a config_mapping function to fix the two config values you care about while leaving the parameter option open for later configuration:
@configured(rbfa, config_schema={"parameter": str})
def configured_rbfa(config):
    return {"parameter": config["parameter"], "cohort_a": "a", "cohort_b": "b"}
from there, you can do:
rbfa_asset = AssetsDefinition.from_op(
    configured_rbfa,
    partitions_def=params_partitions,
    keys_by_input_name={'real_bucketed': AssetKey('real_bucketed')},
    group_name=ASSET_GROUP
)
this will require the parameter to be added to the run config when launching runs. if you imagine that you'll usually launch runs of this op from the context of an asset job (i.e. define_asset_job), this is where you could put in your partitioned config (which would just need to return something of the form {"ops": {"configured_rbfa": {"config": {"parameter": partition_key}}}}).
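to make the shape of that run config concrete, here is a minimal plain-Python sketch that does not import dagster. The partition keys in PARAMETERS and the helper names run_config_for_partition and fill_fixed_cohorts are made up for illustration (the thread never shows what parameters contains); the op name configured_rbfa assumes the configured op takes its name from the decorated function.

```python
from typing import Any, Dict

# Hypothetical partition keys; the thread never shows what `parameters` holds.
PARAMETERS = ["height", "weight"]

# Assumed name of the op produced by @configured (the decorated function's name).
OP_NAME = "configured_rbfa"


def run_config_for_partition(partition_key: str) -> Dict[str, Any]:
    """Build the job-level run config for one partition.

    Only `parameter` is supplied here; the cohorts are fixed by the
    configured op, so they do not appear in the run config.
    """
    return {"ops": {OP_NAME: {"config": {"parameter": partition_key}}}}


def fill_fixed_cohorts(config: Dict[str, str]) -> Dict[str, str]:
    """Plain-Python copy of the configured_rbfa mapping, for illustration only."""
    return {"parameter": config["parameter"], "cohort_a": "a", "cohort_b": "b"}


for key in PARAMETERS:
    run_config = run_config_for_partition(key)
    # What the notebook op would end up receiving once the mapping is applied.
    op_config = fill_fixed_cohorts(run_config["ops"][OP_NAME]["config"])
    print(key, "->", op_config)
```

in a real project, a function like run_config_for_partition would presumably be the body of the static_partitioned_config-decorated function, and the resulting partitioned config would be handed to define_asset_job through its config argument. Treat that wiring as an assumption to check against your Dagster version.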
Oliver
12/06/2022, 5:11 AM