Jon Simpson
09/06/2022, 10:33 PM
yuhan
09/06/2022, 10:40 PM
Jon Simpson
09/06/2022, 10:41 PM
yuhan
09/06/2022, 10:48 PM
• do you want `start_date` to be the partition value?
• is `start_date` known at definition time, or would it be determined at run time?
09/06/2022, 10:50 PM
you can specify `partitions_def` on an asset like this example and pass the same partition def in `build_schedule_from_partitioned_job` like this.
yuhan
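As context for what a shared daily partitions definition provides, here is a minimal framework-free sketch (plain Python, not the Dagster API): the start date is the single source of truth, and the asset and the schedule iterate the same sequence of daily keys.

```python
from datetime import date, timedelta

def daily_partition_keys(start: date, end: date) -> list[str]:
    """Daily partition keys from start (inclusive) to end (exclusive).

    Illustrates what a daily partitions definition enumerates; defining
    it once and reusing it keeps the asset and the schedule in agreement
    about which partitions exist.
    """
    keys = []
    current = start
    while current < end:
        keys.append(current.isoformat())
        current += timedelta(days=1)
    return keys
```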
09/06/2022, 10:56 PM
Jon Simpson
09/06/2022, 11:59 PM
def create_campaign(campaign_id, start_date):
    custom_daily = DailyPartitionsDefinition(start_date="2022-05-31-00:00")
    return AssetsDefinition.from_graph(daily_campaign_analytics, partition_def=custom_daily)

@job(partition_def=daily_partitioned_config(start_date='2022-06-01'))
def lookup_new_campaigns(context):
    campaigns = fetch_campaigns()
    created_today = [camp for camp in campaigns if camp.started_at == context.partition_key]
    for camp in created_today:
        yield Output(create_campaign(camp.id, camp.started_at))
The dynamic setting of the start date is the blocker. I can't find any way, outside of a decorator, to provide the `start_date` and still materialize an asset.
yuhan
09/07/2022, 3:51 AM
is `create_campaign` an asset factory? you're trying to `yield Output` in the body of a job, which isn't supported — what are you trying to achieve in the `lookup_new_campaigns` job?
Jon Simpson
09/07/2022, 3:39 PM
My current workflow uses `execution_date` to figure out what campaigns are live for that date. Then it iterates the active campaigns and inserts that data into a SQL table, or writes them in a hive-like partition to S3, like `<bucket>/emails/campaigns/<campaign_uuid>/date=2022-06-07/myfiles.csv`. With that setup I don't have the ability to individually retry failed campaigns; I have to refetch all active campaigns for a day. Additionally, I can't backfill a single campaign or set of campaigns. I have to backfill everything.
I'm able to recreate this style of workflow in Dagster with a fan-out -> fan-in, but all data for campaigns is grouped together. At least I can retry individual failed steps.
Jon Simpson
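The hive-style layout described above can be sketched as a small helper (illustrative names, not from the thread's actual codebase); the point of the layout is that each campaign/day pair gets its own prefix, so one slice can be rewritten or backfilled without touching the others.

```python
def campaign_partition_path(bucket: str, campaign_uuid: str, day: str,
                            filename: str = "myfiles.csv") -> str:
    # One S3 prefix per campaign per day, matching the
    # <bucket>/emails/campaigns/<uuid>/date=<day>/ layout above.
    return f"{bucket}/emails/campaigns/{campaign_uuid}/date={day}/{filename}"
```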
09/07/2022, 3:40 PM
Using `context.log_event` with an `AssetMaterialization` can't create a new asset that is partitioned. It can create a single partition on an asset, but not a new graph-based asset. Is that right?
I appreciate your help; this is just one of my main use cases that I see Dagster potentially filling, and I'm struggling with it. The other thread where you referred to inserting the hourly partition into a relational table is how I'm currently doing it on a daily schedule. But it requires that I fetch the data for all active campaigns for that day/partition_key, rather than each campaign being independent, which would allow backfills to process less data if only some campaigns needed it.
I have an unknown number of assets that will slowly increase every day. I would like each asset to have its own key so I can do something like `AssetKey(['email', 'campaign', '<source>', '<uuid>'])`, then grab all email campaigns or all campaigns for a given source.
yuhan
09/07/2022, 6:38 PM
for `<bucket>/emails/campaigns/<campaign_uuid>/date=2022-06-07/myfiles.csv`, do you think `2022-06-07` is known before you kick off the job, or will it be known at run time, like after you iterate through the campaigns?
sounds like this would be dynamically generated at run time. we currently don't have support for runtime partitions, but we're thinking of building it - here's a tracking issue: https://github.com/dagster-io/dagster/issues/7943
yuhan
09/07/2022, 6:39 PM
for `AssetKey(['email', 'campaign', '<source>', '<uuid>'])`, will `source` and `uuid` be available before the job starts?
one way to model this is to log this info in metadata attached to asset materializations. here's an example: https://docs.dagster.io/concepts/assets/software-defined-assets#recording-materialization-metadata
yuhan
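The structured-key idea in the thread ("grab all email campaigns or all campaigns for a given source") comes down to prefix matching on the key path. A minimal illustration in plain Python (not a Dagster API; the sample keys are made up):

```python
def keys_with_prefix(asset_keys, prefix):
    """Return the structured keys (lists of path parts) that start with prefix."""
    return [key for key in asset_keys if key[: len(prefix)] == prefix]

# Hypothetical keys in the ['email', 'campaign', <source>, <uuid>] shape:
sample_keys = [
    ["email", "campaign", "sendgrid", "uuid-1"],
    ["email", "campaign", "mailchimp", "uuid-2"],
    ["web", "traffic", "daily"],
]
```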
09/07/2022, 8:25 PM
Jon Simpson
09/07/2022, 8:26 PM
I still need to dynamically set the `start_date` of a partition config. Can an @op do something like:
def my_op(campaign_id, start_date):
    custom_start = DailyPartitionsDefinition(start_date=start_date)
    return AssetsDefinition.from_graph(
        AssetKey(['email', 'campaign', campaign_id]),
        build_single_day_analytics,
        partition_def=custom_start,
    )
Jon Simpson
09/07/2022, 8:34 PM
yuhan
09/07/2022, 8:35 PM
you can `yield AssetMaterialization` in an op body, without using `AssetsDefinition`. something like:
@op(config_schema={"date": str})
def my_partitioned_asset_op(context, campaign_id):
    partition_date = context.op_config["date"]
    ...
    context.log_event(
        AssetMaterialization(asset_key=AssetKey(['email', 'campaign', campaign_id]), partition=partition_date)
    )
    return ...
and then your job will look like:
@daily_partitioned_config(start_date=datetime(2022, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "my_partitioned_asset_op": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }

@job(config=my_partitioned_config)
def do_stuff_partitioned():
    my_partitioned_asset_op()
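For reference, a framework-free sketch of what the partitioned config above computes for a given partition start (pure Python, so the shape of the run config is easy to inspect in isolation):

```python
from datetime import datetime

def run_config_for(start: datetime) -> dict:
    # Same dict shape as my_partitioned_config returns above: the
    # partition's start date, formatted, is handed to the op as config.
    return {
        "ops": {
            "my_partitioned_asset_op": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }
```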
Jon Simpson
09/07/2022, 8:37 PM
yuhan
09/07/2022, 8:37 PM
if you want to query assets by `['email', 'campaign', campaign_id]`, then i think keeping them in AssetKey does make more sense. so ^ would be my recommendation.
querying assets' metadata isn't super performant or natively supported.
yuhan
09/07/2022, 8:39 PM
how do you get `[email, campaign, campaign_id]`? from an upstream op (runtime), or config (definition time)?
Jon Simpson
09/07/2022, 8:40 PM
yuhan
09/07/2022, 8:48 PM
@op(out=DynamicOut())
def get_campaign_ids():
    data = load_campaigns()
    for campaign_id in data:
        yield DynamicOutput(campaign_id, mapping_key=campaign_id)

@op(config_schema={"date": str})
def my_partitioned_asset_op(context, campaign_id):
    partition_date = context.op_config["date"]
    ...
    context.log_event(
        AssetMaterialization(
            asset_key=AssetKey(['email', 'campaign', campaign_id]),
            partition=partition_date,
        )
    )
    return ...

@job(config=my_partitioned_config)
def do_stuff_partitioned():
    get_campaign_ids().map(my_partitioned_asset_op)
as for `'email', 'campaign'`, you can provide them via `my_partitioned_config`
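One practical caveat worth checking against the Dagster docs (this is an assumption, not something stated in the thread): dynamic-output mapping keys are restricted to alphanumerics and underscores, so campaign ids containing other characters, such as UUIDs with hyphens, would need sanitizing before being passed as `mapping_key`. A hypothetical helper:

```python
import re

def to_mapping_key(campaign_id: str) -> str:
    # Replace anything outside [A-Za-z0-9_] so the id is safe to use as a
    # dynamic-output mapping key (hypothetical helper, not a Dagster API).
    return re.sub(r"[^A-Za-z0-9_]", "_", campaign_id)
```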
sandy
09/07/2022, 9:19 PM
Jon Simpson
09/08/2022, 3:29 PM