# ask-community
g
I have an IO manager which is a bit more complex, and I wonder how best to structure the state handling vs. the business logic. The specific asset is an SCD2 table (tracking changes without a CDC changelog to derive them from, at a relatively low volume of approx. 500 MB of zipped CSV per day), so I wonder where best to intertwine the state handling of the UPSERTs / `MERGE INTO` with the business logic:
- step 1: store the partitioned table
- step 2: SCD2 UPSERT using `MERGE INTO`
  - if the table does not exist, insert the initial load
  - if it exists, UPSERT
  - to check whether it exists:
    - local: check the local file system
    - S3: check the bucket in the object store
- step 3: optimize (ZORDER) and delete small files
- step 4: delete old history (0)

I am using Spark with Delta Lake here (but the specific technological choices should not really matter). A cleaned daily partition is first stored (immutable blob storage), and then the changes are UPSERTed into a Delta Lake table (mutable Parquet). This leaves two questions open:
- as written above: where to split the business logic vs. the IO/state handling (i.e. asset vs. IO manager)
- backfills: how to handle stateful backfills nicely (you cannot simply overwrite a partition)
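The four steps could be sketched roughly as follows, with the existence check hidden behind a small storage interface so the same flow works for a local path and for S3. This is a minimal sketch under assumptions: `TableStore`, `write_scd2` and `FakeStore` are hypothetical names, and the in-memory fake stands in for Delta Lake (in a real implementation `exists` might wrap `DeltaTable.isDeltaTable`, `optimize` the Z-ordering and `vacuum` the history cleanup).

```python
from typing import Protocol


class TableStore(Protocol):
    """Hypothetical storage interface; a real one would wrap Delta Lake on a local path or S3."""

    def exists(self, path: str) -> bool: ...
    def create(self, path: str, rows: list) -> None: ...
    def merge(self, path: str, rows: list) -> None: ...
    def optimize(self, path: str) -> None: ...
    def vacuum(self, path: str) -> None: ...


def write_scd2(store: TableStore, partition_path: str, table_path: str, rows: list) -> None:
    # Step 1: store the cleaned daily partition (immutable)
    store.create(partition_path, rows)
    # Step 2: initial load if the table does not exist yet, otherwise merge
    if store.exists(table_path):
        store.merge(table_path, rows)
    else:
        store.create(table_path, rows)
    # Step 3: compact small files (e.g. ZORDER)
    store.optimize(table_path)
    # Step 4: remove old file history
    store.vacuum(table_path)


class FakeStore:
    """In-memory stand-in that records every call made against it."""

    def __init__(self) -> None:
        self.tables: dict = {}
        self.calls: list = []

    def exists(self, path):
        return path in self.tables

    def create(self, path, rows):
        self.calls.append(("create", path))
        self.tables[path] = list(rows)

    def merge(self, path, rows):
        # Placeholder for the real SCD2 merge: just appends the rows
        self.calls.append(("merge", path))
        self.tables[path].extend(rows)

    def optimize(self, path):
        self.calls.append(("optimize", path))

    def vacuum(self, path):
        self.calls.append(("vacuum", path))
```

Because the step order lives in one place and the storage is injected, the flow can be exercised in a test without Spark, and the same skeleton could sit in a reusable IO manager while only the merge differs per asset.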
In particular, I am also very interested in learning how to do this in a reusable way. Usually, an IO manager is very reusable/exchangeable and suitable for many assets. My custom SCD2 IO manager is highly customized for the UPSERT / `MERGE INTO` conditions and only suitable for one particular asset (which I do not really like).
d
I have an IO manager that puts things in Postgres tables and has to handle create/upsert/truncate-then-insert. I can't say I've done it in the best possible way, but here is how I do it:
• My `IOManager` has an init config that is used to determine the insert mode (truncate vs. upsert) and some other details, like which schema to put the object in.
• In `handle_output` I handle the cases of needing to create the destination table, and if extra information is needed I attach metadata to the asset.
Moving all of this into my `IOManager` made my assets much simpler to write. For the second question, I am not sure whether you can detect the difference between a backfill and a regular partitioned run.
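A minimal sketch of an insert mode chosen through IO manager config, as described above, targeting Postgres. The `WriteConfig` fields and `build_statements` are hypothetical; the upsert relies on Postgres `INSERT ... ON CONFLICT (...) DO UPDATE`, which requires a unique constraint or index on the key columns. Table and column names are interpolated directly into the SQL, so they must come from trusted config, not user input.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class WriteConfig:
    # Hypothetical IO manager config values (plain strings, so Dagster config can hold them)
    mode: str    # "truncate" or "upsert"
    schema: str  # target Postgres schema


def build_statements(cfg: WriteConfig, table: str, columns: List[str], keys: List[str]) -> List[str]:
    """Return the SQL for one output, using psycopg-style %s placeholders for values."""
    target = f"{cfg.schema}.{table}"
    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    insert = f"INSERT INTO {target} ({col_list}) VALUES ({placeholders})"

    if cfg.mode == "truncate":
        # Replace the table contents: empty it, then insert the new rows
        return [f"TRUNCATE TABLE {target}", insert]
    if cfg.mode == "upsert":
        # On a key collision update the non-key columns; with no such columns, skip the row
        updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c not in keys)
        action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
        return [f"{insert} ON CONFLICT ({', '.join(keys)}) {action}"]
    raise ValueError(f"unknown insert mode: {cfg.mode!r}")
```

The mode is a plain string, so it fits Dagster's config model, while the SQL generation stays the same for every asset that uses the IO manager.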
g
This sounds similar to what I have done (and so far I am not really happy with it), as my IO manager is very customized for a single asset and not reusable. Is yours applicable to many assets? Potentially the config could be specified via some kind of higher-order function, but so far Dagster only accepts plain values in its configuration (not functions).
d
Mine works for ~100 different assets with a variety of different types. Can you give an example of the difference between two asset types that don't "fit" in the same IO manager?
g
The 2nd question was more about idempotency than backfills (I just realized), in particular when reloading data.
I am currently using the `MERGE INTO` syntax from SQL / Python (PySpark with Delta Lake), and so far I have found no nice way to avoid hardcoding the conditions, as Dagster's configuration does not support higher-order functions.
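Since Dagster config cannot carry functions, one common workaround is to keep the condition builders in code and let the config select one by name, together with plain data such as the key columns. A sketch under assumptions: the registry, the condition names and the `is_current` column are hypothetical, and `t`/`s` are assumed to be the target and source aliases of the `MERGE INTO` statement.

```python
from typing import Callable, Dict, List

# Hypothetical registry: config stores a plain string, code maps it to a function
MERGE_CONDITIONS: Dict[str, Callable[[List[str]], str]] = {}


def register(name: str):
    """Decorator that makes a condition builder selectable by name from config."""
    def wrap(fn):
        MERGE_CONDITIONS[name] = fn
        return fn
    return wrap


@register("equi_join")
def equi_join(keys: List[str]) -> str:
    # Match target rows to source rows on equality of all key columns
    return " AND ".join(f"t.{k} = s.{k}" for k in keys)


@register("equi_join_current")
def equi_join_current(keys: List[str]) -> str:
    # Same, but only against the currently valid SCD2 version
    return equi_join(keys) + " AND t.is_current = true"


def merge_condition(config: dict) -> str:
    """Resolve the ON condition from serializable config values only."""
    builder = MERGE_CONDITIONS[config["condition"]]
    return builder(config["keys"])
```

Only the string name and the list of keys travel through config, which keeps it serializable, while a new condition means registering one more function rather than writing a new IO manager.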
d
I'm only familiar with the SQL flavor, but are you basically matching on membership (like equality on primary key values), or are the conditions really "freeform"?
g
No, equi-joins on the key: if the key was not present, insert; if it already exists, expire the old record (SCD2) and insert a fresh one; if the key is missing, delete (expire) the record.
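These merge rules can be written down as a small in-memory reference implementation. A sketch under assumptions: the `Version` record and its `valid_from`/`valid_to` fields are hypothetical, each delivery is assumed to be a full snapshot (so a missing key means the record was deleted), and, as is usual for SCD2, an existing key only gets a new version when its attributes actually changed.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Version:
    key: str
    attrs: tuple                    # the tracked attribute values
    valid_from: str                 # load date this version became valid
    valid_to: Optional[str] = None  # None means "current version"


def apply_scd2(history: List[Version], snapshot: Dict[str, tuple], load_date: str) -> List[Version]:
    """Apply one full daily snapshot to the SCD2 history (no CDC feed needed)."""
    current = {v.key: v for v in history if v.valid_to is None}
    result = [v for v in history if v.valid_to is not None]  # closed versions never change

    for key, version in current.items():
        if key not in snapshot:
            # Key missing from the delivery: expire the record
            result.append(replace(version, valid_to=load_date))
        elif snapshot[key] != version.attrs:
            # Key changed: expire the old version and insert a fresh one
            result.append(replace(version, valid_to=load_date))
            result.append(Version(key, snapshot[key], load_date))
        else:
            # Unchanged: keep the current version as is
            result.append(version)

    for key, attrs in snapshot.items():
        if key not in current:
            # New key: insert its first version
            result.append(Version(key, attrs, load_date))
    return result
```

Because it is plain Python, the same function could serve as a test oracle: run the real `MERGE INTO` on a small fixture and compare the resulting table with the output of `apply_scd2`.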
d
https://codereview.stackexchange.com/questions/193655/merge-table-stored-procedure is a version that I wrote a long time ago that basically does what you want (unsure how to translate it to pyspark, but it should be possible)
You should be able to put something like that in your IOManager
g
this sounds interesting
d
In my case, if I have a set of keys for an asset, I just attach those as metadata on the output, and then the IO manager converts that into the upsert query
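A sketch of turning key columns attached as output metadata into an upsert statement. It assumes the metadata dict has a hypothetical `keys` entry; in Dagster it would typically come from the asset or output definition metadata read inside `handle_output` (the exact context attribute differs between Dagster versions). The generated SQL uses Delta Lake's `UPDATE SET *` / `INSERT *` shorthand, so it is a plain upsert; the SCD2 variant needs additional clauses to expire the old version.

```python
def upsert_sql(target: str, source: str, metadata: dict) -> str:
    """Build a Spark SQL / Delta Lake MERGE from key columns attached as metadata."""
    keys = metadata["keys"]  # hypothetical metadata entry, e.g. {"keys": ["id"]}
    if not keys:
        raise ValueError("an upsert needs at least one key column")
    # Equi-join on every key column between target (t) and source (s)
    on = " AND ".join(f"t.{k} = s.{k}" for k in keys)
    return (
        f"MERGE INTO {target} AS t USING {source} AS s ON {on} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )
```

The IO manager stays generic: only the metadata differs per asset, and the statement would be executed with something like `spark.sql(...)`.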
g
This should be doable, at least for the keys, and with a bit of metaprogramming perhaps also for the other conditions.
But it would still (at least for me so far) leave the question of idempotence open.
d
Do you want re-running a partition to be idempotent or not?
g
Exactly. Assuming bad data was delivered yesterday, I would like the IO manager to somehow delete the old state and then recreate a valid state with the new data/asset arriving today.
But this is a bit tricky for stateful assets like an SCD2 table built from daily deliveries (it is not as simple as deleting the partition).
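One way to make a re-run of a load date idempotent is to "undo" that date's effect on the SCD2 history before re-applying the corrected delivery. A minimal in-memory sketch, assuming hypothetical `valid_from`/`valid_to` fields that record the load date which created and expired each version: undoing a date drops the versions it inserted and re-opens the versions it expired.

```python
from dataclasses import dataclass, replace
from typing import List, Optional


@dataclass(frozen=True)
class Version:
    key: str
    attrs: tuple
    valid_from: str                 # load date that created this version
    valid_to: Optional[str] = None  # load date that expired it (None = current)


def undo_load(history: List[Version], load_date: str) -> List[Version]:
    """Roll the SCD2 history back to the state before `load_date` was applied.

    Assumes `load_date` is the latest load applied (loads are undone newest first).
    """
    result = []
    for v in history:
        if v.valid_from == load_date:
            continue  # drop versions inserted by this load
        if v.valid_to == load_date:
            v = replace(v, valid_to=None)  # re-open versions this load expired
        result.append(v)
    return result
```

Against Delta Lake the same two steps would roughly be a `DELETE` of rows with `valid_from = <date>` and an `UPDATE` setting `valid_to = NULL` where `valid_to = <date>`. To correct yesterday's load after today's has already run, today's load would have to be undone first, then yesterday's, and both re-applied in order.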
d
so you want to "undo" the SCD2 and then redo the inserts?
g
yes
d
I think you might be able to detect the backfill by looking at the run's tags (Dagster puts a `dagster/backfill` tag on runs kicked off by a backfill), and then you can detect that in your IOManager and "do the right thing".
y
I'd recommend the same as Daniel said. When I think of state handling vs. business logic, the exercise I usually do is to ask "what don't I want to change in testing, re-executing, and backfilling?", and that will be my "business logic". So in this case, in a unit test you may want to process the data and store it either in a temp file or in an in-memory variable (step 1), and abstract all of steps 2 to 4 behind a mock (no DB interaction at all). In re-execution and backfilling, you may want to undo all the DB interactions as well. So my recommendation is to have an "if backfill" or "if rerun" logic in your IO manager and keep the asset simple.
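A small sketch of that split, with hypothetical names throughout (`clean_delivery`, `StateWriter`, `RecordingWriter`, `run_partition`) and a made-up cleaning rule: the cleaning step is a pure function, while everything that touches the table sits behind an interface that a unit test replaces with a recording double.

```python
from typing import Dict, List, Protocol

Row = Dict[str, str]


def clean_delivery(raw_rows: List[Row]) -> List[Row]:
    """Business logic (step 1): a pure function, identical in tests, re-runs and backfills."""
    # Hypothetical cleaning rule: drop rows without an id and strip whitespace
    return [
        {k: v.strip() for k, v in row.items()}
        for row in raw_rows
        if row.get("id", "").strip()
    ]


class StateWriter(Protocol):
    """State handling (steps 2 to 4); in production this is the Delta Lake / IO manager side."""

    def write(self, partition: str, rows: List[Row]) -> None: ...


class RecordingWriter:
    """Test double: no database, just remembers what would have been written."""

    def __init__(self) -> None:
        self.writes: Dict[str, List[Row]] = {}

    def write(self, partition: str, rows: List[Row]) -> None:
        # Keyed by partition, so re-running a partition replaces its earlier write
        self.writes[partition] = rows


def run_partition(partition: str, raw_rows: List[Row], writer: StateWriter) -> None:
    writer.write(partition, clean_delivery(raw_rows))
```

The test exercises only the business logic; the real writer is where the backfill and re-run handling would live.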
As for reusing the IO manager, I believe a large part of the IO handling would be common, with only a small part specific to this case. So you can either configure your IOManager using resource config, as Daniel suggested, or have something like:
from dagster import IOManager, io_manager


class CommonIOManager(IOManager):
    # IO handling shared by all assets
    def handle_output(self, context, obj):
        ...

    def load_input(self, context):
        ...


class SpecialIOManager(CommonIOManager):
    # Case-specific state handling layered on top of the common IO handling
    def handle_output(self, context, obj):
        # Runs launched by a backfill carry the "dagster/backfill" run tag
        if context.step_context.get_tag("dagster/backfill"):
            self._delete()
            ...  # do something when it's in a backfill

        super().handle_output(context, obj)

    def load_input(self, context):
        ...

        return super().load_input(context)

    def _delete(self):
        # Undo the state written by earlier runs of this partition
        ...


@io_manager
def common_io_manager(_):
    return CommonIOManager()


@io_manager
def special_io_manager(_):
    return SpecialIOManager()
You can detect a backfill run in an IO manager using `context.step_context.get_tag("dagster/backfill")`.