# ask-community
**Bartek Marczak:**
Hi everyone, I’m new to Dagster and I just joined to ask this question, since I’ve spent a while searching through the documentation and couldn’t find a clear answer. I have an hourly-partitioned asset like so:
```python
from datetime import datetime
from typing import Dict, List

from dagster import HourlyPartitionsDefinition, asset


@asset(partitions_def=HourlyPartitionsDefinition(start_date="2022-01-01-00:00"))
def blog_posts(context) -> List[Dict]:
    # hourly partition keys look like "2022-01-01-00:00"
    partition_datetime_str = context.asset_partition_key_for_output()
    hour = datetime.fromisoformat(partition_datetime_str)
    posts = fetch_blog_posts_from_external_api(hour_when_posted=hour)
    return posts
```
I can use the above asset as input to other assets - that’s all nice and fine. But what I would also like to be able to do is to run a job each time a new partition of `blog_posts` has been materialized, in order to publish the data to yet another external API. I assumed I would be able to do this fairly easily using an asset sensor, like so:
```python
from dagster import AssetKey, RunRequest, asset_sensor

# my_job is the job that runs my_op
@asset_sensor(asset_key=AssetKey("blog_posts"), job=my_job)
def new_materialized_blog_posts_sensor(context, asset_event):
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "my_op": {
                    "config": {
                        "asset_key_path": asset_event.dagster_event.asset_key.path,
                        "partition_key": asset_event.dagster_event.partition,
                    }
                }
            }
        },
    )
```
This triggers `my_job`, and that job executes `my_op`, which receives the asset_key and partition. So it seems like I’m just a step away. But the problem I have is: how do I access the underlying data? Do I need to work directly with the `IOManager` class to load the input? If so, how do I initialise the correct IO manager based on what has been configured for the asset storage? Once I have the IO manager, I should be able to access the data using the approach for testing:
```python
from dagster import build_input_context

# `manager` would be the IO manager instance configured for the asset
context = build_input_context(asset_key=asset_key, partition_key=partition_key)
blog_posts = manager.load_input(context)
```
But this seems a bit hacky (the recommendation is to use this for tests only, I think). So what is the best way I can access the materialized asset partition in my other job?
**owen:**
hi @Bartek Marczak! If you represent the downstream external API as an asset, then all of the input loading should be handled for you, i.e.
```python
from datetime import datetime
from typing import Dict, List

from dagster import HourlyPartitionsDefinition, asset


@asset(partitions_def=HourlyPartitionsDefinition(start_date="2022-01-01-00:00"))
def blog_posts_in_other_api(context, blog_posts: List[Dict]):
    partition_datetime_str = context.asset_partition_key_for_output()
    hour = datetime.fromisoformat(partition_datetime_str)
    # blog_posts is a List[Dict] loaded from upstream by the IO manager
    ...
```
from there, it should be a lot easier to kick off runs for this asset within a sensor -- you can just do
```python
from dagster import AssetKey, AssetSelection, asset_sensor, define_asset_job

my_job = define_asset_job("my_job", selection=AssetSelection.keys("blog_posts_in_other_api"))


@asset_sensor(asset_key=AssetKey("blog_posts"), job=my_job)
def new_materialized_blog_posts_sensor(context, asset_event):
    return my_job.run_request_for_partition(asset_event.dagster_event.partition)
```
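For completeness, here is a rough sketch of how these pieces might be registered together, assuming everything lives in a single repository (the `blog_repo` name is just illustrative):
```python
from dagster import repository


@repository
def blog_repo():
    # the assets, the asset job, and the sensor all registered in one place
    return [blog_posts, blog_posts_in_other_api, my_job, new_materialized_blog_posts_sensor]
```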
we also have a built-in `asset_reconciliation_sensor` that handles this sort of behavior for you in a more generalized way, which might be useful here -- basically, any time an upstream partition is created, a run will be kicked off to update the downstream partitions
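A minimal sketch of what that might look like, assuming a Dagster version that provides `build_asset_reconciliation_sensor` (the sensor name below is illustrative):
```python
from dagster import AssetSelection, build_asset_reconciliation_sensor

# kick off runs for blog_posts_in_other_api whenever its upstream partitions are updated
blog_posts_reconciliation_sensor = build_asset_reconciliation_sensor(
    asset_selection=AssetSelection.keys("blog_posts_in_other_api"),
    name="blog_posts_reconciliation_sensor",
)
```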
**Bartek Marczak:**
Thanks @owen, that makes sense. I thought I should avoid representing the external API as an asset (since it’s external to our project), but it sounds like asset outputs should generally only be handled by other assets, rather than mixed together with plain ops. Would that be an accurate statement?
**owen:**
that's accurate yep! In this case, pushing data to that external API is genuinely updating the contents of a "persistent data asset", so I think it still fits pretty nicely into the asset framework, but I get how it can feel a bit different from systems that you control more closely
**m:**
Ohhhhhhhhhh! Thanks a bunch to you both! I've been breaking my head for nearly 2 straight days trying to understand why I couldn't get jobs to act on data in assets! My particular use case is way less data-oriented. I have an asset, "monitoring data", which represents the states of some ~50,000 devices deployed around the world. Based on the data, certain actions are performed (e.g. rebooting one of the devices). To me, that was clearly a 'job': a set of actions to be taken based on input. After reading this, I now understand that I need to remain asset-oriented!
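A rough sketch of what an asset-oriented version of that monitoring use case could look like; `fetch_device_states`, `reboot_device`, and the `needs_reboot` field are hypothetical placeholders:
```python
from typing import Dict, List

from dagster import asset


@asset
def monitoring_data() -> List[Dict]:
    # snapshot of device states, e.g. pulled from a monitoring API
    return fetch_device_states()


@asset
def device_reboots(monitoring_data: List[Dict]) -> List[Dict]:
    # the "asset" produced here is the record of reboot actions taken,
    # derived from (and triggered by) updates to monitoring_data
    unhealthy = [d for d in monitoring_data if d["needs_reboot"]]
    for device in unhealthy:
        reboot_device(device["id"])
    return unhealthy
```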