Qumber Ali
03/14/2022, 11:07 AM
/usr/bin/python3 -m dagster.grpc --lazy-load-user-code --socket
I have dagit and the daemon running, and I can see there are lots of processes with this line; I'm not getting why it's happening.

alex
03/14/2022, 4:00 PM
These are grpc server subprocesses for dagit/daemon. I am guessing you have many more than needed due to the extraneous copies of the parent dagit/daemon processes running.
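One rough way to check whether extra parent copies are piling up is to count the dagit/dagster-daemon parents against the dagster.grpc subprocesses. A sketch, assuming psutil is installed and the subprocess command lines look like the one pasted above:

import psutil  # third-party; pip install psutil


def summarize_dagster_processes():
    parents = []
    grpc_subprocesses = 0
    for proc in psutil.process_iter(["pid", "cmdline"]):
        cmdline = " ".join(proc.info["cmdline"] or [])
        if "dagster.grpc" in cmdline:
            # gRPC code servers spawned by dagit / the daemon
            grpc_subprocesses += 1
        elif "dagit" in cmdline or "dagster-daemon" in cmdline:
            # parent dagit / dagster-daemon processes; more than one copy of
            # each usually means extra copies were started
            parents.append((proc.info["pid"], cmdline))
    print(f"{len(parents)} dagit/daemon parents, {grpc_subprocesses} dagster.grpc subprocesses")
    for pid, cmdline in parents:
        print(pid, cmdline)


if __name__ == "__main__":
    summarize_dagster_processes()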
Qumber Ali
03/14/2022, 4:01 PM

alex
03/14/2022, 4:02 PM
htop displays separate threads for dagit?

Qumber Ali
03/14/2022, 4:09 PM

alex
03/14/2022, 4:09 PM

Qumber Ali
03/14/2022, 4:11 PM

alex
03/14/2022, 4:13 PM
So the systemd set-up was causing it to pile up instead of running one & only one copy.

Qumber Ali
03/14/2022, 4:21 PM

alex
03/14/2022, 4:23 PM

Qumber Ali
03/14/2022, 4:23 PM

alex
03/14/2022, 4:24 PM

Qumber Ali
03/14/2022, 4:24 PM
dagit-daemon, it's throwing this error:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "{"created":"@1647274968.556420179","description":"Error received from peer unix:/tmp/tmp64neq5nr","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"Deadline Exceeded","grpc_status":4}"
alex
03/14/2022, 4:25 PM

Qumber Ali
03/14/2022, 4:25 PM

daniel
03/14/2022, 4:26 PM

Qumber Ali
03/14/2022, 4:26 PM
import random
import string
import json

from dagster import sensor, RunRequest, AssetKey, SkipReason, EventRecordsFilter


@sensor(pipeline_name="amz_process_continues_reports")
def thirty_minutes_report_sensor(context):
    events = context.instance.get_event_records(
        EventRecordsFilter(asset_key=AssetKey("thirty_minutes"))
    )
    if not events:
        yield SkipReason("No new s3 files found for bucket dsw-matcher.")
        return
    for record_id, event in events:
        print("Record_id: ", record_id)
        print("event: ", event.user_message)
        user_data = json.loads(event.user_message)
        print("USER DATA: ", user_data)
        report_run_key = str(user_data['record_id']) + '_' + str(user_data['id'])
        yield RunRequest(
            run_key=report_run_key,
            run_config={
                "resources": {"report_details": {"config": {"id": event.user_message}}},
            },
            tags={user_data["report_type"]: user_data["seller_id"]},
        )


def get_random_string(length):
    return "".join(
        random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits)
        for _ in range(length)
    )
daniel
03/14/2022, 4:27 PM

Qumber Ali
03/14/2022, 4:31 PM

alex
03/14/2022, 4:40 PM
You are loading all of the thirty_minutes asset key events and publishing run requests for all of them (which then get de-duped by run_key, only running new ones). You can use a cursor to only attempt to load the new events since the last sensor tick:
https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#sensor-optimizations-using-cursors
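For reference, a minimal sketch of the cursor approach from that docs link, applied to the sensor above. It assumes the same pipeline name, asset key, and run_config shape as the posted code; the only new pieces are the after_cursor filter on event storage ids and the context.update_cursor bookkeeping described on that page.

import json

from dagster import AssetKey, EventRecordsFilter, RunRequest, SkipReason, sensor


@sensor(pipeline_name="amz_process_continues_reports")
def thirty_minutes_report_sensor_with_cursor(context):
    # storage id of the last event record already handled (0 on the first tick)
    after_cursor = int(context.cursor) if context.cursor else 0

    # only fetch event records newer than the cursor, oldest first
    events = context.instance.get_event_records(
        EventRecordsFilter(
            asset_key=AssetKey("thirty_minutes"),
            after_cursor=after_cursor,
        ),
        ascending=True,
    )
    if not events:
        yield SkipReason("No new thirty_minutes events since the last tick.")
        return

    for record in events:
        event = record.event_log_entry
        user_data = json.loads(event.user_message)
        yield RunRequest(
            run_key=str(user_data["record_id"]) + "_" + str(user_data["id"]),
            run_config={
                "resources": {"report_details": {"config": {"id": event.user_message}}},
            },
            tags={user_data["report_type"]: user_data["seller_id"]},
        )
        after_cursor = max(after_cursor, record.storage_id)

    # persist the newest storage id so the next tick starts after it
    context.update_cursor(str(after_cursor))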
Qumber Ali
03/14/2022, 9:33 PM
import random
import string
import json

from dagster import sensor, RunRequest, AssetKey, SkipReason, EventRecordsFilter


@sensor(pipeline_name="amz_process_continues_reports")
def thirty_minutes_report_sensor(context):
    last_mtime = float(context.cursor) if context.cursor else 0
    print(f"cursor: {last_mtime}")
    events = context.instance.get_event_records(
        EventRecordsFilter(asset_key=AssetKey("thirty_minutes"))
    )
    if not events:
        yield SkipReason("No new s3 files found for bucket dsw-matcher.")
        return
    max_mtime = last_mtime
    for record_id, event in events:
        print("Record_id: ", record_id)
        print("event: ", event.user_message)
        user_data = json.loads(event.user_message)
        print("USER DATA: ", user_data)
        event_mtime = float(user_data['record_id'])
        if event_mtime <= last_mtime:
            continue
        report_run_key = str(user_data['record_id']) + '_' + str(user_data['id'])
        yield RunRequest(
            run_key=report_run_key,
            run_config={
                "resources": {"report_details": {"config": {"id": event.user_message}}},
            },
            tags={user_data["report_type"]: user_data["seller_id"]},
        )
        max_mtime = max(max_mtime, event_mtime)
    context.update_cursor(str(max_mtime))
daniel
03/15/2022, 2:45 PM

Qumber Ali
03/16/2022, 10:41 PM