geoHeil
04/14/2022, 1:45 PM
`read_materialization` is not accepted:
Sensor daemon caught an error for sensor my_asset_sensor : dagster.core.errors.DagsterInvalidConfigError: Error in config for job
Error 1: Received unexpected config entry "read_materialization" at path root:ops. Expected: "{ baz_scd2_asset?: { config?: { assets?: { input_partitions?: { } output_partitions?: { } } } inputs?: { baz_asset?: Any } outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".
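Reading the error: under `ops`, this job's config schema accepts only the op name `baz_scd2_asset`, and that op accepts only `config`, `inputs` and `outputs`. A `read_materialization` key placed directly under `ops` is therefore rejected at path `root:ops`.

Below is a simplified, plain-Python model of that check. It is not Dagster's actual validator. `EXPECTED_SCHEMA` and `find_unexpected_keys` are hypothetical names, and the schema is a trimmed copy of the one quoted in the error.

```python
from typing import Any, Dict, List

# Trimmed, hypothetical copy of the schema quoted in the error message.
# None marks a leaf whose contents are accepted without further checking.
EXPECTED_SCHEMA: Dict[str, Any] = {
    "ops": {
        "baz_scd2_asset": {
            "config": None,
            "inputs": None,
            "outputs": None,
        }
    }
}


def find_unexpected_keys(
    config: Dict[str, Any], schema: Dict[str, Any], path: str = "root"
) -> List[str]:
    """Report every key in `config` that `schema` does not declare, with its path."""
    errors: List[str] = []
    for key, value in config.items():
        if key not in schema:
            errors.append(f'unexpected entry "{key}" at path {path}')
            continue
        sub_schema = schema[key]
        # Only descend when both sides are nested mappings.
        if isinstance(sub_schema, dict) and isinstance(value, dict):
            errors.extend(find_unexpected_keys(value, sub_schema, f"{path}:{key}"))
    return errors
```

With a hypothetical run config such as `{"ops": {"read_materialization": {...}}}`, this reports the same path as the sensor daemon. Keying the op config by `baz_scd2_asset` passes the check.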
How can this be fixed? Further:
2. I find it quite confusing that resources need to be specified multiple times:
- for the asset group
- for every job instantiated from an asset group
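One way to avoid the repetition is to define the resources once and let every job derive its resources from the group, passing only overrides explicitly.

The plain-Python model below illustrates that idea. `AssetGroupSketch`, `JobSketch` and `SHARED_RESOURCES` are hypothetical stand-ins, not Dagster classes. Whether Dagster's own `AssetGroup.build_job` already inherits the group's `resource_defs` in the version used here is an assumption worth checking in the API docs.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

# Hypothetical placeholders for real resource definitions
# (e.g. the pyspark resource and the parquet IO manager).
SHARED_RESOURCES: Dict[str, Any] = {
    "pyspark": "pyspark-resource",
    "io_manager": "parquet-io-manager",
}


@dataclass
class JobSketch:
    name: str
    assets: List[str]
    resource_defs: Dict[str, Any]


@dataclass
class AssetGroupSketch:
    assets: List[str]
    resource_defs: Dict[str, Any] = field(default_factory=dict)

    def build_job(
        self,
        name: str,
        selection: List[str],
        resource_overrides: Optional[Dict[str, Any]] = None,
    ) -> JobSketch:
        # Start from the group's resources; only deviations are passed explicitly.
        # Dict unpacking builds a new dict, so the shared definitions are never mutated.
        resources = {**self.resource_defs, **(resource_overrides or {})}
        chosen = [asset for asset in self.assets if asset in selection]
        return JobSketch(name=name, assets=chosen, resource_defs=resources)


group = AssetGroupSketch(assets=["baz_asset", "baz_scd2_asset"], resource_defs=SHARED_RESOURCES)
scd2_job = group.build_job("baz_scd2_job", selection=["baz_scd2_asset"])
```

In this model, a job that needs, say, an in-memory IO manager for testing passes only that one override. Everything else comes from `SHARED_RESOURCES`.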
Perhaps this is a misconception, but as far as I understand, every asset that should update automatically when an upstream asset changes needs an accompanying sensor. Following the builder pattern, that means I also need a job for each of them:
https://github.com/geoHeil/dagster-ssh-demo/blob/master/SSH_DEMO/sensors/sftp_sensor_asset_real.py#L270
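If one sensor and one job per auto-updating asset really is required, the boilerplate can at least be generated in a loop instead of being written by hand.

Below is a framework-free sketch of such a factory. `make_asset_sensor`, `SensorSketch`, `RunRequestSketch` and `DEPENDENCIES` are hypothetical stand-ins. In Dagster itself, the factory body would presumably wrap `@asset_sensor` and return `RunRequest`s. The `run_key` plays the same deduplication role as `RunRequest`'s `run_key`.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass(frozen=True)
class RunRequestSketch:
    run_key: str
    job_name: str


@dataclass
class SensorSketch:
    name: str
    upstream_asset: str
    job_name: str
    evaluate: Callable[[Optional[str]], List[RunRequestSketch]]


def make_asset_sensor(downstream: str, upstream: str) -> SensorSketch:
    """Build one sensor that requests a run of `downstream`'s job per new `upstream` materialization."""
    job_name = f"{downstream}_job"

    def evaluate(latest_materialization_id: Optional[str]) -> List[RunRequestSketch]:
        # No new upstream materialization: nothing to run.
        if latest_materialization_id is None:
            return []
        # Keying the request by materialization avoids triggering the same update twice.
        run_key = f"{upstream}:{latest_materialization_id}"
        return [RunRequestSketch(run_key=run_key, job_name=job_name)]

    return SensorSketch(
        name=f"{downstream}_sensor",
        upstream_asset=upstream,
        job_name=job_name,
        evaluate=evaluate,
    )


# Hypothetical dependency map: downstream asset -> upstream asset it should follow.
DEPENDENCIES: Dict[str, str] = {"baz_scd2_asset": "baz_asset"}

SENSORS = [make_asset_sensor(down, up) for down, up in DEPENDENCIES.items()]
```

Adding another auto-updating asset then only means adding one entry to the dependency map.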
When I don't specify any (additional) `run_config` from the `asset_sensor`, the job fails as well, because the `pyspark` resource (which is required to compute the asset) is not available.
This leads me to the following questions:
- How can I get this to work (resources in sensors, in a clean way)? Will this change or get simpler in the future? Can I somehow combine several steps in a single job that trigger asset updates directly (like ops)?
- Is this setup correct (having so many jobs, i.e. one for each asset that should auto-update)?
geoHeil
04/14/2022, 2:10 PM
sandy
04/14/2022, 5:28 PM
sandy
04/14/2022, 5:29 PM
`read_materialization`, we don't currently offer the ability for a sensor to supply configuration when creating a `RunRequest` for an assets job, but @sean is working on it.
geoHeil
04/14/2022, 9:17 PM
sandy
04/14/2022, 9:21 PM
geoHeil
04/14/2022, 9:22 PM
`pyspark`. Specify `pyspark` as a required resource on the compute / config function that accessed it.
Here is the code: https://github.com/geoHeil/dagster-ssh-demo/blob/master/SSH_DEMO/sensors/sftp_sensor_asset_real.py#L277
If you want, I could also briefly join a Zoom call.
sandy
04/14/2022, 9:25 PM
geoHeil
04/14/2022, 9:26 PM
```
dagster.core.errors.DagsterUnknownResourceError: Unknown resource `pyspark`. Specify `pyspark` as a required resource on the compute / config function that accessed it.
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 232, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 306, in core_dagster_event_sequence_for_step
    for event_or_input_value in ensure_gen(step_input.source.load_input_object(step_context)):
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/execution/plan/inputs.py", line 163, in load_input_object
    yield _load_input_with_input_manager(loader, load_input_context)
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/execution/plan/inputs.py", line 587, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/storage/root_input_manager.py", line 162, in load_input
    return self._load_fn(context)
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/asset_defs/assets_job.py", line 280, in _root_manager
    return io_manager.load_input(input_context_with_upstream)
  File "/Users/geoheil/development/tma/dagster/SSH_DEMO/SSH_DEMO/resources/duckdb_parquet_io_manager.py", line 72, in load_input
    return context.resources.pyspark.spark_session.read.parquet(path)
  File "/usr/local/Caskroom/miniconda/base/envs/dagster-ssh-demo/lib/python3.9/site-packages/dagster/core/definitions/resource_definition.py", line 410, in __getattr__
    raise DagsterUnknownResourceError(attr)
```
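The traceback shows that the failing access happens inside the custom IO manager's `load_input` (`context.resources.pyspark`), not inside the asset itself. Going by the error text, the fix is presumably to declare `pyspark` on the IO manager definition, e.g. `@io_manager(required_resource_keys={"pyspark"})`, in addition to providing `pyspark` in the job's resources.

The plain-Python model below only illustrates the scoping rule behind the error: a component sees just the resources it declared. `ScopedResources` and `UnknownResourceError` are hypothetical. Making the error subclass `AttributeError` is a choice of this sketch, not a claim about Dagster.

```python
from typing import Any, Dict, FrozenSet


class UnknownResourceError(AttributeError):
    """Stand-in for DagsterUnknownResourceError in this sketch."""


class ScopedResources:
    """Expose only the resources a component declared as required."""

    def __init__(self, all_resources: Dict[str, Any], required_keys: FrozenSet[str]) -> None:
        # Undeclared resources are hidden even if the job provides them.
        self._available = {k: v for k, v in all_resources.items() if k in required_keys}

    def __getattr__(self, name: str) -> Any:
        # Only called when normal attribute lookup fails, i.e. for resource names.
        try:
            return self._available[name]
        except KeyError:
            raise UnknownResourceError(
                f"Unknown resource `{name}`. Specify `{name}` as a required resource "
                "on the compute / config function that accessed it."
            ) from None


job_resources = {"pyspark": "spark-session", "io_manager": "parquet-io-manager"}
# IO manager without a declaration: access fails, as in the traceback.
undeclared = ScopedResources(job_resources, frozenset())
# IO manager that declares "pyspark": access succeeds.
declared = ScopedResources(job_resources, frozenset({"pyspark"}))
```

The point of the model is that providing `pyspark` at the job level is not enough on its own. The component that reads it has to declare it too.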
sandy
04/14/2022, 9:29 PM
geoHeil
04/14/2022, 9:31 PM
sandy
04/14/2022, 9:32 PM
`build_resources` for pyspark
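Presumably the suggestion is to construct the `pyspark` resource inside the sensor with Dagster's `build_resources` context manager, roughly `with build_resources({"pyspark": pyspark_resource}) as resources: ...`. Check the exact arguments, such as resource config, against the API reference.

The framework-free sketch below shows what such a helper does: build each resource on entry, expose the resources as attributes, and tear them down on exit. `build_resources_sketch` and `FakeSparkSession` are hypothetical names.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Any, Callable, Dict, Iterator


@contextmanager
def build_resources_sketch(factories: Dict[str, Callable[[], Any]]) -> Iterator[SimpleNamespace]:
    """Initialize each resource on entry, expose them as attributes, clean up on exit."""
    built: Dict[str, Any] = {}
    try:
        for key, factory in factories.items():
            built[key] = factory()
        yield SimpleNamespace(**built)
    finally:
        # Tear down in reverse order of construction, including after a failed build.
        for resource in reversed(list(built.values())):
            close = getattr(resource, "close", None)
            if callable(close):
                close()


class FakeSparkSession:
    """Hypothetical stand-in for a Spark session that records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True
```

Inside a sensor body, the usage would look like `with build_resources_sketch({"pyspark": FakeSparkSession}) as resources: ...`. The session is only alive for the duration of that block.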
geoHeil
04/14/2022, 9:33 PM
geoHeil
04/14/2022, 10:05 PM
geoHeil
04/14/2022, 10:07 PM
sandy
04/14/2022, 10:33 PM
sandy
04/14/2022, 10:57 PM
geoHeil
04/15/2022, 3:48 AM
sandy
04/15/2022, 4:13 AM