Noud Jaspers
05/25/2023, 3:19 PMResourceDependency
in the IOmanager's class but it says that it is of type None.owen
05/25/2023, 4:23 PMNoud Jaspers
05/25/2023, 4:31 PMclass PartitionedParquetObjectGCSIOManager(ConfigurableIOManager):
postgres: ResourceDependency[PostgresResource]
def load_input(self, context: InputContext) -> Any:
...
def handle_output(self, context: OutputContext, tables: Any) -> None:
...
@asset()
def polaris(postgres: PostgresResource):
return {k: postgres.get_table_data(k) for k in postgres.get_tables()}
defs = Definitions(
assets=[polaris],
resources={
"io_manager": PartitionedParquetObjectGCSIOManager(),
"postgres": PostgresResource(
connection_string=get_connection_string(config["polaris_db"])
),
},
)
from dagster import (
Definitions,
asset,
ConfigurableResource,
ResourceDependency,
ConfigurableIOManager,
InputContext,
OutputContext,
)
from typing import Any
class PostgresResource(ConfigurableResource):
project: str
def get_test():
return [1, 2, 3, 45]
class PartitionedParquetObjectGCSIOManager(ConfigurableIOManager):
gcs: ResourceDependency[PostgresResource]
def load_input(self, context: InputContext) -> Any:
...
def handle_output(self, context: OutputContext, tables: Any) -> None:
print(self.gcs.get_test())
@asset
def asset1():
# create df ...
return [1, 2, 3, 4, 5, 6]
@asset
def asset2(asset1):
return asset1[:5]
defs = Definitions(
assets=[asset1, asset2],
resources={
"io_manager": PartitionedParquetObjectGCSIOManager(),
"gcs": PostgresResource(project="my-cool-project"),
},
)
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "asset1":
File "/home/noud/Documents/Projects/parthenon/.venv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 262, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/noud/Documents/Projects/parthenon/.venv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 375, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event):
File "/home/noud/Documents/Projects/parthenon/.venv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 428, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output):
File "/home/noud/Documents/Projects/parthenon/.venv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 603, in _store_output
for elt in iterate_with_context(
File "/home/noud/Documents/Projects/parthenon/.venv/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 443, in iterate_with_context
with context_fn():
File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/noud/Documents/Projects/parthenon/.venv/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
AttributeError: 'NoneType' object has no attribute 'get_test'
File "/home/noud/Documents/Projects/parthenon/.venv/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
yield
File "/home/noud/Documents/Projects/parthenon/.venv/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 445, in iterate_with_context
next_output = next(iterator)
File "/home/noud/Documents/Projects/parthenon/.venv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 593, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/home/noud/Documents/Projects/parthenon/dagster/prod/prod/__init__.py", line 54, in handle_output
print(self.gcs.get_test())
ben
05/26/2023, 11:46 AMgcs = PostgresResource(project="my-cool-project")
defs = Definitions(
assets=[asset1, asset2],
resources={
"io_manager": PartitionedParquetObjectGCSIOManager(gcs=gcs),
"gcs": gcs
},
)
defs = Definitions(
assets=[asset1, asset2],
resources={
"io_manager": PartitionedParquetObjectGCSIOManager(gcs=ResourceByKey("gcs")),
"gcs": PostgresResource(project="my-cool-project")
},
)
Noud Jaspers
05/26/2023, 2:40 PM