chrispc
03/31/2022, 6:25 PM
@sensor(job=stock_prediction_sensor,
        minimum_interval_seconds=30)
def stock_sensor(context):
    """Request a run when the configured input file has been modified.

    The input path is read from the run config of the
    ``stock_prediction_api`` preset (``APPL_sensor.yaml``) and resolved
    relative to ``../data_logs`` next to this module. The file's
    modification time is compared against the value stored in the sensor
    cursor; the cursor is rewritten at the end of every evaluation.

    Args:
        context: Sensor evaluation context. Only ``context.cursor`` and
            ``context.update_cursor`` are used here.

    Yields:
        SkipReason: when the resolved path is not an existing file, or when
            the file has not been modified since the cursor value.
        RunRequest: when the file's mtime is newer than the cursor value.
    """
    module_dir = os.path.dirname(__file__)
    # NOTE(review): assumes PipelineConfiguration().get_preset(...).run_config
    # is a plain nested dict -- confirm against its definition.
    config_stocks = PipelineConfiguration().get_preset(
        name_preset='stock_prediction_api',
        name_yaml='APPL_sensor.yaml',
    ).run_config
    configured_path = config_stocks["ops"]["read_excel_file"]["config"]["path_input"]
    file_path = os.path.abspath(
        os.path.join(module_dir, "..", "data_logs", configured_path)
    )

    # An unset cursor is treated as "never seen", i.e. mtime 0.
    last_mtime = float(context.cursor) if context.cursor else 0
    max_mtime = last_mtime

    if not os.path.isfile(file_path):
        # Without this branch a wrong path makes the sensor yield nothing at
        # all, which is hard to tell apart from "no changes" when debugging.
        yield SkipReason(f"Input file {file_path} does not exist or is not a file.")
    else:
        file_mtime = os.stat(file_path).st_mtime
        if file_mtime <= last_mtime:
            yield SkipReason(f"There is not new changes in {file_path}.")
        else:
            # the run key should include mtime if we want to kick off new
            # runs based on file modifications
            run_key = f"ExRef_mapping:{str(file_mtime)}"
            yield RunRequest(run_key=run_key, run_config=config_stocks)
            max_mtime = max(max_mtime, file_mtime)

    context.update_cursor(str(max_mtime))
prha
03/31/2022, 6:48 PM
Does the `Skipped` tag in the table show anything? Do the daemon logs show anything interesting there?
If it’s not finding the `SkipReason`, I wonder if it’s not hitting the right filepath and it’s failing the `isfile` check.