# ask-community

Qumber Ali

03/14/2022, 11:07 AM
Hi all. I'm facing an issue: I have a max concurrency of only 20 jobs, but my CPU is being fully consumed even when only 1 or 2 jobs are running. The steady state of the system is shown in the screenshot below. I can see there are lots of processes with this command line, and I'm not getting why it's happening:
/usr/bin/python3 -m dagster.grpc --lazy-load-user-code --socket

alex

03/14/2022, 4:00 PM
It appears that there are multiple instances of `dagit` and the `daemon` running.

> I can see there are lots of processes with this command line, and I'm not getting why it's happening

These are gRPC server subprocesses for dagit/daemon. I am guessing you have many more than needed due to the extraneous copies of the parent dagit/daemon processes running.

Qumber Ali

03/14/2022, 4:01 PM
lemme check that

alex

03/14/2022, 4:02 PM
I could just be mistaken about how `htop` displays separate threads.
How many users are you serving with this `dagit`?

Qumber Ali

03/14/2022, 4:09 PM
only one.

alex

03/14/2022, 4:09 PM
Are there many tabs open? I'm surprised to see so much CPU use for one user.

Qumber Ali

03/14/2022, 4:11 PM
I just stopped the systemd unit that triggers the dagster-daemon, and now everything has stopped.

alex

03/14/2022, 4:13 PM
What version are you on? You'll want one copy of the daemon running, assuming you have sensors/schedules, but it's possible your systemd set-up was causing copies to pile up instead of running one and only one copy.

Qumber Ali

03/14/2022, 4:21 PM
I just rebooted the server and ran it again, and it's looking better now, but it's still running more than one. Is that fine?

alex

03/14/2022, 4:23 PM
You'll have to check what exactly htop is displaying; I'm not sure if it shows threads or processes. You should only have one dagit and one daemon process, but they will each have a number of threads.

Qumber Ali

03/14/2022, 4:23 PM
It's threads.

alex

03/14/2022, 4:24 PM
Are you using sensors?

Qumber Ali

03/14/2022, 4:24 PM
I just saw the output of `dagster-daemon`; it's throwing this error.
```
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "{"created":"@1647274968.556420179","description":"Error received from peer unix:/tmp/tmp64neq5nr","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"Deadline Exceeded","grpc_status":4}"
```
Yes, I'm using sensors.

alex

03/14/2022, 4:25 PM
I would profile the code you are running inside the sensor; it seems potentially expensive and could be the source of your problem.

Qumber Ali

03/14/2022, 4:25 PM
I have multiple sensors.
Let me share one as an example.

daniel

03/14/2022, 4:26 PM
More information on sensor timeouts here in the last paragraph of this section: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#evaluation-interval
Starting with "If a sensor evaluation function takes more than 60 seconds to return its results"

Qumber Ali

03/14/2022, 4:26 PM
```
import random
import string
import json
from dagster import sensor, RunRequest, AssetKey, SkipReason, EventRecordsFilter


@sensor(pipeline_name="amz_process_continues_reports")
def thirty_minutes_report_sensor(context):
    events = context.instance.get_event_records(
        EventRecordsFilter(asset_key=AssetKey("thirty_minutes"))
    )
    if not events:
        yield SkipReason("No new s3 files found for bucket dsw-matcher.")
        return
    for record_id, event in events:
        print("Record_id: ", record_id)
        print("event: ", event.user_message)
        user_data = json.loads(event.user_message)
        print("USER DATA: ", user_data)
        report_run_key=str(user_data['record_id']) + '_' + str(user_data['id'])
        yield RunRequest(
            run_key=report_run_key,
            run_config={
                "resources": {"report_details": {"config": {"id": event.user_message}}},
            },
            tags={user_data["report_type"]: user_data["seller_id"]},
        )


def get_random_string(length):
    return "".join(
        random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits)
        for _ in range(length)
    )
```

daniel

03/14/2022, 4:27 PM
That sensor doesn't look particularly slow to me. Do you have a sense of how many records are being returned in each sensor call? It does look like a good use case for a cursor to me.

Qumber Ali

03/14/2022, 4:31 PM
In the hundreds.
Everything is stopped here.
Please help.

alex

03/14/2022, 4:40 PM
The sensor you have above will get continuously slower, as it is fetching all `thirty_minutes` asset key events and publishing run requests for all of them (which then get de-duped by `run_key`, so only the new ones actually run). You can use a cursor to only attempt to load the new events since the last sensor tick: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#sensor-optimizations-using-cursors
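For illustration, a minimal sketch of what a cursor-based version of the sensor above could look like, assuming the cursor stores the storage ID of the last processed event record and that `after_cursor` plus an event type filter are used to skip already-seen events (names are carried over from the sensor shared earlier; adapt as needed):

```python
import json

from dagster import (
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    RunRequest,
    SkipReason,
    sensor,
)


@sensor(pipeline_name="amz_process_continues_reports")
def thirty_minutes_report_sensor(context):
    # Resume from the storage id recorded on the previous tick, if any.
    after_cursor = int(context.cursor) if context.cursor else None

    # Only fetch materialization records newer than the cursor, not the full history.
    events = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("thirty_minutes"),
            after_cursor=after_cursor,
        )
    )
    if not events:
        yield SkipReason("No new s3 files found for bucket dsw-matcher.")
        return

    max_storage_id = after_cursor or 0
    for record in events:
        event = record.event_log_entry
        user_data = json.loads(event.user_message)
        yield RunRequest(
            run_key=f"{user_data['record_id']}_{user_data['id']}",
            run_config={
                "resources": {"report_details": {"config": {"id": event.user_message}}},
            },
            tags={user_data["report_type"]: user_data["seller_id"]},
        )
        max_storage_id = max(max_storage_id, record.storage_id)

    # Persist the cursor so the next tick starts after the newest record seen here.
    context.update_cursor(str(max_storage_id))
```

Keying the cursor off the record's storage ID, rather than a field parsed out of the event message, keeps it monotonic and lets the event log query do the filtering instead of the sensor body.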

Qumber Ali

03/14/2022, 9:33 PM
I have updated my sensor but still getting the same error.
```
import random
import string
import json
from dagster import sensor, RunRequest, AssetKey, SkipReason, EventRecordsFilter


@sensor(pipeline_name="amz_process_continues_reports")
def thirty_minutes_report_sensor(context):
    last_mtime = float(context.cursor) if context.cursor else 0
    print(f"curesor: {last_mtime}")
    events = context.instance.get_event_records(
        EventRecordsFilter(asset_key=AssetKey("thirty_minutes"))
    )
    if not events:
        yield SkipReason("No new s3 files found for bucket dsw-matcher.")
        return
    max_mtime = last_mtime
    for record_id, event in events:
        print("Record_id: ", record_id)
        print("event: ", event.user_message)
        user_data = json.loads(event.user_message)
        print("USER DATA: ", user_data)
        event_mtime = float(user_data['record_id'])
        if event_mtime <= last_mtime:
            continue
        report_run_key=str(user_data['record_id']) + '_' + str(user_data['id'])
        yield RunRequest(
            run_key=report_run_key,
            run_config={
                "resources": {"report_details": {"config": {"id": event.user_message}}},
            },
            tags={user_data["report_type"]: user_data["seller_id"]},
        )
        max_mtime = max(max_mtime, event_mtime)
    context.update_cursor(str(max_mtime))
```
Also, my cursor is not being updated; it's the same on each iteration of the sensor. Every thirty seconds, when the sensor runs, the cursor is None.

daniel

03/15/2022, 2:45 PM
What you have there looks very similar to an asset sensor: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors - which handles the cursor for you. Would using an @asset_sensor instead of an @sensor be an option?
If not, take a look at how it's implemented: https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/core/definitions/sensor_definition.py?L602-628 - it passes the cursor into the get_event_records call and applies a limit and an event type filter, both of which should help make the sensor run a lot faster.
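For reference, a rough sketch of what the `@asset_sensor` variant might look like, adapted from the sensor shared above; the decorator arguments follow the docs daniel linked, and the details here are assumptions to verify against your Dagster version:

```python
import json

from dagster import AssetKey, RunRequest, asset_sensor


@asset_sensor(
    asset_key=AssetKey("thirty_minutes"),
    # pipeline_name mirrors the @sensor above; newer APIs take job/jobs instead.
    pipeline_name="amz_process_continues_reports",
)
def thirty_minutes_report_sensor(context, asset_event):
    # asset_event is the event log entry for a new materialization of the asset;
    # the decorator fetches it with a cursor, limit, and event type filter for you.
    user_data = json.loads(asset_event.user_message)
    yield RunRequest(
        run_key=f"{user_data['record_id']}_{user_data['id']}",
        run_config={
            "resources": {"report_details": {"config": {"id": asset_event.user_message}}},
        },
        tags={user_data["report_type"]: user_data["seller_id"]},
    )
```

The manual cursor bookkeeping disappears entirely, since the decorator advances the cursor after each evaluated materialization.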

Qumber Ali

03/16/2022, 10:41 PM
Thanks @daniel, it worked for me.