#ask-community

Yevhen Samoilenko

07/21/2022, 12:35 PM
Hi! After the update (from v0.14.19 to v0.15.6), the launchpad logic for partitioned jobs has changed. Previously, I could run one job to cover multiple partitions at once by manually editing the config, e.g.:
ops:
  load_transactions_daily:
    config:
      assets:
        input_partitions:
          extract_transactions_daily:
            end: 2022-07-19
            start: 2022-07-10
        output_partitions:
          result:
            end: 2022-07-19
            start: 2022-07-10
I know it's kind of a hack, but sometimes it's pretty convenient (for testing, or for loading all of the historical data for the last few years after deploying new hourly partitioned assets). But now when I select a partition, the config is no longer scaffolded. Is there any other way to do something like this with the latest dagster version?

jamie

07/21/2022, 1:39 PM
Hi @Yevhen Samoilenko, one thing you could start doing is launching backfills to fill in previous partitions (this will launch a run for each partition though, so it doesn't quite match your exact use case). Are you saying that when you go to launch the job in question, there is no place to enter any config? For example, i'm thinking you might be able to enter this config manually in the launchpad (without first selecting a partition). i haven't tested that out myself though, so if it doesn't work, can you give me some more details about what you're seeing in dagit, or a minimal example i can use to replicate what you see? that would be super helpful

Yevhen Samoilenko

07/21/2022, 1:48 PM
Hi, @jamie! Thank you for the quick response! Yes, I can enter config manually, but considering the fact that jobs might have 200+ assets, it might be a little bit cumbersome))
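For jobs with many assets, a range config like the one in the first message could be generated instead of typed by hand. The following is a minimal sketch, assuming every op takes the same start/end range. `build_range_config` and the op-to-upstream mapping are hypothetical helpers, not part of dagster, and the config shape is copied from the example above (whether v0.15.6 still accepts that shape was not confirmed in this thread).

```python
import json


def build_range_config(op_inputs, start, end, output_name="result"):
    """Build run config covering [start, end] for every op.

    op_inputs maps an op name to the names of its partitioned upstream
    assets. Hypothetical helper; the config shape mirrors the example
    from the first message of this thread.
    """
    window = {"start": start, "end": end}
    ops = {}
    for op_name, upstream_names in op_inputs.items():
        ops[op_name] = {
            "config": {
                "assets": {
                    "input_partitions": {
                        name: dict(window) for name in upstream_names
                    },
                    "output_partitions": {output_name: dict(window)},
                }
            }
        }
    return {"ops": ops}


config = build_range_config(
    {"load_transactions_daily": ["extract_transactions_daily"]},
    start="2022-07-10",
    end="2022-07-19",
)
# Printed as JSON for inspection before entering it in the launchpad.
print(json.dumps(config, indent=2))
```

With 200+ assets, the mapping passed as `op_inputs` would itself need to come from somewhere (for example, a list kept next to the job definition), which this sketch leaves open.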

jamie

07/21/2022, 1:50 PM
i see, i think i misunderstood the issue a bit. are you saying that the config you would have previously entered (in your original message) no longer works? or is the config you sent a snippet from a much larger config that would have been entered?

Yevhen Samoilenko

07/21/2022, 1:57 PM
Yes, it's way larger. It was just an example. Actually, I'm not sure if it still works. I haven't tested it yet. The main issue, for now, is that picking a partition in the launchpad doesn't scaffold the config anymore.
Maybe my goal (covering a partition range in one job run) can be achieved in some other way?

jamie

07/21/2022, 2:10 PM
i'm not aware of another way to run a range of partitions all in a single job execution (i.e. a single run). I think with the current capabilities you'd need to select the partition range in the selector, and then each partition will execute as its own run. If the ability to run a range of partitions in a single run is important to you, definitely open up a gh issue and we'll add it to the backlog!
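To gauge how many runs a backfill over a range would launch, the partition keys can be enumerated up front. This is a minimal sketch in plain Python (no dagster calls), assuming daily partitions keyed as YYYY-MM-DD and an inclusive end date; `daily_partition_keys` is a hypothetical helper.

```python
from datetime import date, timedelta


def daily_partition_keys(start, end):
    """Return daily partition keys from start to end, both inclusive."""
    first = date.fromisoformat(start)
    last = date.fromisoformat(end)
    if last < first:
        raise ValueError("end must not be before start")
    days = (last - first).days + 1
    return [(first + timedelta(days=i)).isoformat() for i in range(days)]


keys = daily_partition_keys("2022-07-10", "2022-07-19")
# A backfill launches one run per partition key in the selected range.
print(len(keys), keys[0], keys[-1])
```

For hourly partitions over several years, the same count grows into the tens of thousands, which is why a single run over a range can be attractive.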

Yevhen Samoilenko

07/22/2022, 9:58 AM
If someone else is interested in this functionality - I've found a solution:
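# Assumed imports (not in the original message); the import path for
# TimeWindow may differ by dagster version. WarehouseClient,
# warehouse_client and WAREHOUSE_CLIENT_CONF are project-specific.
import arrow
from dagster import (
    HourlyPartitionsDefinition,
    OpExecutionContext,
    ResourceDefinition,
    TimeWindow,
    asset,
)


def make_resource_defs():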
    return {
        "overwrite_time_window_start": ResourceDefinition(
            resource_fn=lambda context: context.resource_config
            if isinstance(context.resource_config, str)
            else None
        ),
        "warehouse_client": warehouse_client.configured(WAREHOUSE_CLIENT_CONF),
    }

def get_time_window(
    context: OpExecutionContext,
    output_name: str = "result",
):
    time_window = context.output_asset_partitions_time_window(output_name)
    overwrite_time_window_start = context.resources.overwrite_time_window_start

    start = (
        arrow.get(overwrite_time_window_start).datetime
        if overwrite_time_window_start
        else time_window.start
    )

    return TimeWindow(start, time_window.end)


@asset(
    required_resource_keys={
        "warehouse_client",
        "overwrite_time_window_start",
    },
    partitions_def=HourlyPartitionsDefinition(
        start_date=arrow.get("2022-01-01").datetime
    ),
)
def get_users(context: OpExecutionContext):
    warehouse_client: WarehouseClient = context.resources.warehouse_client
    time_window = get_time_window(context)
    table = "public.users"

    return warehouse_client.get_table_data(
        table=table,
        start_time=time_window.start,
        end_time=time_window.end,
    )
and then use this config in the launchpad to overwrite time_window.start:
resources:
  overwrite_time_window_start:
    config: 2022-07-10
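The override logic in get_time_window can be checked outside dagster. Here is a minimal sketch, assuming the standard library in place of arrow (so datetimes are naive rather than timezone-aware) and a hypothetical `SimpleTimeWindow` stand-in for dagster's TimeWindow; `resolve_time_window` is likewise a hypothetical name.

```python
from datetime import datetime
from typing import NamedTuple, Optional


class SimpleTimeWindow(NamedTuple):
    """Hypothetical stand-in for dagster's TimeWindow (start, end)."""

    start: datetime
    end: datetime


def resolve_time_window(partition_window, override_start: Optional[str]):
    """Move the window start back when an override start is configured."""
    if override_start:
        start = datetime.fromisoformat(override_start)
    else:
        start = partition_window.start
    return SimpleTimeWindow(start, partition_window.end)


# Selected partition: the hour starting at 2022-07-19 00:00.
selected = SimpleTimeWindow(datetime(2022, 7, 19, 0), datetime(2022, 7, 19, 1))
widened = resolve_time_window(selected, "2022-07-10")
unchanged = resolve_time_window(selected, None)
print(widened.start, widened.end)
```

The run is still launched for a single partition, which fixes the end of the window; only the start is pushed back by the resource config.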

jamie

07/22/2022, 2:21 PM
glad you found a solution and thanks for sharing!