Daniel Michaelis
05/12/2022, 8:35 AM
… `make_values_resource`).
As an example, the imputation op in the first pipeline would be called `impute_missing_training`, which returns a dictionary with column names as keys and fitted imputers as values. This dictionary should be loaded by an op in the second pipeline named `impute_missing_application`. Currently I'm using a hack that works for the `fs_io_manager`:
```python
import pandas as pd
from dagster import Field, Out, op
from sklearn.impute import SimpleImputer


@op(
    out={
        "df_imputed": Out(pd.DataFrame),
        "imputer_dict": Out(dict),
    },
    config_schema={"imputation_mapping": Field(dict, is_required=False)},
)
def impute_missing_training(
    context,
    df: pd.DataFrame,
):
    imputation_mapping = context.op_config.get("imputation_mapping", {})
    df_imputed = df.copy()
    imputer_dict = {}
    for column, params in imputation_mapping.items():
        # Dagster config can't express None directly, so the string "None"
        # is translated back to the Python value here.
        if params.get("missing_values") == "None":
            params["missing_values"] = None
        imputer = SimpleImputer(**params)
        df_imputed[column] = imputer.fit_transform(df_imputed[[column]])
        imputer_dict[column] = imputer
    return df_imputed, imputer_dict
```
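For reference, the op above could then be configured with something like the following run config (a sketch assuming the job/op config layout; the column names and `SimpleImputer` parameters are purely illustrative):

```python
# Hypothetical run config for the training job; column names and imputer
# parameters are illustrative, not taken from the thread.
run_config = {
    "ops": {
        "impute_missing_training": {
            "config": {
                "imputation_mapping": {
                    "age": {"strategy": "mean"},
                    "income": {"strategy": "constant", "fill_value": 0},
                }
            }
        }
    }
}
```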
```python
import pickle
from pathlib import Path

import pandas as pd
from dagster import Field, Out, op


@op(
    out={
        "df_imputed": Out(pd.DataFrame),
    },
    config_schema={"run_id_training": Field(str, is_required=True)},
    # Required so the IO manager is available on context.resources below.
    required_resource_keys={"io_manager"},
)
def impute_missing_application(
    context,
    df: pd.DataFrame,
):
    io_manager = context.resources.io_manager
    df_imputed = df.copy()
    run_id_training = context.op_config["run_id_training"]
    # Reconstruct the fs_io_manager's on-disk layout:
    # <base_dir>/<run_id>/<step_key>/<output_name>
    storage_path = io_manager.base_dir
    op_name = "impute_missing_training"
    output_name = "imputer_dict"
    filepath = Path(storage_path).joinpath(run_id_training, op_name, output_name)
    with open(filepath, io_manager.read_mode) as read_obj:
        imputer_dict = pickle.load(read_obj)
    for column, imputer in imputer_dict.items():
        df_imputed[column] = imputer.transform(df_imputed[[column]])
    return df_imputed
```
In this example I don't need to provide the column names as part of the second op's config, because I can access them via the keys of the dictionary. However, there are other cases where this doesn't work, and I'd need to provide the same config to both the first and the second pipeline.
I'm not happy with having to load the previous outputs into the second op as part of the op logic. My solution also wouldn't work with another IO manager, which would be nice given that the IO manager used for both ops is the same. I think an approach with assets sounds reasonable, e.g. providing the name of an asset as part of the decorator logic of the second op. However, I haven't figured out all of the functionality of assets yet.
Another idea would be some sort of `run_resource`. A config parameter of this resource would be the id of the run whose results it's supposed to provide. These results could then be accessed within an op of the downstream pipeline via `context.resources.run_resource.op_name.output_name`. The output would automatically be loaded (lazily) in the downstream pipeline with the same IO manager that was used to write it in the upstream run; this could work by saving the IO manager information in the metadata of the upstream run and making it available to the resource.
The resource could also contain the `run_config` used for the upstream run, accessible via `run_resource.run_config`.
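A rough sketch of how such a resource could look today, assuming the `fs_io_manager`'s on-disk layout (`<base_dir>/<run_id>/<step_key>/<output_name>`). Everything here is hypothetical and simplified relative to the proposal: a `load()` method stands in for the attribute-style access, and `base_dir` is passed as config rather than discovered from run metadata:

```python
import pickle
from pathlib import Path

from dagster import Field, resource


@resource(config_schema={"run_id": Field(str), "base_dir": Field(str)})
def run_resource(init_context):
    """Hypothetical resource exposing the outputs of a previous run."""
    run_id = init_context.resource_config["run_id"]
    base_dir = init_context.resource_config["base_dir"]

    class UpstreamRunOutputs:
        def load(self, op_name: str, output_name: str = "result"):
            # Lazily unpickle an upstream output from the fs_io_manager layout.
            filepath = Path(base_dir) / run_id / op_name / output_name
            with open(filepath, "rb") as f:
                return pickle.load(f)

    return UpstreamRunOutputs()
```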
sandy
05/13/2022, 11:02 PM
• You can use `context.instance.get_run_by_id(...).run_config` to access the record in the run DB (see the sketch after this message). However, the API is not 100% public: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/core/instance/__init__.py#L694. If you'd be interested in filing an issue with this request, it would help us track it and potentially add it in the future.
• For running Dagster jobs in a lightweight way, have you looked at `execute_in_process`? (Sketch below.)
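A minimal sketch of the `get_run_by_id` suggestion, used from inside an op; the `upstream_run_id` config field and op name are hypothetical:

```python
from dagster import Field, op


@op(config_schema={"upstream_run_id": Field(str)})
def inspect_upstream_run(context):
    # get_run_by_id returns None if no run with that id exists.
    upstream_run = context.instance.get_run_by_id(context.op_config["upstream_run_id"])
    if upstream_run is not None:
        context.log.info(f"upstream run_config: {upstream_run.run_config}")
```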
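And a minimal example of `execute_in_process`, which runs a job synchronously in the current process without a deployment (handy for tests and ad-hoc scripts); the job itself is illustrative:

```python
from dagster import job, op


@op
def greet() -> str:
    return "hello"


@job
def greeting_job():
    greet()


# Executes in-process and returns an ExecuteInProcessResult.
result = greeting_job.execute_in_process()
assert result.success
```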
Daniel Michaelis
05/16/2022, 4:23 PM
I tried to model this with assets, but I'm running into the following error:
```
dagster.core.errors.DagsterInvariantViolationError: Attempting to access asset_key, but it was not provided when constructing the OutputContext
```
```python
from dagster import AssetGroup, AssetKey, SourceAsset, asset, repository


@asset
def upstream_asset() -> list[int]:
    return [1, 2, 3]


@asset
def downstream_asset(upstream_asset: list[int]) -> list[int]:
    return upstream_asset + [4]


asset_group = AssetGroup([upstream_asset, downstream_asset])
upstream_job = asset_group.build_job(name="upstream_job", selection=["upstream_asset"])
downstream_job = asset_group.build_job(name="downstream_job", selection=["downstream_asset"])

# upstream_source_asset = SourceAsset(key=AssetKey("upstream_asset"))
# upstream_asset_group = AssetGroup(assets=[upstream_asset])
# downstream_asset_group = AssetGroup(assets=[downstream_asset], source_assets=[upstream_source_asset])


@repository
def example_repo():
    return [upstream_job, downstream_job]
    # return [upstream_asset_group, downstream_asset_group]
```
The commented-out code is another attempt I made, but it doesn't give me separate jobs with their own logic and lineage graphs, as I'd like.
• If the upstream asset was materialized several times, can I configure which of the materializations to use as input for the downstream asset (in particular if I don't want to use the latest upstream materialization)? E.g. by providing a run ID (of the upstream run) as configuration to the materialization run?
• Also, how would I define an op that returns two assets which can be reused separately? I'd like to have such an op in the upstream job, and use one of the output assets in the upstream job and the other in the downstream job. Is that possible with the asset decorator? (See the sketch after this message.)
• Thanks, I tried `execute_in_process` and it seems to do what I expect. Previously I thought runs started this way would also be recorded in the run history.
• I didn't have time to look at `context.instance.get_run_by_id(...).run_config` yet; it seems to be more or less what I expect as well. I'll try to have a closer look tomorrow.
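On the question of one op producing two separately reusable assets: dagster's `@multi_asset` decorator is one way to express this. A minimal sketch, assuming the 0.14-era API where `outs` maps asset names to `Out` objects; the asset names mirror the example later in the thread:

```python
from dagster import Out, multi_asset


@multi_asset(
    outs={
        "number_list": Out(),
        "greeting_string": Out(),
    }
)
def upstream_assets():
    # A single op that materializes two assets; downstream assets can
    # depend on either one independently.
    return [1, 2, 3], "Hello"
```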
Daniel Michaelis
05/20/2022, 9:42 AM
• When I try to launch a run from Dagit, it fails with:
```
Could not launch run
Response not successful: Received status code 400
```
There's also a pop-up about an unexpected GraphQL error with the following information:
```
Operation name: LaunchPipelineExecution
Message: Runtime Object type "InvalidSubsetError" is not a possible type for "LaunchRunResult".
Path:
Locations: [{"line":2,"column":3}]
```
• I also had a look at `context.instance.get_run_by_id(...).run_config`. It works so far, and I was thinking about your suggestion to file an issue, but before I do that I'd like to understand better how this config could be used and where it could be read from. For instance, for my use case a preferable option would be to provide the config at job definition, e.g. by writing a graph and supplying the config of the previous run in the `to_job()` call. My downstream job is supposed to completely reuse the config of the upstream job, which is why I'd like to define this dependency in a single place only. Therefore I'm still wondering if my previous suggestion of some sort of `previous_run_resource` could be a way to solve this, more or less like this:
```python
# Note: previous_run_resource is a proposed API, not an existing Dagster import.
from dagster import previous_run_resource, ...


@graph
def my_graph():
    ...


downstream_job = my_graph.to_job(
    resource_defs={"previous_run_resource": previous_run_resource},
    config=previous_run_resource.run_config,
)
```
The only configuration parameter this resource would require is the run id of the upstream run. It could also address my job input/output interdependency problem (though I'm completely open to modeling it with assets), and I can think of other use cases (such as configuring a downstream job based on other metadata of an upstream run / success hooks, reusing IO managers, log and performance summaries of upstream runs, ...).
I'm interested to hear your thoughts about that 🙂
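For what it's worth, something close to this proposal can be prototyped today by combining a custom resource with the `get_run_by_id` API mentioned above; a rough sketch, with the resource name and its surface hypothetical:

```python
from dagster import Field, resource


@resource(config_schema={"run_id": Field(str)})
def previous_run_resource(init_context):
    """Hypothetical resource exposing metadata of an earlier run."""
    run_id = init_context.resource_config["run_id"]

    class PreviousRun:
        @property
        def run_config(self):
            # init_context.instance is the DagsterInstance the run executes against.
            run = init_context.instance.get_run_by_id(run_id)
            return run.run_config if run is not None else None

    return PreviousRun()
```

Note that this only makes the upstream config available at runtime, not at job-definition time as in the `to_job(config=...)` sketch above.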
• Regarding my earlier question about an op that returns two assets (I only use `number_list` in the downstream graph), I rewrote it as follows:
```python
from dagster import (
    AssetGroup,
    AssetsDefinition,
    GraphIn,
    GraphOut,
    In,
    Out,
    graph,
    op,
    repository,
)


@op(
    out={
        "number_list": Out(list[int]),
        "greeting_string": Out(str),
    },
)
def upstream_op() -> tuple[list[int], str]:
    number_list = [1, 2, 3]
    greeting_string = "Hello"
    return number_list, greeting_string


@op(ins={"number_list": In(list[int])}, out={"number_list_longer": Out(list[int])})
def downstream_op(number_list) -> list[int]:
    number_list_longer = number_list + [4]
    return number_list_longer


@graph(out={"number_list": GraphOut(), "greeting_string": GraphOut()})
def upstream_graph():
    number_list, greeting_string = upstream_op()
    return number_list, greeting_string


@graph(ins={"number_list": GraphIn()}, out={"number_list_longer": GraphOut()})
def downstream_graph(number_list):
    number_list_longer = downstream_op(number_list)
    return number_list_longer


upstream_assets = AssetsDefinition.from_graph(graph_def=upstream_graph)
downstream_asset = AssetsDefinition.from_graph(graph_def=downstream_graph)
asset_group = AssetGroup([upstream_assets, downstream_asset])
upstream_job = asset_group.build_job(
    name="upstream_job", selection=["number_list", "greeting_string"]
)
downstream_job = asset_group.build_job(
    name="downstream_job", selection=["number_list_longer"]
)


@repository
def example_repo():
    return [upstream_job, downstream_job]
```
Maybe you could also help me with the following:
• Now the downstream op requires a config entry `ops.downstream_graph.inputs.number_list`, i.e. the dependency on the upstream job/assets is not inferred. What do I need to change so this config doesn't need to be provided?
• I have to provide the exact names of the assets in the selection (e.g. `selection=["number_list", "greeting_string"]`). Is there a way to change this so I can instead just write `selection=['upstream_assets']`?
• When I toggle off the Asset Graph view, the graph inputs and outputs are only shown as `Any`. When I expand the graph, the actual types of the op inputs and outputs are shown. Where can I define the data types for graph inputs and outputs so they're also shown in the unexpanded view?
sandy
05/20/2022, 6:18 PM
> Now the downstream op requires a config entry `ops.downstream_graph.inputs.number_list`, i.e. the dependency on the upstream job/assets is not inferred. What do I need to change so this config doesn't need to be provided?
This is a bug that @owen is currently working on fixing. It should get fixed in next week's release.
> I have to provide the exact names of the assets in the selection (e.g. `selection=["number_list", "greeting_string"]`). Is there a way to change this so I can instead just write `selection=['upstream_assets']`?
Not currently, but we'd like to enable this: by allowing you to apply a common tag or asset key prefix when you call `from_graph`, and then for you to be able to specify that tag in your `selection`. Would that work for you?
> When I toggle off the Asset Graph view, the graph inputs and outputs are only shown as `Any`. When I expand the graph, the actual types of the op inputs and outputs are shown. Where can I define the data types for graph inputs and outputs so they're also shown in the unexpanded view?
The way that I think it probably makes the most sense for this to work is for us to just show the types from the ops in the unexpanded view. Would that work for you?
Daniel Michaelis
06/02/2022, 7:27 AM
For point 3, I'm not sure if I understand you correctly. Do you mean that the types of the ops from the expanded view would be inferred, so they would also be shown in the unexpanded view?

sandy
06/02/2022, 4:26 PM
Exactly