Julius
03/27/2023, 2:55 AMMatt Clarke
03/27/2023, 8:06 AMJulius
# 03/27/2023, 8:09 AM  (chat timestamp that was fused onto the `def` line in the paste)
def get_last_sync_timestamp():
    """Return the most recent UPDATE_STARTED timestamp recorded for CHECKING_TABLE.

    Opens a connection with ``connect(...)`` using the module-level
    ``SNOWFLAKE_*`` settings, selects ``MAX(TO_TIMESTAMP(UPDATE_STARTED))``
    from ``SNOWFLAKE_TABLE`` for rows whose ``"TABLE"`` column equals
    ``CHECKING_TABLE``, and closes the cursor and connection before returning.

    Returns:
        The value of the aggregate, or ``datetime.min`` when the query yields
        no value (no matching rows, so the aggregate is NULL).
    """
    conn = connect(
        account=SNOWFLAKE_ACCOUNT,
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        schema=SNOWFLAKE_SCHEMA,
    )
    try:
        # The cursor is created *outside* the inner try so that a failure in
        # conn.cursor() cannot reach cursor.close() with `cursor` unbound
        # (which would mask the real error and skip conn.close()).
        cursor = conn.cursor()
        try:
            # NOTE(review): the table name and filter value are interpolated
            # from module-level constants. If either can ever come from
            # outside the codebase, switch the value to a bound parameter.
            cursor.execute(
                f"SELECT MAX(TO_TIMESTAMP(UPDATE_STARTED)) FROM {SNOWFLAKE_TABLE} WHERE \"TABLE\" = '{CHECKING_TABLE}';")
            result = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()

    # An aggregate over zero rows comes back as a single NULL; also tolerate
    # a driver returning no row at all.
    if result is None or not result[0]:
        return datetime.min
    return result[0]
@sensor(
    name="snowflake_sensor_run_process",
    job_name="run_from_ingest",
    minimum_interval_seconds=30
)
def snowflake_sensor(context: SensorEvaluationContext):
    """Request a run when the latest sync timestamp is newer than the last run key.

    The run key is the sync timestamp formatted as ``"%m/%d/%Y, %H:%M:%S"``.
    On each evaluation the previous run key (``context.last_run_key``) is
    parsed back into a ``datetime`` and compared with the value returned by
    ``get_last_sync_timestamp()``.

    Args:
        context: The sensor evaluation context; ``last_run_key`` and ``log``
            are read from it.

    Returns:
        The result of ``run_request_for_partition(...)`` when newer data is
        detected; otherwise ``None`` (only a log line is emitted).
    """
    run_key_format = "%m/%d/%Y, %H:%M:%S"

    last_run_key = context.last_run_key
    if last_run_key is None:
        # No previous run: treat the last run as infinitely old.
        last_run_at = datetime.min
        last_run_time = datetime.min.strftime(run_key_format)
    else:
        last_run_time = last_run_key
        try:
            last_run_at = datetime.strptime(last_run_key, run_key_format)
        except ValueError:
            # A run key in an unexpected format cannot be ordered; fall back
            # to "never ran" so that new data is not silently ignored.
            context.log.warning(
                f"Could not parse last_run_key {last_run_key!r}; treating it as no previous run")
            last_run_at = datetime.min

    # Compare datetimes, not formatted strings: month-first strings do not
    # sort chronologically (e.g. "01/05/2024" < "12/30/2023" as text).
    # Microseconds and tzinfo are dropped because the run-key format carries
    # neither, so the comparison stays at the same precision as the stored key.
    last_sync_at = get_last_sync_timestamp().replace(microsecond=0, tzinfo=None)
    last_sync_timestamp = last_sync_at.strftime(run_key_format)

    if last_sync_at > last_run_at:
        context.log.info(f"New data detected, triggering pipeline at {last_sync_timestamp}")
        # NOTE(review): `partitions_def_daily` reads like a partitions
        # definition rather than a single partition key — confirm against
        # where it and `run_request_for_partition` are defined.
        return run_request_for_partition(
            partition_key=partitions_def_daily,
            run_key=last_sync_timestamp,
            run_config={},
        )

    context.log.info(f"last_run_time: {last_run_time}. "
                     f"last_sync_timestamp: {last_sync_timestamp}. "
                     f"No new data detected")
sean
04/03/2023, 5:05 PMJulius
04/20/2023, 11:08 AM