Bartek Marczak
12/29/2022, 1:28 PM

@asset(partitions_def=HourlyPartitionsDefinition(start_date="2022-01-01-00:00"))
def blog_posts(context) -> List[Dict]:
    partition_datetime_str = context.asset_partition_key_for_output()
    hour = datetime.fromisoformat(partition_datetime_str)
    posts = fetch_blog_posts_from_external_api(hour_when_posted=hour)
    return posts
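[Editor's note: one caveat with the snippet above. Dagster's hourly partition keys have the shape "2022-01-01-00:00", and datetime.fromisoformat only accepts that string on Python 3.11+, where any single character may stand in for the "T" separator. On earlier interpreters an explicit format string is needed; a minimal sketch:

```python
from datetime import datetime

# Hourly partition keys from HourlyPartitionsDefinition look like this:
partition_key = "2022-01-01-00:00"

# strptime with an explicit format works on any supported Python version,
# unlike fromisoformat, which rejects the "-" date/time separator before 3.11.
hour = datetime.strptime(partition_key, "%Y-%m-%d-%H:%M")
print(hour)  # 2022-01-01 00:00:00
```
]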
I can use the above asset as input to other assets - that’s all nice and fine.
But what I would also like to be able to do is to run a job each time a new partition of blog_posts
has been materialized in order to publish the data to yet another external API. I assumed I would be able to do this fairly easily using an asset sensor, like so:
@asset_sensor(asset_key=AssetKey("blog_posts"), job=my_job)
def new_materialized_blog_posts_sensor(context, asset_event):
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "my_op": {
                    "config": {
                        "asset_key_path": asset_event.dagster_event.asset_key.path,
                        "partition_key": asset_event.dagster_event.partition,
                    }
                }
            }
        },
    )
This triggers my_job, and that job executes my_op, which receives the asset_key and partition. So it seems like I'm just a step away. But the problem I have is: how do I access the underlying data? Do I need to work directly with the IOManager class to load the input? If so, how do I initialise the correct IO manager based on what has been configured for the asset storage? Once I have the IO manager, I should be able to access the data using the approach for testing:
context = build_input_context(asset_key=asset_key, partition_key=partition_key)
blog_posts = manager.load_input(context)
But this seems a bit hacky (the recommendation is to use this for tests only, I think).
So what is the best way I can access the materialized asset partition in my other job?

owen
12/29/2022, 2:32 PM

@asset(partitions_def=HourlyPartitionsDefinition(start_date="2022-01-01-00:00"))
def blog_posts_in_other_api(context, blog_posts: List[Dict]):
    partition_datetime_str = context.asset_partition_key_for_output()
    hour = datetime.fromisoformat(partition_datetime_str)
    # blog_posts is a List[Dict] loaded from upstream
    ...
from there, it should be a lot easier to kick off runs for this asset within a sensor -- you can just do
my_job = define_asset_job("my_job", selection=AssetSelection.keys("blog_posts_in_other_api"))
@asset_sensor(asset_key=AssetKey("blog_posts"), job=my_job)
def new_materialized_blog_posts_sensor(context, asset_event):
    return my_job.run_request_for_partition(asset_event.dagster_event.partition)
Bartek Marczak
12/29/2022, 2:38 PM

owen
12/29/2022, 2:41 PM

Matthew
03/14/2023, 11:27 PM