# ask-community
a
Hi team, our user code deployment is constantly crashing due to an OOM error. The k8s user code pod memory limit is set to 1 GB and it's just running 3 sensors that call some external grpc APIs. The memory is constantly increasing and the pod crashes periodically. When I disable all the sensors the memory stays constant. Any recommended way to debug this?
d
Hi arun - is sharing the code an option? It seems pretty likely that this is a memory leak within the sensor code itself. Are there any resources left open that should be context managers, that kind of thing?
I’ve had some success in the past using objgraph to identify memory leaks in my code: https://mg.pov.lt/objgraph/
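Something like this between evaluations is usually enough to see what's accumulating - show_growth prints the object types whose instance counts grew since the last call (run_sensor_once here is just a placeholder for however you drive one evaluation):

import objgraph

objgraph.show_growth(limit=10)  # take a baseline snapshot
run_sensor_once()               # placeholder: one sensor evaluation
objgraph.show_growth(limit=10)  # prints the types whose counts grew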
a
Yes, but I could not identify the leak. I enabled only one sensor now and checked the memory, but it still looks to be rising very slowly. Here is my sensor code. It just polls the airflow dependencies DB for different sources; let me know if you find anything suspicious.
def dependencies_check_sensor(context: SensorExecutionContext):
    """
    Sensor that checks all the dependencies for all the event sources and triggers
    an event_source_pipeline run request for an event source if the dependencies are met.
    """
    service = ServiceClient()
    partition_date = utils.get_etl_partition_date()

    has_more_sources = True
    page_number = 1
    while has_more_sources:
        response = service.get_sources(
            page_number, constants.DEFAULT_PAGE_SIZE
        )
        for source in response.sources:
            logger.info(f"Checking if the '{source.name}' source's dependencies are updated")
            incomplete_dep_tasks, last_dependency_end_time = check_dependencies(
                source.dependencies
            )

            if incomplete_dep_tasks:
                yield SkipReason(
                    f"Some dependencies of event source '{source.name}' were not met - "
                    f"[{', '.join(incomplete_dep_tasks)}]"
                )
            else:
                partition_set = source_partition_sets.get(source.name, None)
                if partition_set:
                    partition = partition_set.get_partition(name=partition_date)
                    yield RunRequest(
                        run_key=f"{source.name}:{partition_date}:{last_dependency_end_time.strftime('%H:%M:%S')}",
                        run_config=partition_set.run_config_for_partition(partition),
                        tags={
                            **partition_set.tags_for_partition(partition),
                            "event_source": source.name,
                            "airflow_dependencies": ",".join(source.dependencies),
                        },
                    )

        if response.paging.total_pages <= page_number:
            has_more_sources = False
        page_number += 1
d
Is ServiceClient something that could be a context manager / needs to clean up any resources on destruction?
That’s the only thing that jumps out
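If ServiceClient wraps a grpc channel, one pattern worth trying - purely a sketch, since I'm guessing at your client's constructor - is to scope the channel to the evaluation so it gets closed every tick (grpc channels support the context-manager protocol):

import grpc

# Sketch only: exiting the with-block calls channel.close(), so the
# underlying sockets and threads don't outlive the sensor evaluation.
with grpc.insecure_channel("service-host:50051") as channel:  # host is illustrative
    service = ServiceClient(channel)  # assumes ServiceClient accepts a channel
    ...  # do the paging / dependency checks here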
Running objgraph on each run might give some clues, I haven’t run it within a sensor before
a
ServiceClient is a normal grpc client. Let me double check if any resources need to be destroyed. Thanks for the inputs. A quick follow up: is there any way that we can access the resource instances from within the sensors? Or should we always create a fresh instance for every sensor run?
d
It needs to be a fresh instance for each run currently - dagster resources can only be accessed within solids at the moment, not within sensors
a
After some profiling, I suspect the memory leak was happening due to the usage of ResourceDefinition.mock_resource() in the pipeline definition. For every execution of the sensor the number of mock._Call objects kept accumulating. After removing mock_resource I no longer observe the object accumulation and the memory is increasing at a slower rate, but it is still rising and eventually crashing. Maybe there is something else that is still causing a leak.
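Something along these lines is how the accumulation showed up, in case it's useful to anyone (names illustrative):

import objgraph

calls = objgraph.by_type("_Call")  # all live mock._Call objects
print(len(calls))                  # this count kept growing every tick
objgraph.show_backrefs(calls[:3], filename="calls.png")  # what's holding them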
Is there any way to run the sensor locally with a context similar to prod? I tried this approach, but got the following error
repo/etl/sensors/check_sensors.py", line 8, in <module>
    context = build_sensor_context(instance=MagicMock(spec=DagsterInstance),
  File "/Users/abalasubramani/Projects/metrics-repo/venv/lib/python3.8/site-packages/dagster/core/definitions/sensor.py", line 418, in build_sensor_context
    return SensorEvaluationContext(
  File "/Users/abalasubramani/Projects/metrics-repo/venv/lib/python3.8/site-packages/dagster/core/definitions/sensor.py", line 55, in __init__
    self._instance_ref = check.opt_inst_param(instance_ref, "instance_ref", InstanceRef)
  File "/Users/abalasubramani/Projects/metrics-repo/venv/lib/python3.8/site-packages/dagster/check/__init__.py", line 207, in opt_inst_param
    raise _param_type_mismatch_exception(obj, ttype, param_name)
dagster.check.ParameterCheckError: Param "instance_ref" is not a InstanceRef. Got <MagicMock name='mock.get_ref()' id='4630927968'> which is type <class 'unittest.mock.MagicMock'>.
d
Hm, will look into the memory leak tomorrow, that's surprising. For sensor testing, there's a build_sensor_context method you can use now: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#testing-sensors
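The docs pattern is roughly this - one thing to watch for, assuming your dagster version supports direct invocation of sensors: if the sensor body is a generator (it yields RunRequest / SkipReason), calling it just returns a generator, and the body only runs once you iterate over it:

from dagster import build_sensor_context

context = build_sensor_context(cursor="{}")
# Consume the generator to actually execute the sensor body
results = list(dependencies_check_sensor(context))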
a
Thanks Daniel for the response. I tried it and directly calling the sensor was not executing the sensor code. So I tried calling the evaluate_tick method and got the following error.
File "/Users/abalasubramani/Projects/metrics-repo/etl/sensors/first_day_aggregate_sensor.py", line 102, in source_asset_check_sensor
    num_source_asset_events = get_latest_asset_for_partition(
  File "/Users/abalasubramani/Projects/metrics-repo/etl/sensors/helpers.py", line 11, in get_latest_asset_for_partition
    return context.instance.events_for_asset_key(
  File "/Users/abalasubramani/Projects/metrics-repo/venv/lib/python3.8/site-packages/dagster/core/definitions/sensor.py", line 76, in instance
    raise DagsterInvariantViolationError(
dagster.core.errors.DagsterInvariantViolationError: Attempted to initialize dagster instance, but no instance reference was provided.
for i in range(0, 10000):
    context = build_sensor_context(cursor="{}")
    dependencies_check_sensor.evaluate_tick(context)
So I tried to provide a mock instance, but still could not make it work
context = build_sensor_context(instance=MagicMock(spec=DagsterInstance),
                               repository_name="metrics-repo", cursor="{}")
d
Got it, I think I see the problem. Although even once you get it to work - since your sensor uses the instance, you would also need to mock out the result of events_for_asset_key for the sensor to work, right?
(Still testing but I believe https://github.com/dagster-io/dagster/pull/4799/files will fix the get_ref mock issue in a testing context)
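Once that fix is in, stubbing the instance call could look something like this - the empty-list return value is just a placeholder, shape it like whatever get_latest_asset_for_partition expects:

from unittest.mock import MagicMock
from dagster import DagsterInstance, build_sensor_context

mock_instance = MagicMock(spec=DagsterInstance)
mock_instance.events_for_asset_key.return_value = []  # placeholder result

context = build_sensor_context(instance=mock_instance,
                               repository_name="metrics-repo", cursor="{}")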
Back to the memory leak for a second - just to double-check, you're only actually using the mode that uses ResourceDefinition.mock_resource() in tests, right? i.e. the leak is happening despite the resource in question not actually being called as part of the sensor execution?
The strange thing about this is that sensors typically don't load resources, or even do much with the pipeline definition at all - unless your sensor code references the pipeline or its resources directly? Actually, if it's possible to share the sensor code (over DM would be fine too), that would be helpful
a
Yes, I might have to mock the result of events_for_asset_key and I'm not sure how to do it. It has been quite hard to profile sensors within the Docker / prod k8s environment, so I'm trying to mimic the sensor loop behavior in a single python process, which will be easier to profile. Let me know if you have thoughts on any other ways to do it.
Yes, I am not using that mode at all. I am not sure how the pipeline definitions are instantiated. I will DM the sensor code