# ask-community
b
hi all, are partitioned observable_source_assets an experimental feature? i cannot make them run via the UI, only via schedules with dynamic partitions. When I try to "observe" an asset defined as observable_source_asset, I get an error:
dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run
am i doing something wrong or should i report a github ticket?
c
Hi bx2. Observable source assets are currently experimental. Though, I just tried running this example on the latest version of dagster:
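from dagster import (
    DataVersion,
    DataVersionsByPartition,
    DynamicPartitionsDefinition,
    observable_source_asset,
)

partitions_def = DynamicPartitionsDefinition(name='fruits')


@observable_source_asset(partitions_def=partitions_def)
def source_asset():
    return DataVersionsByPartition({"apple": DataVersion("one"), "orange": DataVersion("two")})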
and clicking "observe" executes the run without an error. Would you mind sharing your code for your observable source asset? And also the version you're running dagster on?
b
Thank you for your response. In the example you shared, how would I access the partitions if they are populated by a different process? When I try to access the partition key from the context in my code (similar to yours, but I'm trying to get the actual data by accessing context.partition_key), I face the following issues:
• I get the error "cannot access partition_key for a non-partitioned run."
• context.has_partition_key returns False when I use the UI and click "Observe" on the asset.
Any guidance on this would be appreciated!
c
You could do this to access the partition keys:
@observable_source_asset(partitions_def=partitions_def)
def source_asset(context):
    # List the dynamic partition keys currently registered on the instance
    partition_keys = partitions_def.get_partition_keys(dynamic_partitions_store=context.instance)
    ...
Unfortunately, it's currently not possible to execute an observable source asset for a given partition via the UI, which is why you're seeing an error when calling context.partition_key. You'll have to return a data version for each partition in the interim.
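As a rough illustration of "a data version for each partition", here is a minimal sketch that derives one version string per partition key by hashing that partition's external content. It is plain Python and does not call Dagster; compute_partition_versions, fetch_content, and fake_store are hypothetical names made up for this example, and hashing is only one possible way to produce a version.

```python
import hashlib
from typing import Callable, Dict, Iterable


def compute_partition_versions(
    partition_keys: Iterable[str],
    fetch_content: Callable[[str], bytes],
) -> Dict[str, str]:
    """Return one version string per partition key.

    fetch_content is a hypothetical callback that returns the raw bytes
    of the external data behind a partition.
    """
    versions = {}
    for key in partition_keys:
        digest = hashlib.sha256(fetch_content(key)).hexdigest()
        # A short prefix of the hash is used as the version string.
        versions[key] = digest[:12]
    return versions


# Stand-in for the external system, for illustration only.
fake_store = {"apple": b"red", "orange": b"orange"}
versions = compute_partition_versions(fake_store, fake_store.__getitem__)
```

Inside the observe function, the resulting dict could then be wrapped the same way as in the examples above, i.e. DataVersionsByPartition({k: DataVersion(v) for k, v in versions.items()}), with the keys coming from partitions_def.get_partition_keys(...).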
b
thank you! i will check this out
K, so the solution you proposed works in a way, i.e., i can access the dynamically created partitions. Thank you for that! Based on what you said, there is no point in partitioning the observable_source_asset in the first place then, right? I somehow cannot get this to work: the SDAs that depend on the observable source asset do not register that there is a new version, and they do not tell me that there was an upstream change. Take a look at this simple example:
from dagster import (
    AssetExecutionContext,
    DataVersion,
    DataVersionsByPartition,
    asset,
    observable_source_asset,
)

# site_partitions and CeresContentClient come from the poster's own project (not shown).


@observable_source_asset(partitions_def=site_partitions)
def posts(
    context: AssetExecutionContext,
    ceres_content_client: CeresContentClient,
):
    sites = site_partitions.get_partition_keys(
        dynamic_partitions_store=context.instance,
    )
    versions = {site: DataVersion("1") for site in sites}
    return DataVersionsByPartition(versions)


@asset(partitions_def=site_partitions, deps=[posts])
def sitemap(context: AssetExecutionContext):
    return {"test": "aaa"}
What I see when this is defined (screenshot): always missing partitions for the source asset, and I cannot materialize it anyway. Running "observe" will get the data version changes and observation events, i.e. I then see them in the per-partition events, but the source just keeps loading for some reason and the downstream does not react to any of the version changes. I am sure that I misunderstand something or am doing something wrong, I just don't know what 😄 Thanks in advance for your time!
then after observe runs successfully:
c
@bx2 sorry about the late response, thanks for reporting! I think in our current UI, we don't display staleness for partitioned assets (https://github.com/dagster-io/dagster/issues/9374). This is something that we're working on implementing; the main constraining factor is performance. Each partition needs to be validated independently, which causes slowdowns, especially for assets with large numbers of partitions.
b
no worries! ok, i see, but the funny part is that the asset did not refresh based on the data version change either. i abandoned this approach in the meantime and decided against SDAs for that particular use case as it was too much of a hassle. of course it would be cool to have all of this visualized and actually running, but jobs/ops/sensors work fine as-is.
c
How were you expecting the asset to refresh? If you add an auto-materialize policy to the downstream sitemap asset, i.e. AutoMaterializePolicy.eager(), the auto-materialize policy will kick off execution for sitemap whenever the data version of the observable source asset changes. By default, this is limited to 1 partition per evaluation for rate limiting purposes, but you can increase that number.
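To make the "refresh when the data version changes, limited to N partitions per evaluation" idea concrete, here is a toy model in plain Python. It is not Dagster's actual auto-materialize logic and does not call Dagster; partitions_to_refresh and max_per_evaluation are hypothetical names used only for this sketch, and picking partitions in sorted order is an arbitrary choice.

```python
from typing import Dict, List


def partitions_to_refresh(
    previous: Dict[str, str],
    current: Dict[str, str],
    max_per_evaluation: int = 1,
) -> List[str]:
    """Pick partitions whose observed version changed, capped per evaluation."""
    changed = sorted(
        key
        for key, version in current.items()
        # A partition counts as changed if it is new or its version differs.
        if previous.get(key) != version
    )
    return changed[:max_per_evaluation]


previous = {"apple": "one", "orange": "two"}
current = {"apple": "one", "orange": "three", "pear": "one"}

# With the default cap, only one changed partition is selected per call.
first = partitions_to_refresh(previous, current)
# A higher cap selects every changed partition in one go.
both = partitions_to_refresh(previous, current, max_per_evaluation=5)
```

In this model, "apple" is never selected because its version did not change, which mirrors why a constant DataVersion("1") for every site would not be expected to trigger any downstream refresh.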