Igor Shraga
05/22/2023, 2:17 PM@resource
.
from dagster import ConfigurableResource
class PostgressConfig(ConfigurableResource): # type:ignore
host: str = "my-host"
port: int = 5432
database: str = "my-db"
user: str = "my-user"
password: str = "my-password"
I try to use it in the following resource
.
@resource(required_resource_keys={"postgress_config"})
def process_repository(context: InitResourceContext) -> ProcessRepository:
"""Resource to provide the process repository to all the ops"""
if context.log is not None:
<http://context.log.info|context.log.info>("Creating the Lynceus Platform repository...")
repository = SqlAlchemyProcessRepository.create(
host=context.resources.postgress_config.host,
port=context.resources.postgress_config.port,
user=context.resources.postgress_config.user,
password=context.resources.postgress_config.password,
database=context.resources.postgress_config.database,
)
if context.log is not None:
<http://context.log.info|context.log.info>("Process repository created.")
return repository
Adding to definitions
like this.
dagster_definitions = Definitions(
**(PipelineDefinitions.merge([generic_definitions, *project_definitions]).dict()),
resources={"postgress_config": PostgressConfig()},
)
But I get this error.
ImportError while loading conftest '/workspaces/lynceus-platform/test/conftest.py'.
test/conftest.py:51: in <module>
from pipeline.data.jobs.fill_bi_dashboard import Base as BIDashboardPredictionBase
pipeline/data/jobs/fill_bi_dashboard.py:433: in <module>
fill_bi_dashboard_job = fill_bi_dashboard_graph.to_job(
/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/graph_definition.py:631: in to_job
return JobDefinition.dagster_internal_init(
/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/job_definition.py:275: in dagster_internal_init
return JobDefinition(
/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/job_definition.py:206: in __init__
self._required_resource_keys = self._get_required_resource_keys(was_provided_resources)
/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/job_definition.py:467: in _get_required_resource_keys
get_transitive_required_resource_keys(required_keys, self.resource_defs)
/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py:388: in get_transitive_required_resource_keys
ensure_resource_deps_satisfiable(resource_dependencies)
/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py:106: in ensure_resource_deps_satisfiable
_helper(resource_key)
/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py:98: in _helper
raise DagsterInvariantViolationError(
E dagster._core.errors.DagsterInvariantViolationError: Resource with key 'postgress_config' required by resource with key 'process_repository', but not provided.
It seems that adding resources
to defintions
didn't work. Why?ben
05/22/2023, 3:07 PMpipeline/data/jobs/fill_bi_dashboard.py:433: in <module>
fill_bi_dashboard_job = fill_bi_dashboard_graph.to_job(
/usr/local/lib/python3.10/site-
Igor Shraga
05/22/2023, 3:29 PM@graph
def fill_bi_dashboard_graph(): # type:ignore
fill_bi_dashboard_op()
fill_bi_dashboard_job = fill_bi_dashboard_graph.to_job(
name="fill_bi_dashboard_job",
resource_defs={
"process_repository": process_repository,
"meta_model_repository": meta_model_repository,
"process_service": process_service,
"bi_dashboard_repository": bi_dashboard_repository,
},
config={
"ops": {
"fill_bi_dashboard_op": {
"config": {
"compute_sele
Obviously partial. Hope it's enough.ben
05/22/2023, 6:53 PMfill_bi_dashboard_job = fill_bi_dashboard_graph.to_job(
name="fill_bi_dashboard_job",
resource_defs={
"process_repository": process_repository,
"meta_model_repository": meta_model_repository,
"process_service": process_service,
"bi_dashboard_repository": bi_dashboard_repository,
"postgress_config": PostgressConfig(),
},
or move the resource defs from the job to the definitions:
fill_bi_dashboard_job = fill_bi_dashboard_graph.to_job(
name="fill_bi_dashboard_job",
config=...
)
dagster_definitions = Definitions(
**(PipelineDefinitions.merge([generic_definitions, *project_definitions]).dict()),
resources={
"process_repository": process_repository,
"meta_model_repository": meta_model_repository,
"process_service": process_service,
"bi_dashboard_repository": bi_dashboard_repository,
"postgress_config": PostgressConfig(),
},
)
either should fix this issueIgor Shraga
05/23/2023, 6:44 AMDefintions
resources from several different jobs, and now I get Error 1: Received unexpected config entry "process_repository" at the root. Expected: "{ io_manager?: { config?: Any } }".
. Is there any way around that?ben
05/23/2023, 1:27 PMto_job
snippet looks like now? This might be another bit of ordering trickiness