#ask-community

Averell

09/29/2022, 12:07 PM
Hi everyone, I have an asset which depends on two (source) assets: one source for all partitions before 2022-01-01, and the other for all partitions from 2022-01-01 onward. Is there support for this scenario? I'm thinking of modifying the io_resource so that it loads the source asset from a different location depending on the partition key, but I wonder whether there is built-in support. Thanks!

claire

09/29/2022, 6:02 PM
I think this might be the solution that you mentioned, but you could create custom IO managers to load from the source assets:
from dagster import AssetKey, IOManager, SourceAsset, io_manager

before_2022 = SourceAsset(
    key=AssetKey("before_2022"), io_manager_key="before_2022_io", partitions_def=partitions_def_1
)
start_2022 = SourceAsset(
    key=AssetKey("start_2022"), io_manager_key="start_2022_io", partitions_def=partitions_def_2
)


class Before2022IOManager(IOManager):
    def handle_output(self, context, obj):
        pass

    def load_input(self, context):
        ...


class Start2022IOManager(IOManager):
    def handle_output(self, context, obj):
        pass

    def load_input(self, context):
        ...


@io_manager
def before_2022_io(_):
    return Before2022IOManager()


@io_manager
def start_2022_io(_):
    return Start2022IOManager()
With regard to having a partitions definition for partitions before 2022-01-01, you can subclass DailyPartitionsDefinition to create this partitions definition.
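
For comparison, the single-IO-manager idea from the original question (choosing a load location from the partition key) could be sketched roughly as below. This is plain Python with no Dagster imports; `CUTOFF`, the two location strings, and `location_for_partition` are hypothetical names for illustration, and partition keys are assumed to be `YYYY-MM-DD` strings.

```python
from datetime import date

# Hypothetical cutoff constant and storage locations; the thread only states
# the 2022-01-01 boundary, not where either source lives.
CUTOFF = date(2022, 1, 1)
BEFORE_2022_LOCATION = "legacy_store"
START_2022_LOCATION = "current_store"


def location_for_partition(partition_key: str) -> str:
    """Pick a source location from a daily partition key such as '2021-12-31'."""
    day = date.fromisoformat(partition_key)
    return BEFORE_2022_LOCATION if day < CUTOFF else START_2022_LOCATION


# An IO manager's load_input could call this helper with the partition key it
# reads from its input context, then load from the returned location.
print(location_for_partition("2021-12-31"))
print(location_for_partition("2022-01-01"))
```

Keeping the routing in a small pure function makes the cutoff easy to test on its own, whichever IO manager layout ends up being used.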

Averell

09/30/2022, 4:01 AM
Thanks Claire. However, I'm more interested in defining the downstream asset. With the two-source-assets approach, will I need to do something like this?
@asset
def downstream(before_2022: SparkDF, start_2022: SparkDF):
    upstream = before_2022.unionByName(start_2022)
    ...

claire

09/30/2022, 9:54 PM
The solution I posted above will actually work for the code snippet you posted! By default, if an unpartitioned asset depends on partitioned assets, it will depend on all of the partitions of each upstream asset. So as long as you define your IO managers (in my code snippet above, Before2022IOManager and Start2022IOManager) to return the full dataframe in load_input, your downstream asset will receive both full dataframes. You can check this with something like:
@asset
def my_asset(context, before_2022, start_2022):
    context.log.info(context.asset_partition_keys_for_input("before_2022"))
    context.log.info(context.asset_partition_keys_for_input("start_2022"))
to confirm that the downstream asset will fetch all partitions of each upstream asset.
If you wanted to map the downstream asset to a subset of the upstream partitions, you could define your own PartitionMapping: https://docs.dagster.io/_apidocs/partitions#dagster.PartitionMapping
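
The selection logic such a custom mapping would need to encode can be sketched in plain Python. This is not Dagster's PartitionMapping interface, only the rule itself, and it assumes each downstream daily partition needs just the same day from whichever source covers it (an assumption, not something stated in the thread). `CUTOFF` and `upstream_partitions_for` are hypothetical names.

```python
from datetime import date
from typing import Dict, List

# Boundary between the two sources, as stated in the original question.
CUTOFF = date(2022, 1, 1)


def upstream_partitions_for(downstream_key: str) -> Dict[str, List[str]]:
    """Return, per upstream asset, the partition keys one downstream day needs.

    Assumes daily keys formatted as YYYY-MM-DD and a same-day dependency.
    """
    day = date.fromisoformat(downstream_key)
    if day < CUTOFF:
        return {"before_2022": [downstream_key], "start_2022": []}
    return {"before_2022": [], "start_2022": [downstream_key]}


print(upstream_partitions_for("2021-12-31"))
print(upstream_partitions_for("2022-01-01"))
```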

Averell

09/30/2022, 10:31 PM
Thanks Claire. My downstream asset is also partitioned (all 3 assets are daily partitioned). Is partition_mapping the best option for me?

claire

09/30/2022, 10:33 PM
Yep, partition mapping is what you're looking for. If you want the downstream asset to depend on all partitions of each upstream asset, you could do something like this:
from dagster import AllPartitionMapping, AssetIn, asset

@asset(
    partitions_def=...,  # the downstream asset's partitions definition goes here
    ins={
        "before_2022": AssetIn(partition_mapping=AllPartitionMapping()),
        "start_2022": AssetIn(partition_mapping=AllPartitionMapping()),
    },
)
def my_asset(context, before_2022, start_2022):
    # Log which upstream partition keys this run loads for each input.
    context.log.info(context.asset_partition_keys_for_input("before_2022"))
    context.log.info(context.asset_partition_keys_for_input("start_2022"))

Averell

09/30/2022, 10:34 PM
Thanks a lot