Noah Newton
06/27/2023, 8:40 PM@sensor(job=j01_dml_ops, minimum_interval_seconds=60)
def new_file_sensor(context):
now = datetime.now()
for filename in context.resources.ssh.get_connection().listdir("./in"):
if filename == "Socket*" + now.strftime("%Y-%m-%d") + ".csv":
context.resources.ssh.sftp_get("./in/Socket*" + now.strftime("%Y-%m-%d") + ".csv", "./socket_load_stage/socket-homes-passed.csv")
yield RunRequest()
Here is where I have the ssh resource defined:
defs = Definitions(
jobs = [j01_dml_ops, j02_copy_csv, j03_load_new, j04_copy_new, j05_load_old, j06_load_updated_address, j07_copy_updated, j08_load_updated_other],
schedules = [full_job_schedule],
sensors = [full_job_sensor, new_file_sensor, slack_alert, email_on_run_failure],
resources = {
"ssh": ssh_resource.configured({
"remote_host": "remote_host",
"username": "username",
"password": "password",
})
}
)
However, I am receiving the following error:
Sensor daemon caught an error for sensor new_file_sensor
Traceback (most recent call last):
File "/Users/noahnewton/mambaforge/envs/dag3.10/lib/python3.10/site-packages/dagster/_daemon/sensor.py", line 520, in _process_tick_generator
yield from _evaluate_sensor(
File "/Users/noahnewton/mambaforge/envs/dag3.10/lib/python3.10/site-packages/dagster/_daemon/sensor.py", line 583, in _evaluate_sensor
sensor_runtime_data = code_location.get_external_sensor_execution_data(
File "/Users/noahnewton/mambaforge/envs/dag3.10/lib/python3.10/site-packages/dagster/_core/host_representation/code_location.py", line 845, in get_external_sensor_execution_data
return sync_get_external_sensor_execution_data_grpc(
File "/Users/noahnewton/mambaforge/envs/dag3.10/lib/python3.10/site-packages/dagster/_api/snapshot_sensor.py", line 78, in sync_get_external_sensor_execution_data_grpc
raise DagsterUserCodeProcessError.from_error_info(result.error)
dagster._core.errors.DagsterUserCodeProcessError: dagster._core.errors.DagsterUnknownResourceError: Unknown resource `ssh`. Specify `ssh` as a required resource on the compute / config function that accessed it.
Stack Trace:
File "/Users/noahnewton/mambaforge/envs/dag3.10/lib/python3.10/site-packages/dagster/_grpc/impl.py", line 369, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/Users/noahnewton/mambaforge/envs/dag3.10/lib/python3.10/site-packages/dagster/_core/definitions/sensor_definition.py", line 690, in evaluate_tick
result = list(self._evaluation_fn(context))
File "/Users/noahnewton/mambaforge/envs/dag3.10/lib/python3.10/site-packages/dagster/_core/definitions/sensor_definition.py", line 969, in _wrapped_fn
for item in result:
File "/Users/noahnewton/dev/noah-dagster/socket-load-stage/socket_load_stage/__init__.py", line 101, in new_file_sensor
for filename in context.resources.ssh.get_connection().listdir("./in"):
File "/Users/noahnewton/mambaforge/envs/dag3.10/lib/python3.10/site-packages/dagster/_core/definitions/scoped_resources_builder.py", line 29, in __getattr__
raise DagsterUnknownResourceError(name)
I was trying to follow the documentation for Dagster's SSH/SFTP integration, but there isn't a lot of examples available for it and I can't figure out how to use the ssh resource within the sensor. What am I doing wrong?sean
06/27/2023, 9:23 PMrequired_resource_keys={"ssh"}
as an argument to @sensor
it should work.sean
06/27/2023, 11:27 PMNoah Newton
06/27/2023, 11:28 PMHarry Park
06/28/2023, 12:49 PM