# dagster-support

Daniel Michaelis

05/11/2022, 1:20 PM
Hi, I have two pipelines and I want to load the output of an op from the first pipeline as input for another op in the second pipeline, using the same IO manager. I don't want to connect the pipelines by e.g. a sensor, but instead give the run ID of the first pipeline run as config to the second pipeline run. What's the most elegant way to do this, and is this concept already covered somewhere in the docs (I couldn't find it yet)? In addition, the second pipeline should reuse some of the configuration of the first pipeline, so I don't want to rewrite it but just pass/load it, ideally also just by providing the run ID of the first run as part of the config for the second pipeline. Is there an out-of-the-box way to do this?

yuhan

05/11/2022, 9:00 PM
For managing cross-pipeline dependencies, I’d actually recommend using asset sensors. Phil had an answer to a similar question recently: https://dagster.slack.com/archives/C01U954MEER/p1651762959644979?thread_ts=1651756747.961799&cid=C01U954MEER
cc @owen / @sandy for a non-sensor approach. I believe we could model it using assets.

sandy

05/11/2022, 9:12 PM
Hey @Daniel Michaelis - are the pipelines partitioned?
The short answer is that there's no out-of-the-box way to do this, but I might be able to help you figure out a way to make it work. Is it resource configuration that you want to share?

Daniel Michaelis

05/12/2022, 8:35 AM
Hi, no, the pipelines are not partitioned. The first pipeline is for model development (including fitting certain transformers, e.g. imputation, scaling, one-hot encoding), and the second pipeline applies the previously fitted model and transformers to new data. Once the first pipeline has run, it should be possible to run the second pipeline after an arbitrary period of time, e.g. once a new row is added to the data, or by manually starting a batch process for many new rows. The configuration I'd like to share is mainly op configuration (e.g. the column names to which a transformation should be applied). However, it could also be resource config, e.g. a global configuration shared among different ops within a pipeline (using `make_values_resource`). As an example, the imputation op in the first pipeline would be called `impute_missing_training` and returns a dictionary with column names as keys and fitted imputers as values. This dictionary should be loaded into an op in the second pipeline named `impute_missing_application`. Currently I'm using a hack that works for the `fs_io_manager`:
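```python
import pickle
from pathlib import Path

import pandas as pd
from dagster import Field, Out, op
from sklearn.impute import SimpleImputer


@op(
    out={
        "df_imputed": Out(pd.DataFrame),
        "imputer_dict": Out(dict),
    },
    config_schema={"imputation_mapping": Field(dict, is_required=False)},
)
def impute_missing_training(
    context,
    df: pd.DataFrame,
):
    # Maps column name -> keyword arguments for SimpleImputer.
    imputation_mapping = context.op_config.get("imputation_mapping", {})
    df_imputed = df.copy()
    imputer_dict = {}
    for column, params in imputation_mapping.items():
        params = dict(params)  # copy so the run config itself is not mutated
        # The string "None" in config is translated to a Python None.
        if params.get("missing_values") == "None":
            params["missing_values"] = None

        imputer = SimpleImputer(**params)
        df_imputed[column] = imputer.fit_transform(df_imputed[[column]]).ravel()
        imputer_dict[column] = imputer

    return df_imputed, imputer_dict


@op(
    out={
        "df_imputed": Out(pd.DataFrame),
    },
    config_schema={"run_id_training": Field(str, is_required=True)},
    # The op has to request the IO manager to access it via context.resources.
    required_resource_keys={"io_manager"},
)
def impute_missing_application(
    context,
    df: pd.DataFrame,
):
    io_manager = context.resources.io_manager
    df_imputed = df.copy()
    run_id_training = context.op_config["run_id_training"]

    # Rebuild the path the fs_io_manager used for the training run's output:
    # <base_dir>/<run_id>/<op_name>/<output_name>
    storage_path = io_manager.base_dir
    op_name = "impute_missing_training"
    output_name = "imputer_dict"

    filepath = Path(storage_path).joinpath(run_id_training, op_name, output_name)

    with open(filepath, io_manager.read_mode) as read_obj:
        imputer_dict = pickle.load(read_obj)

    # Apply the already fitted imputers; nothing is refitted on the new data.
    for column, imputer in imputer_dict.items():
        df_imputed[column] = imputer.transform(df_imputed[[column]]).ravel()

    return df_imputed
```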
In this example I don't need to provide the column names as part of the second op's config, because I can access them via the keys of the dictionary. However, in other cases this doesn't work and I'd need to provide the same config to both the first and the second pipeline. I'm also not happy with having to load the previous outputs inside the second op as part of its logic. My solution also only works with the `fs_io_manager`; it would be nice if it worked with any IO manager, given that both ops use the same one. An approach with assets sounds reasonable to me, e.g. by passing the name of an asset to the second op's decorator. However, I haven't figured out all of the functionality of assets yet.
I could also imagine providing a resource to the second (downstream) pipeline that contains the results of a previous (upstream) run (of a different pipeline in my case); let's call it `run_resource`. A config parameter of this resource would be the ID of the run whose results it provides. These results could then be accessed within an op in the downstream pipeline via `context.resources.run_resource.op_name.output_name`. The output would be loaded automatically (and lazily) in the downstream pipeline with the same IO manager that wrote it in the upstream run; this information would be saved in the upstream run's metadata and made available to the resource. The resource could also expose the `run_config` used for that run, accessible via `run_resource.run_config`.
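The `run_resource` idea can be approximated in plain Python. This is a hypothetical sketch, not a Dagster API. It assumes outputs were pickled under `<base_dir>/<run_id>/<op_name>/<output_name>`, which is the layout the `fs_io_manager` hack relies on, and that the caller supplies the run config (for example, fetched from the instance). Outputs are loaded lazily on first attribute access. Unlike the proposal, it only understands this one storage layout, not arbitrary IO managers.

```python
import pickle
from pathlib import Path
from typing import Any, Dict, Optional


class _OpOutputs:
    """Lazily loads the pickled outputs of one op from an earlier run."""

    def __init__(self, op_dir: Path) -> None:
        self._op_dir = op_dir
        self._cache: Dict[str, Any] = {}

    def __getattr__(self, output_name: str) -> Any:
        # Only called when normal lookup fails, i.e. for output names.
        if output_name.startswith("_"):
            raise AttributeError(output_name)
        if output_name not in self._cache:
            path = self._op_dir / output_name
            if not path.is_file():
                raise AttributeError(f"no stored output at {path}")
            with open(path, "rb") as read_obj:
                self._cache[output_name] = pickle.load(read_obj)
        return self._cache[output_name]


class PreviousRunResults:
    """Hypothetical 'run_resource': read-only view of an earlier run's outputs."""

    def __init__(self, base_dir: str, run_id: str, run_config: Optional[dict] = None) -> None:
        self.run_dir = Path(base_dir) / run_id
        self.run_config = run_config or {}
        self._ops: Dict[str, _OpOutputs] = {}

    def __getattr__(self, op_name: str) -> _OpOutputs:
        if op_name.startswith("_"):
            raise AttributeError(op_name)
        if op_name not in self._ops:
            self._ops[op_name] = _OpOutputs(self.run_dir / op_name)
        return self._ops[op_name]
```

Usage would look like `PreviousRunResults(base_dir, run_id).impute_missing_training.imputer_dict`. Wrapping it in a Dagster resource whose config holds `base_dir` and `run_id` is left out here.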

sandy

05/13/2022, 12:22 AM
Hey Daniel - today has been a bit of a whirlwind, so I haven't had a chance to look at this yet, but I'm planning to get back to you on it tomorrow.

Daniel Michaelis

05/13/2022, 11:52 AM
Hey Sandy, no worries. I came across a similar issue suggesting a Python API to reference previous run outputs. It's not exactly the same thing, because in my use case I'm trying to access the results from within Dagster, but I thought it was worth linking here: https://github.com/dagster-io/dagster/issues/4488
As a side note: I'm wondering what the right way would be to run Dagster jobs in a "lightweight" way. I'd like to write an API that runs a preconfigured job, takes the input of its first op as input, and yields the output of its last op as output, without persisting any run metadata, logs or intermediate results. In the use case I described earlier, I'd like my second (downstream) pipeline to run this way (so, as I mentioned, this job would need access to the results of the first, upstream job). This lightweight job might need to run several hundred times a day.

sandy

05/13/2022, 11:02 PM
Finally able to get to this. Here are my takes:
• If you want to have two separate jobs where the inputs to one job come from the outputs of the other, with the same IO manager across those edges, software-defined assets are likely the right way to go.
• We don't have a great way to automatically take the config from one run and use it in a run of a downstream job. The best I can think of would be to fetch the config from the run object in the database. You can use `context.instance.get_run_by_id(...).run_config` to access the record in the run DB. However, this API is not 100% public: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/core/instance/__init__.py#L694. If you'd be interested in filing an issue with this request, it would help us track it and potentially add it in the future.
• For running Dagster jobs in a lightweight way, have you looked at `execute_in_process`?

Daniel Michaelis

05/16/2022, 4:23 PM
Hi @sandy, thanks for the further replies 🙂 They've been a great help already, but I'm still struggling with some of it.
• I've been trying quite a few different things with software-defined assets, but I can't make it work the way I'd like. I want the logic of the upstream job and the downstream job to be strictly separated, so that the upstream job isn't part of the downstream job's lineage (except for the input asset, maybe as a SourceAsset?). With the code below I can successfully materialize the upstream asset, but materializing the downstream asset fails with the following error:
`dagster.core.errors.DagsterInvariantViolationError: Attempting to access asset_key, but it was not provided when constructing the OutputContext`
```python
from dagster import AssetGroup, asset, repository, SourceAsset, AssetKey


@asset
def upstream_asset() -> list[int]:
    return [1, 2, 3]


@asset
def downstream_asset(upstream_asset: list[int]) -> list[int]:
    return upstream_asset + [4]


asset_group = AssetGroup([upstream_asset, downstream_asset])
upstream_job = asset_group.build_job(name="upstream_job", selection=['upstream_asset'])
downstream_job = asset_group.build_job(name="downstream_job", selection=['downstream_asset'])

# upstream_source_asset = SourceAsset(key=AssetKey("upstream_asset"))
# upstream_asset_group = AssetGroup(assets=[upstream_asset])
# downstream_asset_group = AssetGroup(assets=[downstream_asset], source_assets=[upstream_source_asset])


@repository
def example_repo():
    return [upstream_job, downstream_job]
    # return [upstream_asset_group, downstream_asset_group]
```
The commented-out code is another attempt I made, but it doesn't give me separate jobs with their own logic and lineage graphs, which is what I'd like.
• If the upstream asset was materialized several times, can I configure which of the materializations to use as input for the downstream asset (in particular if I don't want to use the latest upstream materialization)? E.g. by providing the run ID of the upstream run as configuration to the downstream materialization run?
• Also, how would I define an op that returns two assets which can be reused separately? I'd like to have such an op in the upstream job, and use one of the output assets in the upstream job and the other in the downstream job. Is that possible with the asset decorator?
• Thanks, I tried `execute_in_process` and it seems to do what I expect. Previously I thought runs started this way would also be persisted.
• I haven't had time to look at `context.instance.get_run_by_id(...).run_config` yet, but it seems to be more or less what I expect as well. I'll try to have a closer look tomorrow.

sandy

05/16/2022, 4:43 PM
yikes - that error you're hitting is a bug. I will look into it
@sean is currently working on making config work with software-defined assets

Daniel Michaelis

05/20/2022, 9:42 AM
Hi @sandy, thanks for fixing the bug!
• I just updated to 0.14.16 and launching the run via the Launchpad is working. However, I'm now receiving a new error when I try to materialize the downstream asset in Dagit using the "Rematerialize All" or "Rematerialize Selected" buttons:
```
Could not launch run
Response not successful: Received status code 400
```
There's also a pop-up about an unexpected GraphQL error with the following information:
```
Operation name: LaunchPipelineExecution

Message: Runtime Object type "InvalidSubsetError" is not a possible type for "LaunchRunResult".

Path:

Locations: [{"line":2,"column":3}]
```
• I also had a look at `context.instance.get_run_by_id(...).run_config`. It works so far, and I was considering your suggestion to file an issue, but before I do that I'd like to understand better how this config could be used and where it could be read from. For my use case, the preferable option would be to provide the config at job definition time, e.g. by writing a graph and supplying the config of the previous run in the `to_job()` call. My downstream job is supposed to reuse the upstream job's config completely, which is why I'd like to define this dependency in a single place only. So I'm still wondering whether my earlier suggestion of some sort of `previous_run_resource` could solve this, more or less like this:
```python
# Proposed API: `previous_run_resource` does not exist in Dagster.
from dagster import previous_run_resource, ...


@graph
def my_graph():
    ...


downstream_job = my_graph.to_job(
    resource_defs={'previous_run_resource': previous_run_resource},
    config=previous_run_resource.run_config,
)
```
The only configuration parameter this resource would require is the run ID of the upstream run. It could also address my job input/output interdependency problem (though I'm completely open to modeling it with assets), and I can think of other use cases (such as configuring a downstream job based on other metadata of an upstream run or success hooks, reusing IO managers, log and performance summaries of upstream runs, ...). I'd be interested to hear your thoughts on that 🙂
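Defining the dependency in a single place can be partly approximated today with a plain-Python step. The sketch below uses a hypothetical helper that copies op config from an upstream run config onto the matching downstream ops. The op names are illustrative. The upstream config could come from `instance.get_run_by_id(run_id).run_config`, and the result could be passed to `my_graph.to_job(config=...)`. Doing that at definition time assumes the code can reach the instance when it loads.

```python
from collections.abc import Mapping
from typing import Any, Dict


def _thaw(value: Any) -> Any:
    # Recursively copy (possibly read-only) mappings and sequences into plain
    # dicts and lists, so the result can be edited without touching the source.
    if isinstance(value, Mapping):
        return {key: _thaw(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_thaw(item) for item in value]
    return value


def remap_run_config(upstream_run_config: Mapping, op_name_map: Dict[str, str]) -> dict:
    """Build downstream run config from an upstream run's config (hypothetical helper).

    op_name_map maps upstream op names to the downstream ops that should receive
    the same config. Resource config (e.g. a shared make_values_resource) is
    copied over unchanged; upstream ops missing from the map are dropped.
    """
    upstream_ops = upstream_run_config.get("ops", {})
    downstream_config: dict = {
        "ops": {
            downstream_name: _thaw(upstream_ops[upstream_name])
            for upstream_name, downstream_name in op_name_map.items()
            if upstream_name in upstream_ops
        }
    }
    if "resources" in upstream_run_config:
        downstream_config["resources"] = _thaw(upstream_run_config["resources"])
    return downstream_config
```

This only renames and copies. It does not check that the downstream ops actually declare the same config schema.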
Sorry for the many long messages, but I've continued trying to build a minimal example with assets for my use case. Since I want ops that return multiple assets instead of only one (of which I only want to reuse `number_list` in the downstream graph), I rewrote it as follows:
```python
from dagster import (
    AssetGroup,
    AssetsDefinition,
    GraphIn,
    GraphOut,
    In,
    Out,
    graph,
    op,
    repository,
)


@op(
    out={
        "number_list": Out(list[int]),
        "greeting_string": Out(str),
    },
)
def upstream_op() -> tuple[list[int], str]:
    number_list = [1, 2, 3]
    greeting_string = "Hello"
    return number_list, greeting_string


@op(ins={"number_list": In(list[int])}, out={"number_list_longer": Out(list[int])})
def downstream_op(number_list) -> list[int]:
    number_list_longer = number_list + [4]
    return number_list_longer


@graph(out={"number_list": GraphOut(), "greeting_string": GraphOut()})
def upstream_graph():
    number_list, greeting_string = upstream_op()
    return number_list, greeting_string


@graph(ins={"number_list": GraphIn()}, out={"number_list_longer": GraphOut()})
def downstream_graph(number_list):
    number_list_longer = downstream_op(number_list)
    return number_list_longer


upstream_assets = AssetsDefinition.from_graph(graph_def=upstream_graph)
downstream_asset = AssetsDefinition.from_graph(graph_def=downstream_graph)


asset_group = AssetGroup([upstream_assets, downstream_asset])
upstream_job = asset_group.build_job(
    name="upstream_job", selection=["number_list", "greeting_string"]
)
downstream_job = asset_group.build_job(
    name="downstream_job", selection=["number_list_longer"]
)


@repository
def example_repo():
    return [upstream_job, downstream_job]
```
Maybe you could also help me with the following:
• The downstream op now requires a config entry `ops.downstream_graph.inputs.number_list`, i.e. the dependency on the upstream job/assets is not inferred. What do I need to change so that this config doesn't need to be provided?
• I have to provide the exact names of the assets in the selection (e.g. `selection=["number_list", "greeting_string"]`). Is there a way to change this, so I can just write `selection=['upstream_assets']` instead?
• When I untoggle the Asset Graph view, the graph inputs and outputs are only shown as `Any`. When I expand the graph, the actual types of the op inputs and outputs are shown. Where can I define the data types for graph inputs and outputs, so that they're also shown in the unexpanded view?

sandy

05/20/2022, 6:18 PM
• Now the downstream op requires a config entry `ops.downstream_graph.inputs.number_list`, i.e. the dependency from the upstream job/assets is not inferred. What do I need to change, so this config doesn't need to be provided?
This is a bug that @owen is currently working on fixing. It should be fixed in next week's release.
• I have to provide the exact names of the assets in the selection (e.g. `selection=["number_list", "greeting_string"]`). Is there a way to change it, so I can instead just write `selection=['upstream_assets']`?
Not currently, but we'd like to enable this by allowing you to apply a common tag or asset key prefix when you call `from_graph`, and then to specify that tag in your `selection`. Would that work for you?
• When I untoggle the Asset Graph view the graph inputs and outputs are only shown as `Any`. When I expand the graph, the actual types of op inputs and outputs are shown. Where can I define the data types for graph inputs and outputs, so they're also shown in the unexpanded view?
I think the approach that makes the most sense is for us to just show the types from the ops in the unexpanded view. Would that work for you?

Daniel Michaelis

06/02/2022, 7:27 AM
Hi Sandy, sorry for the late reply. I've been quite busy and haven't had time to thoroughly check your suggestions. For the first point, I saw that some issues were closed that were related, so I'll try it once I have time. For point 2, yes that sounds reasonable to me. For point 3, I'm not sure if I understand you correctly. Do you mean that the types from the unexpanded view would be inferred in the expanded view, so they would also be shown in the expanded view? I think this also sounds like the best solution as far as I can judge. I'll be on holidays now, so I won't be able to reply for a while again. But thanks so much for your help. For my previous use case, I've decided to stick with regular ops and write myself a resource that can provide the previous outputs, but I'll switch to assets in the future step by step.

sandy

06/02/2022, 4:26 PM
For point 3, I'm not sure if I understand you correctly. Do you mean that the types from the unexpanded view would be inferred in the expanded view, so they would also be shown in the expanded view?
Exactly