#ask-community

Brian Pohl

06/02/2023, 10:34 PM
hey all, i have a case that i think is good for software-defined assets, but i have yet to dip my toes in them. can you tell me what the best approach for my scenario would be? i have a series of 3 ops, each of which outputs data in S3, and then passes the S3 path to the next step in the series. i plan on using this same series of ops in multiple jobs. each time i run any of these jobs, i want new data put in S3 - not the same data overwritten - unless i had already run that job with the same launch configuration. if i am re-running a job with the same launch configuration, and therefore data would be put in the same place in S3, i’d love if the asset said “hey i already exist!” and automatically skipped to the next step in the series. is this something that is doable with assets? what worries me is that assets are one-to-one with the op, where the op continues to refresh/overwrite a single data asset. in this case, i want a one-to-one with op + launch config + data asset. as i read the docs, everything feels like it could apply to either of these scenarios.

Tim Castillo

06/03/2023, 12:11 AM
Is what's happening in the launch config that you're configuring a different destination? e.g. you have different buckets customer_a and customer_b, and you're using the launch config to dictate that? If so, have you thought about using dynamic partitions, with the partition being the destination? That'd be the cleanest way for Dagster to keep track of whether you've already produced an asset with that config. You'll also still have some wiggle room and be able to make it multi-partitioned if you need to.

Brian Pohl

06/03/2023, 1:30 AM
That's pretty close but yeah you've got the gist of it. What's really happening is these ops are Kubernetes jobs using Java to transform our data. The input to the launch config is a location in S3 of raw data, plus a few other config parameters. So if someone tries to pass the same input data with the same config parameters - and therefore make the destination the same partition, I guess - what would Dagster do?
@Tim Castillo is there an example of kicking off a job with dynamic partitions that doesn’t use a sensor? i can’t figure out how to manually specify a partition key
@Tim Castillo following up here, sorry for the quick follow. we really want to use this feature but we’re on a tight timeline!

Tim Castillo

06/07/2023, 6:59 PM
Oh, hmm. with the additional context you gave, you can do it with dynamic partitions, but it feels more like using a factory pattern that returns back assets + job definitions would work better. Here's an example of how you can implement a factory pattern. would this help? https://github.com/dagster-io/dagster/discussions/11045

Brian Pohl

06/07/2023, 7:01 PM
@Tim Castillo thanks for this! i will check it out. if we decide to go down the dynamic partition road though, do you have an example of how to create a new partition without using a sensor?
looking through this asset factory example, i’m not sure it’d work for us, because all these examples have pre-defined assets. our use case is that we are constantly creating new assets dynamically. can this asset factory work dynamically?

claire

06/07/2023, 9:26 PM
Hi Brian. You can create a new partition without using a sensor directly from the instance. For example, within an asset, you can do
@asset
def my_asset(context):
    context.instance.add_dynamic_partitions(partitions_def_name, list_of_partition_keys)

Brian Pohl

06/07/2023, 9:28 PM
thanks @claire! CC @josh gruenberg

josh gruenberg

06/07/2023, 9:30 PM
interesting, thanks @claire ... but I'm still confused about how this would work dynamically. our use-case involves users interactively requesting materialization of assets, with distinct data-coordinates and (potentially?) bespoke configuration/hyperparameters for each invocation.

Brian Pohl

06/07/2023, 9:31 PM
we want them to be able to enter a parameter in the Dagit UI, and have that parameter become the name/attribute of a partition

Tim Castillo

06/07/2023, 9:34 PM
Ah, ok. Then you can have that config brought in as config in the launchpad https://docs.dagster.io/concepts/dagit/dagit#launchpad-tab and then once those configs are loaded into the asset definition, then you can make a dynamic partition off of that

Brian Pohl

06/07/2023, 9:41 PM
we have something like this:
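from dagster import AssetSelection, Definitions, DynamicPartitionsDefinition, asset, define_asset_job

foo_def = DynamicPartitionsDefinition(name="bar")

# The asset needs the same partitions definition as the job that targets it.
@asset(partitions_def=foo_def)
def my_asset(context):
    ...

# Job names cannot contain spaces, and the selection must name the asset's key.
foo_job = define_asset_job("foo_job", AssetSelection.keys("my_asset"), partitions_def=foo_def)

defs = Definitions(
    assets=[my_asset],
    jobs=[foo_job],
    resources={
        "a_resource": some_resource,  # defined elsewhere
    }
)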
given this, where do you put the command context.instance.add_dynamic_partitions(partitions_def_name, list_of_partition_keys)? does that have to be a separate op?

claire

06/07/2023, 10:11 PM
I would recommend having the add_dynamic_partitions command be in a separate op. The reason is that you want the 3 assets to materialize for a specific partition that represents the unique config, so that you can identify if the asset was run for a particular configuration. If you added it within the same op, the asset would not execute for a specific partition.
So you could do something like:
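from dagster import (
    DagsterEventType,
    DynamicPartitionsDefinition,
    EventLogEntry,
    EventRecordsFilter,
    Output,
    RunRequest,
    SensorEvaluationContext,
    asset,
    asset_sensor,
)

dynamic_partitions_def = DynamicPartitionsDefinition(name="my_partitions_def")


@asset
def add_dynamic_partition_asset(context):
    # A parameter named my_config would be treated as an upstream asset,
    # so the launchpad config is read from the context instead.
    my_config = context.op_config
    partition_key = get_unique_key_from_config(my_config)  # user-supplied helper
    context.instance.add_dynamic_partitions(dynamic_partitions_def.name, [partition_key])
    # Output requires a value; there is nothing to store here, so pass None.
    return Output(None, metadata={"partition_key": partition_key})


# job_containing_assets is the asset job for the partitioned assets, defined elsewhere.
@asset_sensor(asset_key=add_dynamic_partition_asset.key, job=job_containing_assets)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    # Metadata values are wrapped in MetadataValue objects; .value unwraps the string.
    metadata = asset_event.asset_materialization.metadata
    yield RunRequest(partition_key=metadata["partition_key"].value)


@asset(partitions_def=dynamic_partitions_def)
def one(context):
    path = ...  # S3 destination for this partition; not specified in the thread
    already_materialized = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=context.asset_key_for_output(),
            asset_partitions=[context.partition_key],
        ),
        limit=1,
    )
    if not already_materialized:
        # materialization has not occurred for the given partition yet
        write_output_to_s3()
    # otherwise a materialization already exists for the partition, so skip the write

    return path


@asset(partitions_def=dynamic_partitions_def)
def two(one):
    ...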
• The "add dynamic partition asset" is the asset that users kick off in the UI, that they provide config for. Within this asset, you add a unique partition representing the config and return an output containing the partition key / any additional metadata
• The asset sensor allows you to detect when the "add dynamic partition asset" is run, and kick off the 3 assets for the new partition
• "one" and "two" represent two of the 3 assets you want to kick off. In these assets we check if the asset has previously materialized for the given partition, and only write to s3 if it has not
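The snippet above calls get_unique_key_from_config without defining it. One possible way to write it, as a minimal sketch rather than anything prescribed in this thread, is to hash a canonical JSON form of the launch config so that the same config always maps to the same partition key. The s3_destination helper, the config field names, and the bucket layout are all hypothetical and only there for illustration.

```python
import hashlib
import json


def get_unique_key_from_config(config: dict) -> str:
    """Return a short, deterministic key for a launch configuration."""
    # Sort keys so that logically equal configs serialize identically,
    # regardless of the order in which the user entered the fields.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    # 16 hex characters keep the key readable in the UI; the length is an
    # arbitrary choice for this sketch, not a Dagster requirement.
    return digest[:16]


def s3_destination(bucket: str, step: str, partition_key: str) -> str:
    """Hypothetical layout: one prefix per step and partition key."""
    return f"s3://{bucket}/{step}/{partition_key}/"
```

Because the key depends only on the config contents, re-running with the same raw data location and parameters lands on the same partition and the same S3 prefix, which is what lets the materialization check in "one" skip the write.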

Brian Pohl

06/07/2023, 10:38 PM
this is great, thank you Claire! we will give this a shot