#dagster-support
# dagster-support

Brian Pohl

02/07/2023, 12:33 AM
i've got a question about a specific scenario that, unfortunately, would be hard for me to recreate. but i am hoping we can find the answer even without recreating it. i'll give you the question first, then the context.
when running a Dagster step, does Dagster check the exit code for Python processes, or does it simply wait for the Python process to end?
context: I had a Dagster step running a fairly long Python script that was loading many files. i forgot to close the connections to my files, so i suspect that i was leaking a lot of memory. my Dagster step completed successfully, showing green in the Dagit UI and moving on to the next step. but looking at the logs of my Kubernetes pod, I noticed that the script appeared to be interrupted. the `print` statements at the end of my script never ran. it just stopped somewhere in the middle. sure enough, not all of my files were loaded, confirming that we never reached the end of my script. i will paste the last few lines of the logs in the thread 🧵
i had an issue like this recently with an unrelated Python script - i think memory leaks cause the Python kernel to crash. when this happens, the Python process ends but does not raise any error messages. the exit code for the process is 247.
here are the last few lines of the logs from this process. as far as I can see, the DagsterEvent lines don't report anything as being amiss.
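For reference, a process that is killed from outside (for example by the kernel's out-of-memory killer) stops without a traceback, and its exit status is the only trace it leaves. The sketch below assumes a POSIX system and assumes the crash was a SIGKILL, which is a guess and not something confirmed in this thread. It does line up with the number above: signal 9 read as an unsigned byte is 256 - 9 = 247. `CHILD` and `run_child` are illustrative names.

```python
import signal
import subprocess
import sys

# Child script: prints once, kills itself with SIGKILL, never reaches the last print.
CHILD = (
    "import os, signal\n"
    "print('start', flush=True)\n"
    "os.kill(os.getpid(), signal.SIGKILL)\n"
    "print('done')\n"
)


def run_child():
    """Run the child to completion and return (returncode, stdout)."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD], capture_output=True, text=True
    )
    return proc.returncode, proc.stdout


returncode, out = run_child()
# subprocess reports death-by-signal as a negative return code (-9 for SIGKILL).
print(returncode, returncode % 256)
```

A caller that only waits for the process to end sees nothing wrong here; only a caller that inspects `returncode` can tell the script was cut short.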

daniel

02/07/2023, 3:04 AM
hi brian - which executor is this using?
typically it will be your op function finishing that triggers the step to be marked as succeeded in dagit

Brian Pohl

02/07/2023, 10:21 PM
i'm using the `k8s_job_executor`. i just updated my original message, as i managed to run an experiment that caused my Python kernel to crash as it did not too long ago. i found that the exit code here is 247. if that were to happen in my Kubernetes pod, would Dagster pick it up as a failure? here's a screenshot of my terminal at the end of my experiment. again, the script i was running here is not related to Dagster or the script that failed, but i suspect the same thing happened. the script just stops running with no error reported.
@daniel any update on this?

daniel

02/21/2023, 10:34 PM
hey brian - did the crash here happen after the op function finished? I believe the executor waits for a STEP_SUCCESS event which is only fired after the step finishes, but it's possible that some kind of crash during the cleanup after the op function could cause these symptoms
I don't think it specifically looks at the exit code of the pod that it spins up for each step

Brian Pohl

02/21/2023, 10:34 PM
no, the crash happened in the middle of the op's execution. but then Dagster reported the step as successful and moved on to the next step

daniel

02/21/2023, 10:35 PM
hmmm, in general I would not expect that to happen - the thing that emits the STEP_SUCCESS event happens within the process. It wasn't in a subprocess launched from within the op or something like that? Any chance you could share a code snippet that reproduces?

Brian Pohl

02/21/2023, 10:36 PM
it's pretty complex - we are exporting data to S3, downloading it as a CSV, and loading the data into our Typesense cluster. i'd have to send over several files.
but there are no subprocesses

daniel

02/21/2023, 10:37 PM
OK - i think there would have to be something slightly subtle or out of the ordinary here in order for the step to succeed but the pod to crash. In general, a crash or exception within the op body should cause the step to fail
In those logs you sent i can see some other events that only happen after the op body finishes (e.g. persisting the output of the op function) - could this have happened while a resource was cleaning itself up or something?

Brian Pohl

02/21/2023, 10:40 PM
oh sorry, so the pod didn't crash. the Python process within the pod crashed.

daniel

02/21/2023, 10:40 PM
ok, yeah, that's what i meant too

Brian Pohl

02/21/2023, 10:40 PM
but in doing so, the Python process "ends", and so I imagine that, if Dagster was waiting for the end of my Python process, then it met that condition
> could this have happened while a resource was cleaning itself up or something?
hmmm like what? can you give an example of that?

daniel

02/21/2023, 10:41 PM
ah no, if you're using the k8s_job_executor it's actually scanning the event log to know when it finished
so i think the mystery is how the step managed to somehow still write out a STEP_SUCCESS event despite crashing
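To make the distinction concrete, here is a toy model of deciding a step's outcome from its logged events instead of from an exit code. It is only a sketch of the idea described above, not Dagster's actual executor code: `StepEvent` and `step_outcome` are hypothetical names, and only `STEP_SUCCESS` is taken from this thread.

```python
from enum import Enum


class StepEvent(Enum):
    # Illustrative event names; only STEP_SUCCESS is mentioned in this thread.
    STEP_START = "STEP_START"
    STEP_SUCCESS = "STEP_SUCCESS"
    STEP_FAILURE = "STEP_FAILURE"


def step_outcome(events, process_exited):
    """Decide a step's outcome from its logged events, ignoring any exit code."""
    if StepEvent.STEP_FAILURE in events:
        return "failure"
    if StepEvent.STEP_SUCCESS in events:
        return "success"
    # No terminal event yet: still running, unless the process is already gone.
    return "failure" if process_exited else "running"


# A process that dies mid-op never logs STEP_SUCCESS, so it should not look green.
print(step_outcome([StepEvent.STEP_START], process_exited=True))
```

Under a model like this, a crash in the middle of the op body should surface as a failure, which is why a logged `STEP_SUCCESS` is the surprising part.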

Brian Pohl

02/21/2023, 10:42 PM
yeah, that's what's got me confused. i can't say for sure, but if this crash is like the one i experienced a while ago (outside of Dagster), then the exit code would've been 247.

daniel

02/21/2023, 10:43 PM
Here's an example of a resource that fails on cleanup:
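```python
from contextlib import contextmanager

from dagster import resource


@resource
@contextmanager
def db_connection():
    try:
        db_conn = get_db_connection()  # placeholder for your own connection helper
        yield db_conn
    finally:
        # crash here: this runs during resource teardown, after the op body has finished
        cleanup_db_connection(db_conn)  # placeholder cleanup call
```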
the only way I can imagine this happening is if there was some work that was kicked off within the op body that kept on running after the op finished (without raising an exception) that then crashed
You mentioned at the beginning "I had a Dagster step running a fairly long Python script" - do you have code for the part that runs the Python script? Independent of the details of whatever goes on in that script
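The "work that kept on running after the op finished" scenario can be sketched in plain Python, with no Dagster involved. Everything here is a stand-in chosen for illustration: `events` plays the role of the event log, `op_body` the op function, and the background thread is just one hypothetical way for work to outlive the function that started it.

```python
import threading

events = []   # stand-in for an event log
crashes = []  # exception types raised inside background threads

# Record thread crashes instead of printing a traceback (Python 3.8+).
threading.excepthook = lambda args: crashes.append(args.exc_type)


def load_files():
    # Simulated work that dies partway through; the exception stays in this thread.
    raise MemoryError("simulated crash")


def op_body():
    worker = threading.Thread(target=load_files)
    worker.start()
    return worker  # returns without join(), so the caller never sees the failure


def run_step():
    worker = op_body()
    events.append("STEP_SUCCESS")  # recorded as soon as the op body returns
    worker.join()
    return worker


worker = run_step()
print(events, crashes)
```

The step is recorded as successful even though the background work crashed, because the failure never propagated into the function whose return marks success.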

Brian Pohl

02/21/2023, 10:46 PM
regarding resources, i don't have any custom resources defined. my resources are `dbt_cli_resource`, `snowflake_resource`, `s3_resource`, and `s3_pickle_io_manager`. regarding it continuing after the finish, i wish it had. but we noticed that not all the files were loaded into our Typesense cluster, and sure enough when i pulled the logs, it made sense that the data stopped loading right where the logs had suddenly stopped.
ok yeah, give me a second while i try to summarize/pseudo-ize the code

daniel

02/21/2023, 10:48 PM
cool, the main thing i'll want to check is that it actually waits for the python script to finish - if it didn't, that would explain what you're seeing here, so I just want to rule that out
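As an illustration of what "actually waits for the python script to finish" means when a script is launched as a separate process: the sketch below contrasts a launch that returns immediately with one that blocks and raises on a non-zero exit. It is a generic `subprocess` example with made-up names (`FAILING`, `launch_and_forget`, `launch_and_check`), not code from the run being discussed.

```python
import subprocess
import sys

# Child script that exits with a non-zero status.
FAILING = "import sys; sys.exit(3)"


def launch_and_forget():
    # Popen returns immediately; nothing here waits or checks the exit code.
    return subprocess.Popen([sys.executable, "-c", FAILING])


def launch_and_check():
    # run(check=True) blocks until exit and raises CalledProcessError on non-zero status.
    subprocess.run([sys.executable, "-c", FAILING], check=True)


proc = launch_and_forget()  # no error is raised here
proc.wait()                 # reap the child so its return code is available

try:
    launch_and_check()
    failed = False
except subprocess.CalledProcessError as exc:
    failed = True
    print("script failed with exit code", exc.returncode)
```

Only the second form turns a failed script into an exception that the calling function, and therefore the step, would see.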

Brian Pohl

02/21/2023, 10:49 PM
```python
import csv
import time

from dagster import op


def ingest_data(export_dict, host, collection_name, api_key):
    '''Downloads data from s3 based on the configurations in export_dict. Then iterates
    through each row of the downloaded data, writing them as documents into the specified
    Typesense collection.'''

    # get_files, setup_typesense_client, create_document and EXPORT_BUCKET are defined elsewhere
    s3, response = get_files(export_dict)
    client = setup_typesense_client(host, api_key)
    start = time.time()

    if 'Contents' in response:
        for item in response['Contents']:
            print('downloading file: ' + item['Key'])

            s3.download_file(EXPORT_BUCKET, item['Key'], 'tmp/address.csv')  # overwrites address.csv with each downloaded file

            print('finished downloading file')

            with open('tmp/address.csv') as f:
                reader = csv.DictReader(f)
                records = []
                i = 0
                # iterate through CSV, create Documents, write them into Typesense in batches of 100,000
                for row in reader:
                    record = create_document(row)
                    i += 1
                    records.append(record)
                    if len(records) == 100000:
                        # Write this batch of 100,000
                        print("Sending", len(records))
                        client.collections[collection_name].documents.import_(records, {'action': 'upsert'})
                        records = []
                # Write any leftovers that didn't make it up to 100,000
                print("Sending", len(records))
                client.collections[collection_name].documents.import_(records, {'action': 'upsert'})

    print('done in: ' + str(time.time() - start))

@op(required_resource_keys={'input_values','s3'})
def typesense_address_handler(context, logged_export_timestamp):
    # there is a lot of variable declaration stuff, and then:

    # Download exported S3 data and load it into Typesense.
    ingest_data(address_export_dict, ts_host, ts_collection, ts_key)
```
so you can see in the logs i posted originally where the `Sending` and `downloading file` print statements are happening. but it never prints `done in:`
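One way to make a silent early stop easier to detect is to have the loading code return how many rows it sent, so the caller can compare that with what it expected and raise if they differ. The sketch below is a hypothetical refactor of the batching loop above, not the code that actually ran: `batched` and `import_in_batches` are made-up helpers, and `send` stands in for whatever performs the upload.

```python
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items until the iterable is exhausted."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def import_in_batches(rows, send, size=100_000):
    """Send rows in batches via `send`; return how many rows were sent."""
    total = 0
    for batch in batched(rows, size):
        send(batch)
        total += len(batch)
    return total


# Small demonstration with a list standing in for the upload call.
sent = []
total_sent = import_in_batches(range(7), sent.append, size=3)
print(total_sent, sent)
```

In the snippet above, `send` would wrap the `client.collections[collection_name].documents.import_(...)` call. A count check of this kind only helps when the process survives long enough to run it, so it complements exit-code or event-based failure detection and does not replace it.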

daniel

02/21/2023, 10:50 PM
do you have a link to a run in cloud where this happened?

Brian Pohl

02/21/2023, 10:51 PM
oh yeah, i think i can find that. let me track it down. though the logs i pasted in are copied from the K8s pod itself. they do not get exposed in Dagit.