EDIT: Had another error here i thought was related...
# ask-community
c
EDIT: Had another error here i thought was related but was actually able to resolve and it was unrelated. Keeping the original issue I was concerned about: I'm trying to launch ~50 mult-asset-sensors which each will launch a single asset-job based on the partition materializations of the respective assets they're listening to, however most ticks are failing with rpc errors or
Copy code
dagster._core.errors.DagsterUserCodeUnreachableError: The sensor tick timed out due to taking longer than 60 seconds to execute the sensor function. One way to avoid this error is to break up the sensor work into chunks, using cursors to let subsequent sensor calls pick up where the previous call left off.
These sensors aren't particularly complex, they're mostly a copy/paste of this example, so i'm not sure why they're taking so long to execute. My dagster-daemon is running on a pod with 8cpu and 16gb memory maybe it's just under-provisioned?
I'm not sure what all needs to happen during the sensor tick invocation and why it would take so long
Even if i just make an essentially no-op body to the sensor i still hit the timeout often, i think it's because of how many sensors I have and how the sensor invocation loop is implemented? Is there some way to increase that time-out passed 60s
What's interesting is these sensors are all operating fine when i run locally via
Copy code
dagster dev -m #module
Something of note, our
Definitions
takes some time to build, is it possible this is being re-built every time?
Okay my other issue was unrelated (editted original comment to reflect), but i'm still having this timeout issue with my sensors
Bumping on this, would appreciate some advice: It seems to be when there are a lot of materializations (in this case ~50) that exist that have yet to be consumed by the sensor (for example when turning a sensor on for the first time against existing assets), the call to
Copy code
context.latest_materialization_records_by_partition_and_asset()
Is actually pretty inefficient, and can take a good amount of time to execute just by itself, let alone any other logic you might be trying to run in the sensor tick invocation
Hence why the timeout wasn't happening on my local which had an ephemeral instance and hence no existing partition materializations for the monitored assets
v
What storage did you set up for event logs on prod? It might be a basic database performance issues.
I had some success with the following pattern: 1. Create all job definitions first 2. Create one super-sensor pointing to all jobs. 3. In sensor code call ‘.get_asset_records’ and ‘.get_partition_counts’ for all assets, only once per tick. 4. Find and check matching parent/child partitions using asset_graph methods. 5. Yield RunRequests for specific combinations of jobs and partition keys. No cursors. No event log full scanning. Least possible number of queries per tick (so far). Unfortunately, this method does not provide info about last mat timestamp for each partition. But hopefully we’ll be able to fix it soon.
c
We're using postgres for event log storage. Is all the pertinent info in the event log storage, perhaps our postgres is underprovisioned, or the schema/query is inefficient for the look-up needed for this. Have you stress-tested something similar to what i'm describing above? Sorry let me try to understand better what your alternative is, you're saying we create a super sensor monitoring basically all assets. What are
.get_asset_records
and
get_partition_counts
methods of? I don't see them on a
MultiAssetSensorEvaluationContext
or a regular
SensorEvaluationContext
What asset_graph methods you referring to, i'm assuming there are some utilities I can use to do basic dag filtering operations on an asset graph as defined in the code location
Definitions
object.
v
Copy code
context.repository_def.asset_graph

context.instance.get_asset_records()

partitioned_asset_keys = [asset_key for asset_key in self.asset_graph.all_asset_keys if self.asset_graph.is_partitioned(asset_key)]

context.instance.get_materialization_count_by_partition(partitioned_asset_keys)
A lot of stuff is available in
context
.