Averell
09/29/2022, 12:07 PM
claire
09/29/2022, 6:02 PM
from dagster import AssetKey, IOManager, SourceAsset, io_manager

before_2022 = SourceAsset(
    key=AssetKey("before_2022"), io_manager_key="before_2022_io", partitions_def=partitions_def_1
)
start_2022 = SourceAsset(
    key=AssetKey("start_2022"), io_manager_key="start_2022_io", partitions_def=partitions_def_2
)
class Before2022IOManager(IOManager):
    def handle_output(self, context, obj):
        pass

    def load_input(self, context):
        ...

class Start2022IOManager(IOManager):
    def handle_output(self, context, obj):
        pass

    def load_input(self, context):
        ...

@io_manager
def before_2022_io(_):
    return Before2022IOManager()

@io_manager
def start_2022_io(_):
    return Start2022IOManager()
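The two partitions definitions referenced above (partitions_def_1 and partitions_def_2) are not shown in the thread. As a rough, Dagster-free illustration of the split they imply, the sketch below generates daily keys on either side of 2022-01-01 with the standard library only. The helper name daily_keys and the outer date bounds are hypothetical and chosen just to keep the output short; this is not how Dagster builds its keys internally.

```python
from datetime import date, timedelta


def daily_keys(start: date, end_exclusive: date) -> list:
    """Return one YYYY-MM-DD key per day in the half-open range [start, end_exclusive)."""
    days = (end_exclusive - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(max(days, 0))]


CUTOVER = date(2022, 1, 1)  # boundary mentioned in the thread

# Hypothetical outer bounds, picked only to keep the example small.
before_2022_keys = daily_keys(date(2021, 12, 29), CUTOVER)
start_2022_keys = daily_keys(CUTOVER, date(2022, 1, 3))

print(before_2022_keys)
print(start_2022_keys)
```

Because the ranges are half-open, the cutover day belongs only to the second set, so the two key sets never overlap.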
As for a partitions definition that covers only the partitions before 2022-01-01, you can subclass DailyPartitionsDefinition to create it.
Averell
09/30/2022, 4:01 AM
@asset
def downstream(before_2022: SparkDF, start_2022: SparkDF):
    upstream = before_2022.unionByName(start_2022)
    ...
claire
09/30/2022, 9:54 PM
If you define both IO managers (Before2022IOManager and Start2022IOManager) to return the full dataframe in load_input, then your downstream asset will accept both full dataframes. You can confirm this by doing something like:
@asset
def my_asset(context, before_2022, start_2022):
    context.log.info(context.asset_partition_keys_for_input("before_2022"))
    context.log.info(context.asset_partition_keys_for_input("start_2022"))
to confirm that the downstream asset will fetch all partitions of each upstream asset.
Averell
09/30/2022, 10:31 PM
claire
09/30/2022, 10:33 PM
from dagster import AllPartitionMapping, AssetIn, asset

@asset(
    partitions_def=...,  # downstream partitions definition (elided in the thread)
    ins={
        "before_2022": AssetIn(partition_mapping=AllPartitionMapping()),
        "start_2022": AssetIn(partition_mapping=AllPartitionMapping()),
    },
)
def my_asset(context, before_2022, start_2022):
    context.log.info(context.asset_partition_keys_for_input("before_2022"))
    context.log.info(context.asset_partition_keys_for_input("start_2022"))
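To picture what "fetch all partitions of each upstream asset" means for the data itself, here is a minimal plain-Python sketch. It assumes each upstream asset is stored as rows keyed by daily partition; the in-memory dictionaries, the load_all_partitions helper, and the row contents are all hypothetical stand-ins (the thread uses Spark DataFrames and custom IO managers, which are not reproduced here).

```python
from typing import Dict, List


def load_all_partitions(storage: Dict[str, List[dict]], partition_keys: List[str]) -> List[dict]:
    """Concatenate the rows of every requested partition, in sorted key order."""
    rows: List[dict] = []
    for key in sorted(partition_keys):
        rows.extend(storage.get(key, []))
    return rows


# Hypothetical in-memory "storage", keyed by daily partition.
before_2022_storage = {"2021-12-30": [{"id": 1}], "2021-12-31": [{"id": 2}]}
start_2022_storage = {"2022-01-01": [{"id": 3}]}

# Each input receives every partition of its upstream asset.
before_2022 = load_all_partitions(before_2022_storage, list(before_2022_storage))
start_2022 = load_all_partitions(start_2022_storage, list(start_2022_storage))

# Plays the role of before_2022.unionByName(start_2022) from earlier in the thread.
upstream = before_2022 + start_2022
print([row["id"] for row in upstream])
```

In the real setup, the list of keys would come from the context (as logged by asset_partition_keys_for_input above) rather than from the storage dictionary.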
Averell
09/30/2022, 10:34 PM