# ask-community
Hi all, are partitioned observable_source_assets an experimental feature? I cannot make them run via the UI, only via schedules with dynamic partitions. When I try to "observe" an asset defined as an observable_source_asset, I get this error:
```
dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run
```
Am I doing something wrong, or should I open a GitHub issue?
Hi bx2. Observable source assets are currently experimental. That said, I just tried running this example on the latest version of Dagster:
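```python
from dagster import DataVersion, DataVersionsByPartition, DynamicPartitionsDefinition, observable_source_asset

partitions_def = DynamicPartitionsDefinition(name="fruits")

@observable_source_asset(partitions_def=partitions_def)
def source_asset():
    # Return one data version per partition key
    return DataVersionsByPartition({"apple": DataVersion("one"), "orange": DataVersion("two")})
```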
Clicking "Observe" executes the run without an error. Would you mind sharing the code for your observable source asset, and the version of Dagster you're running?
Thank you for your response. In the example you shared, how would I access the partitions if they are populated by a different process? When I try to access the partition key from the context in my code (similar to yours, but I'm trying to get the actual data by accessing
), I face the following issues:
• I get the error "cannot access partition_key for a non-partitioned run."
• when I use the UI and click "Observe" on the asset.
Any guidance on this would be appreciated!
You could do this to access the partition keys:
```python
def source_asset(context):
```
Unfortunately, it's currently not possible to execute an observable source asset for a given partition via the UI, which is why you're seeing an error when calling
. In the meantime, you'll have to return a data version for each partition.
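On the question of getting at the actual data when another process adds the partitions: one option is to derive each partition's data version from a hash of its current content, so the version changes only when the data does. Below is a minimal pure-Python sketch of such a helper. `data_versions_by_partition` and `fetch_content` are hypothetical names, with `fetch_content` standing in for however you load one partition's data (for example, through your own client). Inside the observable source asset, the keys could come from `context.instance.get_dynamic_partitions("fruits")`, and the result could be wrapped as `DataVersionsByPartition({k: DataVersion(v) for k, v in versions.items()})`. Treat that wiring as an assumption to check against your Dagster version.

```python
import hashlib
from typing import Callable, Dict, Iterable


def data_versions_by_partition(
    partition_keys: Iterable[str],
    fetch_content: Callable[[str], bytes],  # hypothetical loader for one partition
) -> Dict[str, str]:
    """Map each partition key to a SHA-256 hex digest of its current content."""
    versions: Dict[str, str] = {}
    for key in partition_keys:
        # Identical content yields an identical digest, i.e. an unchanged version
        versions[key] = hashlib.sha256(fetch_content(key)).hexdigest()
    return versions


# Usage with an in-memory stand-in for the external store
store = {"apple": b"red", "orange": b"orange"}
before = data_versions_by_partition(store, store.__getitem__)
store["apple"] = b"green"
after = data_versions_by_partition(store, store.__getitem__)
changed = sorted(k for k in after if after[k] != before.get(k))
print(changed)  # ['apple']
```

Hashing content rather than returning a constant such as `DataVersion("1")` means a downstream consumer only sees a new version when a partition's data actually changed.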
thank you! i will check this out
OK, so the solution you proposed works in a way: I can access the dynamically created partitions. Thank you for that! Based on what you said, there's no point in partitioning the observable_source_asset in the first place, right? Still, I can't get this to work: the SDAs that depend on the observable source asset don't register that there is a new version, and they don't tell me that there was an upstream change. Take a look at this simple example:
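```python
from dagster import AssetExecutionContext, DataVersion, DataVersionsByPartition
from dagster import DynamicPartitionsDefinition, asset, observable_source_asset

site_partitions = DynamicPartitionsDefinition(name="sites")  # hypothetical name

@observable_source_asset(partitions_def=site_partitions)
def posts(
    context: AssetExecutionContext,
    ceres_content_client: CeresContentClient,  # user-defined resource, not shown
):
    # Read the dynamically added partition keys from the instance (assumed call)
    sites = site_partitions.get_partition_keys(dynamic_partitions_store=context.instance)
    versions = {site: DataVersion("1") for site in sites}
    return DataVersionsByPartition(versions)

@asset(partitions_def=site_partitions, deps=[posts])
def sitemap(context: AssetExecutionContext):
    return {"test": "aaa"}
```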
What I see when this is defined (screenshot): the source asset always shows missing partitions, and I cannot materialize it anyway. Running "Observe" picks up the data version changes and emits observation events (I can then see them in the per-partition events), but the source asset just keeps loading for some reason, and the downstream asset does not react to any of the version changes. I'm sure I'm misunderstanding something or doing something wrong; I just don't know what 😄 Thanks in advance for your time!
Then, after "Observe" runs successfully:
@bx2 sorry about the late response, and thanks for reporting! I think our current UI doesn't display staleness for partitioned assets (https://github.com/dagster-io/dagster/issues/9374). We're working on implementing this; the main constraint is performance. Each partition needs to be validated independently, which causes slowdowns, especially for assets with large numbers of partitions.
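To see why per-partition staleness gets expensive, here is a toy model of the check. It is not Dagster's actual implementation. Each downstream partition compares the upstream data version it was last built from with the latest observed one. It assumes a one-to-one partition mapping, as when both assets share the same partitions definition. The function name and dictionaries are hypothetical.

```python
from typing import Dict, List


def stale_partitions(
    current_upstream: Dict[str, str],
    upstream_at_last_run: Dict[str, str],
) -> List[str]:
    """Return partition keys whose upstream data version differs from the one
    recorded when the downstream partition was last materialized.

    Assumes a one-to-one mapping between upstream and downstream partitions;
    a key missing from upstream_at_last_run means "never materialized".
    """
    stale = []
    # One independent check per partition, so cost grows with partition count
    for key, version in current_upstream.items():
        if upstream_at_last_run.get(key) != version:
            stale.append(key)
    return sorted(stale)


current = {"site-a": "v2", "site-b": "v1", "site-c": "v1"}
recorded = {"site-a": "v1", "site-b": "v1"}
print(stale_partitions(current, recorded))  # ['site-a', 'site-c']
```

Even this simple version touches every partition. A real system also has to load each partition's history from storage, which is where the slowdown for large partition counts comes from.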
No worries! OK, I see, but the funny part is that the asset did not refresh based on the data version change either. In the meantime, I abandoned this approach and decided against SDAs for that particular use case, as it was too much of a hassle. Of course, it would be cool to have all of this visualized and actually running, but jobs/ops/sensors work fine as-is.
How were you expecting the asset to refresh? If you add an auto-materialize policy to the downstream
asset, i.e.
, the auto-materialize policy will kick off execution for
whenever the data version of the observable source asset changes. By default, this is limited to 1 partition per evaluation for rate-limiting purposes, but you can increase that number.
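A toy model of that rate limit may help. It is not Dagster's scheduler. With a cap of one partition per evaluation, N changed partitions take N evaluations to catch up, while a higher cap batches them. `plan_evaluations` is a hypothetical helper. In recent Dagster 1.x releases, the cap appears to be the `max_materializations_per_minute` argument of `AutoMaterializePolicy.eager(...)`, but that is an assumption, so check the API docs for your version.

```python
from typing import List, Sequence


def plan_evaluations(
    changed_keys: Sequence[str], max_per_evaluation: int = 1
) -> List[List[str]]:
    """Split changed partition keys into the batches that successive
    evaluations would request under a per-evaluation cap."""
    if max_per_evaluation < 1:
        raise ValueError("max_per_evaluation must be at least 1")
    return [
        list(changed_keys[i : i + max_per_evaluation])
        for i in range(0, len(changed_keys), max_per_evaluation)
    ]


keys = ["site-a", "site-b", "site-c"]
print(plan_evaluations(keys))     # [['site-a'], ['site-b'], ['site-c']]
print(plan_evaluations(keys, 2))  # [['site-a', 'site-b'], ['site-c']]
```

With the default cap of 1, a burst of upstream changes across many sites would drain one partition per evaluation. This can look like the downstream asset "not reacting" if you only watch it briefly.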