Rishi Kulkarni
07/04/2023, 1:24 AM
`observable_source_asset`. Here's my dummy implementation that uses a bunch of internal stuff - I'm wondering if there's a more-public API for this sort of thing. As it stands, returning a `SensorResult` from a `@sensor` gives a type-checking error (though it does work fine).
@observable_source_asset
def check_for_new_data():
    """Observe ``some_table`` and report a content hash as its data version.

    Reads every row of ``some_table`` from the database at ``DATA_DB``,
    serializes the resulting DataFrame to CSV, and returns the SHA-256 hex
    digest of that text wrapped in a ``DataVersion``.

    Returns:
        DataVersion: the hex digest of the table's CSV serialization.
    """
    conn = connect(str(DATA_DB))
    try:
        df = conn.execute("SELECT * FROM some_table").df()
    finally:
        # Always release the connection, even if the query raises, so repeated
        # observations do not accumulate open handles on the database file.
        conn.close()

    # NOTE(review): the query has no ORDER BY, so the digest presumably
    # depends on the row order the database returns -- confirm it is stable.
    hashed = hashlib.sha256(df.to_csv().encode("utf-8")).hexdigest()
    return DataVersion(hashed)
# Asset job selecting asset_1 and asset_2, partitioned by dynamic_partition_def.
new_partition_job = define_asset_job(
    name="some_assets_to_build_upon_new_partition",
    selection=AssetSelection.keys("asset_1", "asset_2"),
    partitions_def=dynamic_partition_def,
)
@sensor(job=new_partition_job)
def new_data_sensor(context: SensorEvaluationContext):
    """Request a run of ``new_partition_job`` when the observed data version changes.

    Compares the latest data version recorded for ``check_for_new_data``
    against the sensor cursor. When they differ (or no cursor is stored yet),
    the data version is added as a new dynamic partition, a run is requested
    for that partition, and the cursor is advanced to the new data version.

    Args:
        context: The sensor evaluation context; supplies the cursor, the
            instance used to look up the latest data version record, and
            the logger.

    Returns:
        A ``SkipReason`` when no data version record exists yet, a
        ``SensorResult`` when a new data version is detected, and ``None``
        when the data version matches the cursor.
    """
    # Treat an empty/falsy cursor the same as "no cursor stored".
    cursor = context.cursor or None

    asset_event = context.instance.get_latest_data_version_record(
        check_for_new_data.key
    )
    if asset_event is None:
        return SkipReason(skip_message="No observations yet.")

    data_version = extract_data_version_from_entry(asset_event.event_log_entry).value

    if cursor is not None and data_version == cursor:
        # Data version unchanged since the last evaluation: nothing to request.
        return None

    context.log.info("New data detected, advancing cursor")
    return SensorResult(
        run_requests=[
            RunRequest(
                # Keying the run on the data version keeps each version's run distinct.
                run_key=f"new_partition_job_{data_version}",
                partition_key=data_version,
            )
        ],
        # Register the data version as a partition in the same result that
        # requests a run against it.
        dynamic_partitions_requests=[
            dynamic_partition_def.build_add_request([data_version])
        ],
        cursor=data_version,
    )
Rishi Kulkarni
07/04/2023, 1:30 AM
`dynamic_partitions_def.get_last_partition_key()` throws an error when called from an op, which is another strategy I tried to no avail.