# ask-community
d
Hi :dagster:, I am working on creating a simple MySQL IO manager and need help passing the partition column name (`partition_expr`) to the IO manager. I am re-using the snowflake_io_manager from the Fully Featured Project as the base. This IO manager is working great and I am even able to use the partitioning. I just need guidance on how to pass the value for `partition_expr` to the IO manager, so that I can use it in `_time_window_where_clause`. The `context` has the partition keys or the time window, but I am not able to find the `partition_expr` value. From reviewing other IO managers (DuckDB, Snowflake, etc.) I understand that `TableSlice` is used in those IO managers to pass `partition_expr` to the `_time_window_where_clause` function, but I don't fully understand how to use this correctly in my own IO manager. Any help will be appreciated.
j
hey @Dipesh Kumar are you trying to figure out how to specify `partition_expr` on your asset, or how to retrieve the value in the IO manager itself? For the first one, you do it like this:
@asset(
    metadata={"partition_expr": "MY_COLUMN"}
)
def my_asset():
    ...
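If the asset is time-partitioned (which is what makes the time-window WHERE clause relevant), that metadata usually sits alongside a partitions_def - hypothetical example, the asset name and start date here are just placeholders:

from dagster import DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-05-01"),
    metadata={"partition_expr": "MY_COLUMN"},
)
def my_partitioned_asset():
    ...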
d
Hi @jamie, I am looking for the latter: how to retrieve the `partition_expr` value in the IO manager.
j
Here’s where we do that for the existing DB IO managers (which the snowflake IO manager is) https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/storage/db_io_manager.py#LL173C1-L174C1 and here’s where that particular metadata field is accessed https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/storage/db_io_manager.py#L195 . Note that in your `handle_output` function, the `context` passed by dagster is an `OutputContext`, but in the `load_input` function, the `context` is an `InputContext`. For that case you will need to get the output context using `context.upstream_output`.
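Roughly, that pattern looks like this - a simplified sketch, the class name and helper here are just illustrative, not the actual db_io_manager internals:

from dagster import InputContext, IOManager, OutputContext


class MySQLIOManager(IOManager):
    def _partition_expr(self, output_context: OutputContext):
        # partition_expr lives in the metadata dict defined on the asset
        metadata = output_context.metadata or {}
        return metadata.get("partition_expr")

    def handle_output(self, context: OutputContext, obj):
        # handle_output receives an OutputContext, so the metadata is right here
        partition_expr = self._partition_expr(context)
        ...

    def load_input(self, context: InputContext):
        # load_input receives an InputContext; the metadata is on the
        # upstream output's context
        partition_expr = self._partition_expr(context.upstream_output)
        ...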
d
Hi @jamie, thanks for the clarification, I understand now. I hadn't considered how to get this in `load_input`. I can get the metadata value from the output `context`:

output_context_metadata = output_context.metadata or {}
partition_expr = output_context_metadata.get("partition_expr")

I can now make my IO manager a bit more generic
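For reference, this is roughly how that value can then feed into the time-window WHERE clause - a simplified sketch of an illustrative helper, not the exact code from the fully featured project:

from dagster import OutputContext


def _time_window_where_clause(context: OutputContext, partition_expr: str) -> str:
    # asset_partitions_time_window returns the (start, end) datetimes of the
    # partition window being handled
    start, end = context.asset_partitions_time_window
    return (
        f"WHERE {partition_expr} >= '{start.isoformat()}' "
        f"AND {partition_expr} < '{end.isoformat()}'"
    )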
j
yep! let me know if you run into any other questions!
m
Hey @jamie - thanks for the above links. I am working on an IO manager myself and I found the code very informative. Maybe a higher-level question behind this, but I note that in the code for generating a table slice, when the context does not have an `asset_key` (i.e. it is an op output), there is no real logic there for resolving partitions - could you explain why that is, I wonder? For context, I'm building a single IO manager to handle op IO as well as asset IO.
j
the main reason for that is that op partitions are a bit trickier. So taking databases and partitions out of it (let’s say we’re storing in a local file system) - assets and ops are stored very differently. The output of each op is stored in a unique location keyed by run id, so the first run of an op would store its output at `1/my_op/result` and the second run would store it at `2/my_op/result`. For assets, the same location is overwritten on each run. And that has implications for partitions. For an op you might end up with all of these files:

1/my_op/result_2023_05_06
1/my_op/result_2023_05_07
2/my_op/result_2023_05_06

but for an asset you would only have one file per partition per asset. Trying to then translate that to a database gets tricky - do we make a new schema for each run and then add partitions to the table like we would in an asset-centric world? do we make a new table for each partition? do we adopt the asset approach entirely and just keep one copy of the data? it’s tough to decide which of these approaches is the “correct” one, and from what i can tell, most users of the database IO managers are using assets, so we haven’t prioritized filling this gap.
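to make that concrete, this is roughly how a filesystem-style IO manager ends up distinguishing the two cases - a simplified sketch, not the actual fs_io_manager code:

import os

from dagster import OutputContext


def _storage_path(context: OutputContext, base_dir: str) -> str:
    if context.has_asset_key:
        # asset outputs: one stable location per asset (and per partition),
        # overwritten on every run
        parts = list(context.asset_key.path)
        if context.has_asset_partitions:
            parts.append(context.asset_partition_key)
        return os.path.join(base_dir, *parts)
    # op outputs: keyed by run id, so every run writes to a fresh location
    return os.path.join(base_dir, context.run_id, context.step_key, context.name)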
m
Brilliant @jamie thanks for that extra colour! It makes me rethink my approach though! I essentially treat ops and assets the same with my IO manager (once I have parsed a location from the context object). By that I mean that I write files produced from an op without the run id. The same goes for tables - I can configure the behaviour on my IO manager to be append-only / overwrite for op outputs, but the default would be to recreate the table. I do see how that appears to be exactly what assets are supposed to be, but I need to be able to let users select a subset of assets in dagit and "stub" upstream assets via config, and I can only seem to achieve that by using ops.
j
ah yeah that makes sense. i think it’s totally fine to write a custom I/O manager that essentially treats the storage of ops and assets the same so that you can take advantage of stubbed values
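a rough sketch of what that unified treatment could look like (the class name and table-naming rule here are just illustrative):

from typing import Union

from dagster import InputContext, IOManager, OutputContext


class UnifiedTableIOManager(IOManager):
    """Illustrative IO manager that stores op and asset outputs the same way:
    one stable table per output, with no run id in the name."""

    def _table_name(self, context: Union[OutputContext, InputContext]) -> str:
        if context.has_asset_key:
            return "_".join(context.asset_key.path)
        # op outputs fall back to the output name, so re-runs hit the same table
        return context.name

    def handle_output(self, context: OutputContext, obj):
        table = self._table_name(context)
        ...  # write obj to `table`, recreating or appending depending on config

    def load_input(self, context: InputContext):
        # for op inputs, resolve the name from the upstream output so the
        # input parameter name doesn't have to match the table name
        src = context if context.has_asset_key else context.upstream_output
        table = self._table_name(src)
        ...  # read from `table`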