How can I create a single asset sensor with resour...
# ask-community
g
How can I create a single asset sensor with resources? The documentation showcases this here: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors
1. When following along with it, I get an exception that the run_config does not match; in particular, the field `read_materialization` is not accepted:
```
Sensor daemon caught an error for sensor my_asset_sensor: dagster.core.errors.DagsterInvalidConfigError: Error in config for job
    Error 1: Received unexpected config entry "read_materialization" at path root:ops. Expected: "{ baz_scd2_asset?: { config?: { assets?: { input_partitions?: { } output_partitions?: { } } } inputs?: { baz_asset?: Any } outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".
```
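For context, the docs pattern being followed supplies op-level run_config targeting a `read_materialization` op, roughly like this (a sketch reconstructed from the docs link and the error message, not verbatim; `my_job` is a placeholder for an op-based job):

```python
from dagster import AssetKey, RunRequest, asset_sensor

@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context, asset_event):
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                # This op-level config only makes sense for a job that
                # actually contains a "read_materialization" op; an assets
                # job has no such op, hence the config error above.
                "read_materialization": {
                    "config": {"asset_key": asset_event.dagster_event.asset_key.path}
                }
            }
        },
    )
```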
How can this be fixed?
Further: 2. For me it is quite confusing that resources need to be specified multiple times:
- for the asset group
- for any job instantiated from an asset group
And perhaps it is a misconception, but so far I understand that for each asset that should automatically update based on a changed upstream asset, I need an accompanying sensor. Therefore, following the builder pattern, I need a job: https://github.com/geoHeil/dagster-ssh-demo/blob/master/SSH_DEMO/sensors/sftp_sensor_asset_real.py#L270. When trying not to specify any (additional) run_config from the asset_sensor, the job fails as well, because the pyspark resource (which is required to compute the asset) is not available. This leads me to the following questions:
- How can I get this to work (resources in sensors in a nice way)? Will this change/get simpler in the future?
- Can I somehow combine more steps in a single job that trigger updates directly (like ops) for assets?
- Is it correct to have so many jobs, i.e. one for each asset that should auto-update?
And (3) (perhaps this is a bug in Dagster or a misunderstanding on my side): executing this asset manually from the asset page (without the definition of the job) works totally fine, but trying to manually run the asset from the job's launch button fails due to the missing resource. This is quite confusing for me.
s
if I understand correctly, you're only using the pyspark resource inside the asset / IO manager code, not inside the sensor itself. is that right? if so, the pyspark resource should go on the AssetGroup, and you shouldn't need to reference it at all inside the sensor
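A minimal sketch of that arrangement, with hypothetical asset and sensor names standing in for the real ones: the resource lives on the AssetGroup, the job is built from the group, and the sensor yields a bare RunRequest:

```python
from dagster import AssetGroup, AssetKey, RunRequest, asset, asset_sensor
from dagster_pyspark import pyspark_resource

# Hypothetical stand-in for the real asset; it consumes pyspark.
@asset(required_resource_keys={"pyspark"})
def baz_scd2_asset(context):
    return context.resources.pyspark.spark_session.range(10).count()

# The resource is declared once, on the group.
asset_group = AssetGroup(
    [baz_scd2_asset],
    resource_defs={"pyspark": pyspark_resource},
)

# Jobs built from the group inherit its resource_defs.
baz_job = asset_group.build_job(name="baz_job", selection="baz_scd2_asset")

@asset_sensor(asset_key=AssetKey("baz_asset"), job=baz_job)
def my_asset_sensor(context, asset_event):
    # No run_config and no resource wiring here; the job already
    # carries pyspark from the group's resource_defs.
    yield RunRequest(run_key=context.cursor)
```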
regarding `read_materialization`: we don't currently offer the ability for a sensor to supply configuration when creating a `RunRequest` for an assets job, but @sean is working on it
g
correct - and it is already registered in the asset group. I do not want to send additional configuration with the run request. But still, the resource is not found. So how can I trigger an update for an asset (from a sensor) which needs the pyspark resource?
s
when you say it's not found, are you able to share the error you're hitting and the code that's triggering the error?
g
The error is: Unknown resource `pyspark`. Specify `pyspark` as a required resource on the compute / config function that accessed it. And the code: https://github.com/geoHeil/dagster-ssh-demo/blob/master/SSH_DEMO/sensors/sftp_sensor_asset_real.py#L277. But if you want, I could also briefly join on Zoom.
s
do you possibly have the full stack trace available?
g
```
dagster.core.errors.DagsterUnknownResourceError: Unknown resource `pyspark`. Specify `pyspark` as a required resource on the compute / config function that accessed it.
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 232, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 306, in core_dagster_event_sequence_for_step
    for event_or_input_value in ensure_gen(step_input.source.load_input_object(step_context)):
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/execution/plan/inputs.py", line 163, in load_input_object
    yield _load_input_with_input_manager(loader, load_input_context)
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/execution/plan/inputs.py", line 587, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/storage/root_input_manager.py", line 162, in load_input
    return self._load_fn(context)
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/asset_defs/assets_job.py", line 280, in _root_manager
    return io_manager.load_input(input_context_with_upstream)
  File "/Users/geoheil/development/tma/dagster/SSH_DEMO/SSH_DEMO/resources/duckdb_parquet_io_manager.py", line 72, in load_input
    return context.resources.pyspark.spark_session.read.parquet(path)
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/definitions/resource_definition.py", line 410, in __getattr__
    raise DagsterUnknownResourceError(attr)
```
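For reference, the standard way to make `pyspark` reachable inside `load_input` is to declare it on the IO manager via `required_resource_keys`; a simplified sketch of such a manager (names and paths hardcoded for brevity, not the real duckdb_parquet_io_manager):

```python
from dagster import IOManager, io_manager

class ParquetIOManager(IOManager):
    def handle_output(self, context, obj):
        # Write a Spark DataFrame out as parquet.
        obj.write.parquet("/tmp/example.parquet", mode="overwrite")

    def load_input(self, context):
        # context.resources.pyspark is only populated because the
        # io_manager below declares it as a required resource key.
        return context.resources.pyspark.spark_session.read.parquet("/tmp/example.parquet")

@io_manager(required_resource_keys={"pyspark"})
def parquet_io_manager(_):
    return ParquetIOManager()
```

Here that declaration was apparently already in place; the failure turned out to be on Dagster's side (see below).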
s
ahh, I see what's going on here. I believe this is a bug. I will get a fix out for this in our next release
g
So I would not expect to have to call `build_resources`, given that the AssetGroup's resource_defs already contain pyspark.
s
That's right - you shouldn't need to call `build_resources` for `pyspark`
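In other words, `build_resources` is only for resources the sensor body itself consumes; a hedged sketch of that case (hypothetical `file_lister` resource, reusing `baz_job` from the sketch above):

```python
from dagster import RunRequest, build_resources, resource, sensor

# Hypothetical resource used by the sensor body itself (e.g. an SFTP lister).
@resource
def file_lister(_):
    return lambda: ["new_file.csv"]

@sensor(job=baz_job)
def sftp_like_sensor(context):
    # build_resources initializes resources for use *inside* the sensor;
    # resources for the launched run still come from the job / asset group.
    with build_resources({"lister": file_lister}) as resources:
        for path in resources.lister():
            yield RunRequest(run_key=path)
```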
g
cool, then I am looking forward to a fix. Could you please share the issue with me once you have created it?
One more question, regarding (2) above: is there a way to have assets auto-update without writing a (separate) sensor/job for each of them?
(assuming only a basic asset-changed event without any fancy extras, i.e. the one already connected from the lineage)
s
we don't currently have this, but we're interested in adding it in the future
here's a PR with a fix for the resources issue: https://github.com/dagster-io/dagster/pull/7459
🎉 1
g
Awesome! Regarding the PR: I see you already pushed some changes. Do they already fix the issue (even if, for now, perhaps without additional tests)? I would then consider using/testing with the PR version of Dagster today.
s
The changes I pushed should fix the issue
🎉 1
✅ 1