Is it possible to dynamically create Assets that h...
# ask-community
j
Is it possible to dynamically create Assets that have their own partitions? Scenario:
• Day 1: the job materializes two assets that have daily partitions.
• Day 2: the prior two assets materialize, and the job finds 1 more.
• Day 3: the prior three assets materialize, and the job makes 0.
• Day 4: the prior three assets materialize, and the job makes 2 more.
The start date is known at the time of materialization. `AssetMaterialization` events don't seem to accept any `PartitionsDefinition` options.
y
Hi Jon, I was reading your question earlier. Is this related to that use case?
j
Yep
y
do you mind sharing the code you're trying with? more specifically, i'd like to understand the following better:
• how do you expect to kick off the job? is it from a sensor or a schedule?
• do you expect the `start_date` to be the partition value?
• is `start_date` known at definition time, or would it be determined at run time?
if it's a schedule, you can specify `partitions_def` on an `asset` like this example and pass the same partition def to `build_schedule_from_partitioned_job` like this.
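roughly, that pattern looks like this (a minimal sketch; the asset/job names and start date here are just placeholders):

```python
from dagster import (
    DailyPartitionsDefinition,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

daily_partitions = DailyPartitionsDefinition(start_date="2022-06-01")

# an asset partitioned by day
@asset(partitions_def=daily_partitions)
def daily_campaign_analytics(context):
    ...

# a job targeting the partitioned asset, using the same partitions def
analytics_job = define_asset_job(
    "daily_campaign_analytics_job", partitions_def=daily_partitions
)

# one scheduled run per daily partition
analytics_schedule = build_schedule_from_partitioned_job(analytics_job)
```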
j
Thanks, I've managed to understand the individual concepts of Partitioned Jobs and Assets. It's defining them dynamically that I'm struggling with. I'm having trouble describing the sequence of events, but very roughly in pseudocode:
def create_campaign(campaign_id, start_date):
    custom_daily = DailyPartitionsDefinition(start_date=start_date)
    return AssetsDefinition.from_graph(
        daily_campaign_analytics, partitions_def=custom_daily
    )

@job(config=daily_partitioned_config(start_date='2022-06-01'))
def lookup_new_campaigns(context):
    campaigns = fetch_campaigns()
    created_today = [camp for camp in campaigns if camp.started_at == context.partition_key]
    for camp in created_today:
        yield Output(create_campaign(camp.id, camp.started_at))
The dynamic setting of the start date is the blocker. I can't find any way, outside of a decorator, to provide the start date and still materialize an asset.
y
in your pseudo code, is `create_campaign` an asset factory? you're trying to `yield Output` in the body of a job, which isn't supported. what are you trying to achieve in the `lookup_new_campaigns` job?
j
Sorry, I put too many specifics in there. I think this is a failure of my understanding of Assets and the approach to handling them vs traditional 'airflow'-style Ops. I don't know the method in Dagster to do what I'm looking to do, so I'm trying to explain how I think it would work. But it's a catch-22: if I could explain how to do it, I could do it.

In Airflow I would have an @daily job that uses the `execution_date` to figure out what campaigns are live for that date. Then it iterates over the active campaigns and inserts that data into a SQL table, or writes them in a hive-like partition to S3, like `<bucket>/emails/campaigns/<campaign_uuid>/date=2022-06-07/myfiles.csv`. There I don't have the ability to individually retry failed campaigns; I have to refetch all active campaigns for a day. Additionally, I can't backfill a single campaign or set of campaigns; I have to backfill everything.

I'm able to recreate this style of workflow in Dagster with a fan out -> fan in, but all data for campaigns is grouped together. At least I can retry individual failed steps.
Right now, a job op using `context.log_event` with an AssetMaterialization can't create a new asset that is partitioned. It can create a single partition on an asset, but not a new graph-backed asset. Is that right? I appreciate your help; this is just one of my main use cases that I see Dagster potentially filling, and I'm struggling with it.

The other thread, where you referred to inserting the hourly partition into a relational table, is how I'm currently doing it on a daily schedule. But it requires that I fetch the data for all active campaigns for that day/partition_key, rather than each campaign being independent, which would allow backfills to process less data if only some of it were needed.

I have an unknown number of assets that will slowly increase every day. I would like each asset to have its own key so I can do something like `AssetKey(['email', 'campaign', '<source>', '<uuid>'])`, then grab all email campaigns or all campaigns for a given source.
y
ah got it. the workflow makes total sense. when you say `<bucket>/emails/campaigns/<campaign_uuid>/date=2022-06-07/myfiles.csv`, do you think `2022-06-07` is known before you kick off the job, or will it be known at run time, like after you iterate through the campaigns? sounds like this would be dynamically generated at run time. we currently don't have support for runtime partitions, but we're thinking of building it - here's a tracking issue: https://github.com/dagster-io/dagster/issues/7943
same for `AssetKey(['email', 'campaign', '<source>', '<uuid>'])`: will `source` and `uuid` be available before the job starts? one way to model this is to log this info in metadata attached to asset materializations. here's an example: https://docs.dagster.io/concepts/assets/software-defined-assets#recording-materialization-metadata
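a rough sketch of what that could look like (the op name, asset key, and metadata fields here are made up for illustration):

```python
from dagster import AssetMaterialization, MetadataValue, op

@op
def process_campaign(context):
    # ... produce the campaign data ...

    # attach the identifying info as metadata on the materialization event so
    # it shows up on the asset's materialization history
    context.log_event(
        AssetMaterialization(
            asset_key="campaign_analytics",
            metadata={
                "source": "email",
                "campaign_uuid": MetadataValue.text("1234-abcd"),
            },
        )
    )
```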
cc @sandy i think these are great use cases for improving asset observability.
j
With Airflow I'd template the path to the date string in the S3 location like that. The date would be known because it'd be a partition date. For each campaign I'd know the `start_date` of a partition config. Can an @op do something like:
def my_op(campaign_id, start_date):
    custom_start = DailyPartitionsDefinition(start_date=start_date)
    return AssetsDefinition.from_graph(
        build_single_day_analytics,
        keys_by_output_name={"result": AssetKey(['email', 'campaign', campaign_id])},
        partitions_def=custom_start,
    )
Thanks Yuhan. For the materialization metadata method, is there a selection syntax to fetch assets that have particular metadata?
y
i think you can `yield AssetMaterialization` in an op body, without using `AssetsDefinition`. something like:
@op(config_schema={"date": str})
def my_partitioned_asset_op(context, campaign_id):
    partition_date = context.op_config["date"]

    ...

    context.log_event(
        AssetMaterialization(asset_key=AssetKey(['email','campaign',campaign_id]), partition=partition_date)
    )
    return ...
and then your job will look like:
@daily_partitioned_config(start_date=datetime(2022, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "my_partitioned_asset_op": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }

@job(config=my_partitioned_config)
def do_stuff_partitioned():
    my_partitioned_asset_op()
j
Does the asset `['email', 'campaign', campaign_id]` need to already exist as a daily partitioned asset for that to work?
y
ah, if you'd like to filter or select assets based on `['email', 'campaign', campaign_id]`, then i think keeping them in the AssetKey does make more sense, so ^ would be my recommendation. querying assets' metadata isn't super performant or natively supported.
how do you expect to provide the values of `[email, campaign, campaign_id]`? from an upstream op (runtime), or config (definition time)?
j
The campaign_id would be runtime via an op, the others would be known.
y
got it. you can use dynamic graphs and pass campaign_id as an input from an upstream op:
@op(out=DynamicOut())
def get_campaign_ids():
    data = load_campaigns()
    for campaign_id in data:
        yield DynamicOutput(campaign_id, mapping_key=campaign_id)


@op(config_schema={"date": str})
def my_partitioned_asset_op(context, campaign_id):
    partition_date = context.op_config["date"]

    ...

    context.log_event(
        AssetMaterialization(
            asset_key=AssetKey(['email','campaign',campaign_id]),
            partition=partition_date,
        )
    )
    return ...

@job(config=my_partitioned_config)
def do_stuff_partitioned():
    get_campaign_ids().map(my_partitioned_asset_op)
as for `'email','campaign'`, you can provide them via `my_partitioned_config`.
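e.g. a sketch of that, extending the earlier snippets (the `key_prefix` config field is just a hypothetical name, not anything built in):

```python
from datetime import datetime

from dagster import AssetKey, AssetMaterialization, daily_partitioned_config, op

@op(config_schema={"date": str, "key_prefix": [str]})
def my_partitioned_asset_op(context, campaign_id):
    partition_date = context.op_config["date"]
    # hypothetical config field carrying the static part of the asset key
    prefix = context.op_config["key_prefix"]

    ...

    context.log_event(
        AssetMaterialization(
            asset_key=AssetKey(prefix + [campaign_id]),
            partition=partition_date,
        )
    )

@daily_partitioned_config(start_date=datetime(2022, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "my_partitioned_asset_op": {
                "config": {
                    "date": start.strftime("%Y-%m-%d"),
                    "key_prefix": ["email", "campaign"],
                }
            }
        }
    }
```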
s
@Jon Simpson if you'd be open to a call some time, I'd be curious to hear more about your use case
j
Thanks Yuhan. I’ll try to give that a shot later in the day. Sounds good Sandy, I’ll message you with some times we could chat and see what works