Arun Kumar
09/09/2021, 10:35 PM
daniel
09/09/2021, 10:40 PM
Arun Kumar
09/09/2021, 10:44 PM
def dependencies_check_sensor(context: SensorExecutionContext):
    """
    Sensor that checks the dependencies of every event source and triggers
    an event_source_pipeline run request for an event source once its dependencies are met.
    """
    service = ServiceClient()
    partition_date = utils.get_etl_partition_date()
    has_more_sources = True
    page_number = 1
    # Walk through the sources one page at a time.
    while has_more_sources:
        response = service.get_sources(
            page_number, constants.DEFAULT_PAGE_SIZE
        )
        for source in response.sources:
            logger.info(f"Checking if the '{source.name}' source's dependencies are updated")
            incomplete_dep_tasks, last_dependency_end_time = check_dependencies(
                source.dependencies
            )
            if incomplete_dep_tasks:
                yield SkipReason(
                    f"Some dependencies of event source '{source.name}' were not met - "
                    f"[{', '.join(incomplete_dep_tasks)}]"
                )
            else:
                partition_set = source_partition_sets.get(source.name, None)
                if partition_set:
                    partition = partition_set.get_partition(name=partition_date)
                    yield RunRequest(
                        run_key=f"{source.name}:{partition_date}:{last_dependency_end_time.strftime('%H:%M:%S')}",
                        run_config=partition_set.run_config_for_partition(partition),
                        tags={
                            **partition_set.tags_for_partition(partition),
                            "event_source": source.name,
                            "airflow_dependencies": ",".join(source.dependencies),
                        },
                    )
        # Stop once the last page has been processed.
        if response.paging.total_pages <= page_number:
            has_more_sources = False
        page_number += 1
daniel
09/09/2021, 10:48 PM
Arun Kumar
09/09/2021, 10:58 PM
daniel
09/09/2021, 11:04 PM
Arun Kumar
09/12/2021, 11:33 PM
ResourceDefinition.mock_resource() in the pipeline definition. For every execution of the sensor, the number of mock._Call objects kept accumulating. After removing mock_resource I no longer see the objects accumulating and memory grows at a slower rate, but it still rises and the process eventually crashes. Maybe something else is still causing a leak.
repo/etl/sensors/check_sensors.py", line 8, in <module>
    context = build_sensor_context(instance=MagicMock(spec=DagsterInstance),
  File "/Users/abalasubramani/Projects/metrics-repo/venv/lib/python3.8/site-packages/dagster/core/definitions/sensor.py", line 418, in build_sensor_context
    return SensorEvaluationContext(
  File "/Users/abalasubramani/Projects/metrics-repo/venv/lib/python3.8/site-packages/dagster/core/definitions/sensor.py", line 55, in __init__
    self._instance_ref = check.opt_inst_param(instance_ref, "instance_ref", InstanceRef)
  File "/Users/abalasubramani/Projects/metrics-repo/venv/lib/python3.8/site-packages/dagster/check/__init__.py", line 207, in opt_inst_param
    raise _param_type_mismatch_exception(obj, ttype, param_name)
dagster.check.ParameterCheckError: Param "instance_ref" is not a InstanceRef. Got <MagicMock name='mock.get_ref()' id='4630927968'> which is type <class 'unittest.mock.MagicMock'>.
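The ParameterCheckError above is consistent with how unittest.mock treats spec'd mocks: the mock itself passes an isinstance check against its spec class, but the values returned by its methods are plain MagicMock objects. A minimal sketch of that behaviour follows. Instance and Ref are hypothetical stand-ins, not Dagster classes, and whether configuring the return value is enough to satisfy the real build_sensor_context is an untested assumption.

```python
from unittest.mock import MagicMock


class Ref:
    """Hypothetical stand-in for a reference type such as InstanceRef."""


class Instance:
    """Hypothetical stand-in for a class such as DagsterInstance."""

    def get_ref(self) -> Ref:
        return Ref()


mocked = MagicMock(spec=Instance)

# The mock itself passes isinstance checks against the spec class.
mock_passes = isinstance(mocked, Instance)

# Method return values are plain MagicMock objects, so a strict
# type check on the returned value fails.
ref_passes = isinstance(mocked.get_ref(), Ref)

# Setting the return value explicitly makes the same check pass.
mocked.get_ref.return_value = Ref()
configured_passes = isinstance(mocked.get_ref(), Ref)

print(mock_passes, ref_passes, configured_passes)
```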
daniel
09/12/2021, 11:59 PM
build_sensor_context method you can use now: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#testing-sensors
Arun Kumar
09/13/2021, 12:03 AM
evaluate_tick method and got the following error.
  File "/Users/abalasubramani/Projects/metrics-repo/etl/sensors/first_day_aggregate_sensor.py", line 102, in source_asset_check_sensor
    num_source_asset_events = get_latest_asset_for_partition(
  File "/Users/abalasubramani/Projects/metrics-repo/etl/sensors/helpers.py", line 11, in get_latest_asset_for_partition
    return context.instance.events_for_asset_key(
  File "/Users/abalasubramani/Projects/metrics-repo/venv/lib/python3.8/site-packages/dagster/core/definitions/sensor.py", line 76, in instance
    raise DagsterInvariantViolationError(
dagster.core.errors.DagsterInvariantViolationError: Attempted to initialize dagster instance, but no instance reference was provided.
for i in range(0, 10000):
    context = build_sensor_context(cursor="{}")
    dependencies_check_sensor.evaluate_tick(context)
context = build_sensor_context(instance=MagicMock(spec=DagsterInstance),
                               repository_name="metrics-repo", cursor="{}")
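When a function is evaluated in a loop like the one above to reproduce a leak, the standard-library tracemalloc module can show which source lines retain the most memory across iterations. The sketch below is only an illustration: evaluate_once and _leaky_store are hypothetical stand-ins for a sensor evaluation that holds on to memory, and top_growth is a helper name invented here.

```python
import tracemalloc

_leaky_store = []  # hypothetical leak: grows on every call


def evaluate_once():
    # Stand-in for one sensor evaluation; deliberately retains ~10 KB per call.
    _leaky_store.append(bytearray(10_000))


def top_growth(func, iterations, limit=3):
    """Run func repeatedly and return the allocation sites that grew the most."""
    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    for _ in range(iterations):
        func()
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # compare_to sorts by the absolute size difference, largest first.
    return after.compare_to(before, "lineno")[:limit]


stats = top_growth(evaluate_once, 1000)
for stat in stats:
    print(stat)
```

Swapping evaluate_once for a call that builds a context and runs the sensor would point at the lines whose allocations survive between ticks, assuming the leak is in Python-level allocations that tracemalloc can see.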
daniel
09/13/2021, 12:08 AM
ResourceDefinition.mock_resource() in tests, right? I.e. the leak is happening even though the resource in question is never actually called as part of the sensor execution?
Arun Kumar
09/13/2021, 12:38 AM
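On the mock._Call accumulation mentioned earlier in the thread: a MagicMock records every call made on it and on its child attributes, so a mock that lives for the whole process keeps growing if anything calls it on each tick. The sketch below shows only that general unittest.mock behaviour. The resource object and tick function are hypothetical, and it does not establish where the calls in the Dagster case came from.

```python
from unittest.mock import MagicMock

# Hypothetical stand-in for a long-lived mocked resource.
resource = MagicMock()


def tick(res):
    # Each call on the mock or on one of its children is recorded
    # as a call object in the parent's mock_calls list.
    res.client.fetch("source")


for _ in range(1000):
    tick(resource)

recorded_before = len(resource.mock_calls)

# reset_mock() clears the recorded call history, including on child mocks.
resource.reset_mock()
recorded_after = len(resource.mock_calls)

print(recorded_before, recorded_after)
```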