# ask-community
Hi all, I'm wondering if there's a cleaner way to create dynamic partitions in response to a new observation of an `observable_source_asset`. Here's my dummy implementation, which relies on a bunch of internal APIs; I'm wondering if there's a more public API for this sort of thing. As it stands, returning a `SensorResult` from a `@sensor` gives a type-checking error (though it works fine at runtime).
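```python
import hashlib

from dagster import (
    AssetSelection,
    DataVersion,
    RunRequest,
    SensorEvaluationContext,
    SensorResult,
    SkipReason,
    define_asset_job,
    observable_source_asset,
    sensor,
)

# Internal (non-public) helper that reads the data version off an event log entry.
from dagster._core.definitions.data_version import extract_data_version_from_entry

# Assumption: `.df()` on the query result suggests the database is DuckDB.
from duckdb import connect

# DATA_DB and dynamic_partition_def (a DynamicPartitionsDefinition) are defined elsewhere (not shown).


@observable_source_asset
def check_for_new_data():
    # Hash the full table contents so any change yields a new data version.
    conn = connect(str(DATA_DB))
    df = conn.execute("SELECT * FROM some_table").df()
    hashed = hashlib.sha256(df.to_csv().encode("utf-8")).hexdigest()

    return DataVersion(hashed)


new_partition_job = define_asset_job(
    "some_assets_to_build_upon_new_partition",
    AssetSelection.keys("asset_1", "asset_2"),
    partitions_def=dynamic_partition_def,
)


@sensor(job=new_partition_job)
def new_data_sensor(context: SensorEvaluationContext):
    cursor = context.cursor  # None until the sensor has stored a cursor
    asset_event = context.instance.get_latest_data_version_record(
        check_for_new_data.key
    )
    if asset_event is None:
        return SkipReason(skip_message="No observations yet.")
    data_version = extract_data_version_from_entry(asset_event.event_log_entry).value

    if cursor is None or data_version != cursor:
        context.log.info("New data detected, advancing cursor")

        # Add the new partition, request a run for it, and advance the cursor in one result.
        return SensorResult(
            [
                RunRequest(
                    run_key=f"new_partition_job_{data_version}",
                    partition_key=data_version,
                )
            ],
            dynamic_partitions_requests=[
                dynamic_partition_def.build_add_request([data_version])
            ],
            cursor=data_version,
        )

    # Nothing changed since the last tick.
    return SkipReason(skip_message="Data version unchanged.")
```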
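Stripped of the Dagster plumbing, the sensor above does two things: it derives a data version by hashing the table contents, and it compares that version with the stored cursor to decide whether to add a partition, request a run, and advance the cursor. The Dagster-free model below is only a sketch of that decision logic; `data_version_for_rows`, `TickPlan`, and `plan_tick` are hypothetical names, and the stdlib `csv` module stands in for `df.to_csv()`.

```python
import csv
import hashlib
import io
from dataclasses import dataclass, field
from typing import List, Optional, Sequence


def data_version_for_rows(rows: Sequence[Sequence[object]]) -> str:
    """Hash table contents into a stable version string (stand-in for df.to_csv())."""
    buf = io.StringIO()
    csv.writer(buf).writerows(rows)
    return hashlib.sha256(buf.getvalue().encode("utf-8")).hexdigest()


@dataclass
class TickPlan:
    """What one sensor evaluation would emit (hypothetical; mirrors the SensorResult fields)."""

    run_keys: List[str] = field(default_factory=list)
    partitions_to_add: List[str] = field(default_factory=list)
    new_cursor: Optional[str] = None
    skip_reason: Optional[str] = None


def plan_tick(cursor: Optional[str], latest_version: Optional[str]) -> TickPlan:
    """Decide what a single tick should do given the stored cursor and latest version."""
    if latest_version is None:
        # No observation has been recorded yet.
        return TickPlan(new_cursor=cursor, skip_reason="No observations yet.")
    if latest_version == cursor:
        # Same version as last time: keep the cursor, emit nothing.
        return TickPlan(new_cursor=cursor, skip_reason="Data version unchanged.")
    # New version: add it as a partition, request one run for it, advance the cursor.
    return TickPlan(
        run_keys=[f"new_partition_job_{latest_version}"],
        partitions_to_add=[latest_version],
        new_cursor=latest_version,
    )


if __name__ == "__main__":
    v1 = data_version_for_rows([[1, "a"], [2, "b"]])
    first = plan_tick(None, v1)
    second = plan_tick(first.new_cursor, v1)
    print(first.partitions_to_add == [v1], second.skip_reason)
```

Because the run key embeds the version, Dagster would also deduplicate a repeated run request for the same version, so the cursor mainly avoids re-emitting the partition request on every tick.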
As a side comment: I also tried calling `dynamic_partitions_def.get_last_partition_key()` from an op as another strategy, but it throws an error there, so that didn't work either.