Bernardo Cortez
07/28/2022, 10:17 AM
daniel
07/28/2022, 10:44 PM
Bernardo Cortez
08/01/2022, 9:04 AM
class ADLSPartitionSensor:
    def __init__(self, context, event_stream, listening_table, posterior_job):
        self.context = context
        self.event_stream = event_stream
        self.listening_table = listening_table
        self.posterior_job = posterior_job

    def _get_date_from_folder(self, folder):
        members = folder.name.split('/')
        for path_member in members:
            if ("data_date_part=" in path_member) and (path_member in members[-1]):
                partition_date = path_member.split('=')[1]
                last_partition_date = self.context.cursor if self.context.cursor else '0'
                if partition_date > last_partition_date:
                    formatted_date = convert_date_str(partition_date, from_format='%Y-%m-%d', to_format='%Y%m%d')
                    yield formatted_date

    def run(self):
        file_system_client = get_file_system_client_with_connection_string()
        partition_dates_to_yield = [date for folder in file_system_client.get_paths(path=self.listening_table['name'], recursive=False) for date in self._get_date_from_folder(folder) if date]
        # yield temporally ordered RunRequests
        partition_dates_to_yield.sort()
        for date in partition_dates_to_yield:
            if not self.context.cursor or date > self.context.cursor:
                yield RunRequest(
                    run_key=get_unique_run_key(self.event_stream, self.posterior_job.name, date),
                    run_config=read_config(job_name=self.posterior_job.name, event_stream=self.event_stream, date=date)
                )
                self.context.update_cursor(date)
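Both comparisons against the cursor in the class above are plain string comparisons, so they order dates correctly only when both sides use the same fixed-width format. The following is a minimal sketch of that behavior using only the standard library. It assumes the folder names carry dates as %Y-%m-%d (as the from_format argument suggests) and that the cursor is stored as %Y%m%d; to_compact is a hypothetical stand-in for convert_date_str, whose implementation is not shown in the thread.

```python
from datetime import datetime


def to_compact(date_str: str) -> str:
    """Hypothetical stand-in for convert_date_str: '%Y-%m-%d' -> '%Y%m%d'."""
    return datetime.strptime(date_str, "%Y-%m-%d").strftime("%Y%m%d")


cursor = "20220805"          # assumed cursor format (%Y%m%d)
folder_date = "2022-08-06"   # assumed folder format (%Y-%m-%d)

# Mixed formats: '-' sorts before every digit, so this is False even though
# the folder date is one day later than the cursor.
mixed_comparison = folder_date > cursor

# Same format on both sides: lexicographic order matches calendar order.
normalized_comparison = to_compact(folder_date) > cursor

print(mixed_comparison, normalized_comparison)  # False True
```

If the assumed formats match the real data, converting the folder date before comparing it with the cursor would keep both checks consistent.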
Basically, I put all partition dates of a certain table into partition_dates_to_yield, sort it, and then iterate through it, comparing each date with the cursor and yielding a RunRequest if the date is higher than the cursor. The cursor is updated with self.context.update_cursor(date), in which date is a string.
daniel
08/09/2022, 6:21 PM
Try putting
print("CURSOR: " + str(context.cursor) + " , TYPE: " + str(type(context.cursor)))
at the start of your sensor function and check the daemon logs for the output; it might give some clues about what's coming in.
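A minimal, self-contained sketch of that check follows. FakeContext and describe_cursor are hypothetical names used only for illustration; in a real sensor the context object is supplied by Dagster and the printed line shows up in the daemon logs.

```python
class FakeContext:
    """Hypothetical stand-in for the sensor context; only `cursor` is modeled."""

    def __init__(self, cursor=None):
        self.cursor = cursor


def describe_cursor(context) -> str:
    # Same message format as the print statement suggested above.
    return "CURSOR: " + str(context.cursor) + " , TYPE: " + str(type(context.cursor))


print(describe_cursor(FakeContext("20220805")))
print(describe_cursor(FakeContext()))
```

In this sketch an unset cursor is modeled as None, which is the case the sensor code guards against with its "not self.context.cursor" check before comparing.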
When I set 20220805 as a sensor cursor in the dagit UI (on 1.0.2) and print that at the beginning of a sensor function, it outputs:
CURSOR: 20220805 , TYPE: <class 'str'>
Bernardo Cortez
08/10/2022, 11:45 AM
daniel
08/10/2022, 6:55 PM
Bernardo Cortez
08/10/2022, 6:56 PM
daniel
08/10/2022, 6:57 PM
Bernardo Cortez
08/11/2022, 12:38 AM
prha
08/11/2022, 5:46 PM
Bernardo Cortez
08/11/2022, 11:46 PM
prha
08/11/2022, 11:54 PM
Bernardo Cortez
08/12/2022, 9:58 AM
prha
08/15/2022, 5:31 PM
0.14.13.
Bernardo Cortez
08/22/2022, 8:03 AM
prha
08/23/2022, 4:40 PM
SkipReason not being visible. w.r.t. the underlying issue, did you upgrade the version of dagster around the time that you started seeing the issue? If you can share your logs from the sensor, I might be able to help you debug the underlying issue…
Bernardo Cortez
08/23/2022, 4:41 PM
prha
09/13/2022, 3:15 PM