If I use a sensor to produce partitions (file base...
# ask-community
c
If I use a sensor to produce file-based partitions, how should I deal with the downstream asset? I see three options: (1) partition it as well and filter for the files of interest in the op code; (2) use a partition mapping to narrow down the set of file partitions that the asset should actually process; or (3) leave the asset unpartitioned and somehow have the IO manager get the current upstream partition (this is what I tried, and it does not seem to work).
Maybe just a link to an example on how this is normally done?
d
I think what you're doing is close to a pattern that we use. I'm not sure whether it's an anti-pattern, because it does cause some weird bugs in Dagster. But we use a global dynamic partition, populate it with a source asset, and then have another asset partitioned by it.
i.e.
@asset
def groups(context: OpExecutionContext) -> SQL:
    # get the groups (the fetch is elided here; `groups` below stands for the fetched records, not this function)
    current_groups = {group["name"] for group in groups}
    partition_groups = set(
        context.instance.get_dynamic_partitions(groups_partition.name)
    )
    added_groups = current_groups - partition_groups
    removed_groups = partition_groups - current_groups
    context.log.info(
        f"partition {groups_partition.name}: adding {added_groups} and removing {removed_groups}"
    )
    context.instance.add_dynamic_partitions(groups_partition.name, list(added_groups))
    for group in removed_groups:
        context.instance.delete_dynamic_partition(groups_partition.name, group)

    return SQL("select * from $df", df=df)
and then
@asset(partitions_def=groups_partition)
def asset_for_group(context: OpExecutionContext) -> SQL:
    ...  # body omitted in the original message
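The add/remove bookkeeping in `groups` boils down to a set difference between the keys that exist now and the keys already registered on the dynamic partition. Here is a minimal sketch of just that step in plain Python, with no Dagster imports. `diff_partition_keys` is a hypothetical helper name, not a Dagster API, and the file names are made-up example data.

```python
from typing import Iterable, List, Tuple


def diff_partition_keys(
    current: Iterable[str], registered: Iterable[str]
) -> Tuple[List[str], List[str]]:
    """Return (to_add, to_remove) so that `registered` ends up matching `current`.

    Hypothetical helper for illustration only; it is not part of Dagster.
    """
    current_set = set(current)
    registered_set = set(registered)
    # Keys seen now but not yet registered must be added.
    to_add = sorted(current_set - registered_set)
    # Keys registered earlier but no longer present must be removed.
    to_remove = sorted(registered_set - current_set)
    return to_add, to_remove


# Made-up example data: keys discovered now vs. keys already registered.
to_add, to_remove = diff_partition_keys(
    current=["a.csv", "b.csv", "c.csv"],
    registered=["b.csv", "old.csv"],
)
print(to_add)     # ['a.csv', 'c.csv']
print(to_remove)  # ['old.csv']
```

In the snippet above, `to_add` would be what gets passed to `context.instance.add_dynamic_partitions(...)`, and each entry of `to_remove` would go to `context.instance.delete_dynamic_partition(...)`. Sorting is only there to make the log output stable.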
c
@Drew You thanks - I'll give it a try!
m
I am getting this error when launching a backfill.
dagster._core.errors.DagsterDefinitionChangedDeserializationError: Asset {{assetname}} had a PartitionsDefinition at storage-time, but no longer does
I'm pretty sure I've not changed anything... any ideas? I'm on dagster 1.3.5
c
BTW, I found this example, which I am debugging to figure out how the last asset gets all the partitions without being partitioned itself: https://github.com/dagster-io/dagster/tree/master/examples/assets_dynamic_partitions Thanks @claire for this example!