geoHeil
09/05/2022, 1:33 PM
Daniel Mosesson
09/05/2022, 1:53 PM
In `handle_output`, I handle the cases where the destination table needs to be created, and if extra information is needed, I attach metadata to the asset.
Moving all of this into my IOManager made my assets much simpler to write.
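A minimal, plain-Python sketch of the pattern described above, using the standard-library `sqlite3` module instead of Dagster. `TableWriter` and its `handle_output(table, rows)` signature are hypothetical and only mirror the shape of an IO manager method: the writer creates the destination table on first use and returns a metadata dict that an IO manager could attach to the asset.

```python
from __future__ import annotations

import sqlite3


class TableWriter:
    """Hypothetical stand-in for the write side of an IO manager (not Dagster's API)."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    def handle_output(self, table: str, rows: list[tuple[str, int]]) -> dict:
        # Create the destination table on first write, so the asset code
        # never has to. The table name is trusted here: SQL identifiers
        # cannot be passed as query parameters.
        self.conn.execute(
            f"CREATE TABLE IF NOT EXISTS {table} (name TEXT, value INTEGER)"
        )
        self.conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)
        self.conn.commit()
        # Metadata the caller could attach to the asset.
        (total,) = self.conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
        return {"table": table, "rows_written": len(rows), "total_rows": total}


writer = TableWriter(sqlite3.connect(":memory:"))
print(writer.handle_output("events", [("a", 1), ("b", 2)]))
# {'table': 'events', 'rows_written': 2, 'total_rows': 2}
```

Because table creation and metadata live in the writer, the code that produces `rows` stays a plain function, which is the simplification described in the thread.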
For the second question, I am not sure whether you can detect the difference between a backfill and a partitioned run.
geoHeil
09/05/2022, 1:55 PM
Daniel Mosesson
09/05/2022, 1:56 PM
geoHeil
09/05/2022, 1:56 PM
`MERGE INTO` syntax from SQL / Python (PySpark, Delta Lake). So far I have found no nice way to avoid hardcoding the conditions, as Dagster's configuration does not support higher-order functions.
Daniel Mosesson
09/05/2022, 1:59 PM
geoHeil
09/05/2022, 2:00 PM
Daniel Mosesson
09/05/2022, 2:02 PM
geoHeil
09/05/2022, 2:07 PM
Daniel Mosesson
09/05/2022, 2:13 PM
geoHeil
09/05/2022, 2:18 PM
Daniel Mosesson
09/05/2022, 2:19 PM
geoHeil
09/05/2022, 2:20 PM
Daniel Mosesson
09/05/2022, 2:21 PM
geoHeil
09/05/2022, 2:21 PM
Daniel Mosesson
09/05/2022, 2:40 PM
Backfill
tag on runs kicked off by a backfill), and then you can detect that in your IOManager and "do the right thing".
yuhan
09/06/2022, 10:08 PM
```python
from dagster import IOManager, io_manager


class CommonIOManager(IOManager):
    def handle_output(self, context, obj):
        ...  # shared write logic

    def load_input(self, context):
        ...  # shared read logic


class SpecialIOManager(CommonIOManager):
    def handle_output(self, context, obj):
        # find out if this run was kicked off by a backfill
        if context.step_context.get_tags("dagster/backfill"):
            self._delete()
            ...  # do something when it's in a backfill
        # super() is already bound, so self must not be passed again
        super().handle_output(context, obj)

    def load_input(self, context):
        ...
        # load_input must return the loaded value
        return super().load_input(context)

    def _delete(self):
        ...


@io_manager
def common_io_manager(_):
    return CommonIOManager()


@io_manager
def special_io_manager(_):
    return SpecialIOManager()
```
`context.step_context.get_tags("dagster/backfill")` in an IO manager.
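A minimal sketch of the "do the right thing" branch, written in plain Python with `sqlite3` rather than Dagster. The `metrics` table, its `day` partition column, and `write_partition` are hypothetical. Inside a real IO manager the tags would come from the run (for example via the `context.step_context` call above); here they are passed in as a plain dict. Deleting the partition before inserting makes a backfill re-run idempotent, while ordinary runs simply append.

```python
from __future__ import annotations

import sqlite3
from typing import Mapping

# Tag key named in this thread; it is set on runs kicked off by a backfill.
BACKFILL_TAG = "dagster/backfill"


def write_partition(
    conn: sqlite3.Connection,
    tags: Mapping[str, str],
    partition: str,
    values: list[int],
) -> None:
    """Hypothetical backfill-aware write to a table partitioned by `day`."""
    conn.execute("CREATE TABLE IF NOT EXISTS metrics (day TEXT, value INTEGER)")
    if BACKFILL_TAG in tags:
        # Like _delete() in the snippet above: clear the partition first so
        # that re-running it replaces its rows instead of duplicating them.
        conn.execute("DELETE FROM metrics WHERE day = ?", (partition,))
    conn.executemany(
        "INSERT INTO metrics VALUES (?, ?)", [(partition, v) for v in values]
    )
    conn.commit()
```

Writing the same partition twice without the tag doubles its rows; writing it once more with the `dagster/backfill` tag leaves only the newly written rows.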