#ask-community

Danny Steffy

03/06/2023, 7:03 PM
we have an IO manager that we want to configure in a specific op that is used in a graph-backed asset. Is there a way to pass the config to the IO manager from the op?
we're using `AssetsDefinition.from_graph` to define the asset; would I need to define the IO manager's config in the `resource_defs` param?
I tried this:
graph_asset = AssetsDefinition.from_graph(
    full_scored_data_set,
    keys_by_input_name={
        "recruiter_teams_to_score": AssetKey("recruiter_teams_to_score"),
        "trained_model": AssetKey("trained_model"),
    },
    resource_defs={
        "merge_sproc_io_manager": sql_merge_sproc_io_manager.configured(
            {
                "sproc_name": "xxxx",
                "db_name": "xxx",
            }
        )
    },
)
but because of the way our project is set up, I can't seem to access that custom IO manager in my assets folder. Is there some way to get that resource definition from dagster's context so I can configure it?
ah, we don't want to use `configured` because it's not in the init_context; we want to do it at run time for the op. Can I somehow set the op's run-time output config so the IO manager is able to properly handle the data?
Tried switching the graph definition to this:
@graph
def full_scored_data_set(recruiter_teams_to_score, trained_model):
    """score all recruiters"""
    ingest_configured = configured(ingest_data_and_score, name="ingest_configured")(
        "'outputs': {'result': {'sproc_name':'xxx','db_name': 'xxxxx',}}}}}}}"
    )

    result = (
        key_to_score(recruiter_teams_to_score)
        .map(
            lambda key: ingest_configured(
                recruiter_in_batch=key, trained_model=trained_model
            )
        )
        .collect()
    )
    return append_scores(result)
getting this error now:
Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'full_scored_data_set': {'ops': {'ingest_configured': {'outputs': {'result': {'db_name': '...', 'sproc_name': '...'}}}}}}}
is there somewhere else I should be defining this config?
Looks like I can define it in the launchpad... It'd be nice to have that as the "default" config that is used for the op
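For reference, the nesting that the error message asks for can be sketched as a plain Python dict. This is only an illustration of the shape shown in the sample config above; the helper name `nested_output_config` is hypothetical, and how the resulting dict is supplied (pasted into the launchpad, or passed as run config when launching) depends on how the run is started.

```python
def nested_output_config(graph_name, op_name, output_name, output_config):
    """Build run config for one output of an op nested inside a graph.

    Mirrors the sample from the error message:
    {'ops': {<graph>: {'ops': {<op>: {'outputs': {<output>: {...}}}}}}}
    """
    return {
        "ops": {
            graph_name: {
                "ops": {
                    op_name: {"outputs": {output_name: dict(output_config)}}
                }
            }
        }
    }


# Values copied from the thread; "xxx"/"xxxxx" are the poster's placeholders.
run_config = nested_output_config(
    "full_scored_data_set",
    "ingest_configured",
    "result",
    {"sproc_name": "xxx", "db_name": "xxxxx"},
)
```

Building the dict in code keeps the nesting in one place, so the same helper could be reused for other ops that share the IO manager.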

chris

03/07/2023, 7:17 PM
So are you saying that you want to configure the IO manager differently on a per-op basis? / do you want different config to be available to each call to `load_input`/`handle_output`?

Danny Steffy

03/07/2023, 7:23 PM
more specifically available for `handle_output`, yeah

chris

03/07/2023, 7:27 PM
Do you just want to handle each output differently based on which output it is? Wondering if config isn’t actually what you want here; instead you could provide some output metadata during the op’s execution, and then have your IO manager handle the output based on the metadata value

Danny Steffy

03/07/2023, 7:30 PM
we want to reuse this IOManager for different ops that have different landing databases and use a different sproc

chris

03/07/2023, 8:23 PM
Yea I think you can just use metadata for that

Danny Steffy

03/07/2023, 8:36 PM
so add the metadata to the op and then access it in the context in `handle_output`?

chris

03/07/2023, 8:37 PM
In your op, wrap your output object(s) in an `Output` instance, fill out the metadata arg, then in `handle_output` it should be available on `output_context.metadata`
So something like this:
from dagster import Output, op

@op
def my_op():
    return Output("foo", metadata={"db": "users-table"})
and then in your io manager:
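from dagster import IOManager

class MyIOManager(IOManager):
    # load_input is also required on a real IO manager; omitted here
    def handle_output(self, output_context, obj):
        if output_context.metadata["db"] == "users-table":
            ...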
does that fit your use case / make sense?

Danny Steffy

03/07/2023, 8:40 PM
yep, that makes a lot of sense. Thanks!
is output_context different from context? can I use both of those in handle_output?

chris

03/07/2023, 9:22 PM
just use `context`
I probably misremembered the default arg name

Danny Steffy

03/07/2023, 9:22 PM
hm I tried using `context.metadata` and I got a key error
`context.metadata["db"]` returned a key error

chris

03/07/2023, 9:29 PM
Sigh I think that’s a bug - shall fix it shortly but in the meantime try this format instead:
@op(out=Out(metadata={...}))
def my_op():
    return whatever
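
Since a missing key surfaced as a bare key error earlier in the thread, one option is to validate the metadata before using it. The following is a minimal sketch, not the poster's IO manager: `merge_target` and `REQUIRED_KEYS` are hypothetical names, the key names are assumed from the thread, and a `SimpleNamespace` stands in for the context object that Dagster passes to `handle_output`, so the logic can be exercised without a Dagster run.

```python
from types import SimpleNamespace

# Assumed key names, based on the config fields mentioned in the thread.
REQUIRED_KEYS = ("db_name", "sproc_name")


def merge_target(context):
    """Return (db_name, sproc_name) read from the output's metadata.

    Raises a descriptive ValueError instead of a bare KeyError when the
    metadata declared on the op's Out is incomplete.
    """
    metadata = getattr(context, "metadata", None) or {}
    missing = [key for key in REQUIRED_KEYS if key not in metadata]
    if missing:
        raise ValueError(f"Out metadata is missing required keys: {missing}")
    return metadata["db_name"], metadata["sproc_name"]


# Hypothetical stand-in for the context received by handle_output.
fake_context = SimpleNamespace(metadata={"db_name": "xxx", "sproc_name": "xxxx"})
db_name, sproc_name = merge_target(fake_context)
```

Inside a real IO manager, `handle_output(self, context, obj)` could call `merge_target(context)` first and then write `obj` to the resolved database.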

Danny Steffy

03/07/2023, 9:30 PM
ah I see
that worked!
thank you for your help!
follow up question on this... is there a way to access metadata in the `load_input` side of the IOManager? And if so, would I just need to provide the metadata as an `In` in the downstream op to send it correctly?

chris

03/08/2023, 5:57 PM
`input_context.upstream_output.metadata` should work

Danny Steffy

03/08/2023, 6:09 PM
ah, that way I can define it all in the Out of the op that's upstream?
i.e.
out=Out(
    io_manager_key="sql_merge_sproc_table",
    metadata={
        "db_name": "xxxx",
        "sproc_name": "MergeDagsterJson_ProbabilityRecruiterLikeAthleteGivenProfileView_RunStats",
        "schema_name": "dbo",
        "table_name": "ProbabilityRecruiterLikeAthleteGivenProfileView_RunStats",
    },
),

chris

03/08/2023, 6:09 PM
Yup
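
On the `load_input` side, the same metadata can be read through the upstream output, as suggested above. Here is a rough sketch under some assumptions: the helper names `upstream_metadata` and `qualified_table_name` are hypothetical, the `db.schema.table` format is only an illustration, and `SimpleNamespace` objects stand in for the input context. The `None` guard is there because an input does not always have an upstream output.

```python
from types import SimpleNamespace


def upstream_metadata(input_context):
    """Return the metadata declared on the upstream op's Out, or {} if absent."""
    upstream = getattr(input_context, "upstream_output", None)
    if upstream is None:
        return {}
    return dict(upstream.metadata or {})


def qualified_table_name(input_context):
    """Build 'db.schema.table' from the upstream Out metadata (illustrative format)."""
    meta = upstream_metadata(input_context)
    return ".".join(meta[key] for key in ("db_name", "schema_name", "table_name"))


# Hypothetical stand-in for the context received by load_input.
fake_input_context = SimpleNamespace(
    upstream_output=SimpleNamespace(
        metadata={"db_name": "xxxx", "schema_name": "dbo", "table_name": "RunStats"}
    )
)
table = qualified_table_name(fake_input_context)
```

This keeps all of the location details on the upstream `Out`, so the downstream op needs no metadata of its own.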

Danny Steffy

03/08/2023, 6:09 PM
excellent, thanks so much!