martin o leary
05/18/2023, 5:17 PMruntime_metadata_fn
via the OpExecutionContext
once it is part of the same Definitions
class ?
When I log out the context.resources
property from within my runtime_metadata_fn
this is what I see:
_ScopedResources(io_manager=<dagster._core.storage.fs_io_manager.PickledObjectFilesystemIOManager object at 0x7fccc8ddb700>, dbt=<dagster_dbt.cli.resources.DbtCliResource object at 0x7fccc8f68880>)
I have a few more resources provided to my Definitions
which don't show up here.
The resource I want to access is a database client so that I can add number of rows to the asset metadata for each model. The client is passed to the Definitions
as a @configured
resource if that makes a difference?jamie
05/18/2023, 5:40 PMrequired_resource_key
?martin o leary
05/18/2023, 5:53 PMruntime_metadata_fn
is passed to load_assets_from_dbt_manifest
and I don't see required_resource_key
as an argument.
Here is the code I am using (some stuff removed for conciseness) :
@resource(
config_schema={
"PG_DATABASE": Field(StringSource, description="The name of the PostgreSQL database to connect to"),
"PG_HOST": Field(StringSource, description="PostgreSQL database host to connect to"),
"PG_PORT": Field(StringSource, description="PostgreSQL port", default_value="5432"),
"PG_USERNAME": Field(
StringSource,
description="The username to use for connecting to the database. Provided via environment variable.",
),
"PG_PASSWORD": Field(
StringSource, description="The password associated with the username. Provided via environment variable."
),
},
description="""
A database client,\n
This client can be used as a standalone resource for making low level
calls to a database or it can be used as an input to an IO manager
""",
)
def db_client(context: InitResourceContext) -> ResourceDefinition:
"""Generates a db client
Returns:
An instance of DB
"""
config = context.resource_config
return DB(**config)
@configured(
db_client,
config_schema={
"PG_DATABASE": Field(StringSource, description="The database to use for IO", default_value="db_gsa_ops"),
"PG_HOST": Field(
StringSource,
description="The database host",
default_value="<http://gsa-db.caqhmxommxbl.eu-west-1.rds.amazonaws.com|gsa-db.caqhmxommxbl.eu-west-1.rds.amazonaws.com>",
),
"PG_PORT": Field(StringSource, description="Port for the database", default_value="5432"),
},
)
def configured_db_client(config: dict[str, str]) -> dict[str, str | dict[str, str]]:
return {
"PG_DATABASE": config["PG_DATABASE"] or {"env": "PG_DATABASE"},
"PG_HOST": config["PG_HOST"] or {"env": "PG_HOST"},
"PG_PORT": config["PG_PORT"] or {"env": "PG_PORT"},
"PG_USERNAME": {"env": "PG_USER"},
"PG_PASSWORD": {"env": "PG_PASSWORD"},
}
def get_asset_meta(ctx: OpExecutionContext, node_info: dict[str, Any]) -> dict[str, Any]:
relation_name = node_info.get("relation_name")
ctx.log.debug(ctx.resources) <-- doesn't containe db_client
# GRAB DB_CLIENT AND QUERY TABLE FOR N_ROWS
...
return {"n_rows": 0 }
dbt_assets = load_assets_from_dbt_manifest(
manifest_json=manifest_json,
key_prefix="dbt_assets",
select="markets.*",
display_raw_sql=True,
runtime_metadata_fn=get_asset_meta,
)
resources = {
"dbt": configured_dbt_resource_w_vars,
"db_client": configured_db_client,
"pandas_io_manager": postgres_df_table_io_manager,
"io_manager": fs_io_manager,
}
defs = Definitions(
assets=[*dbt_assets],
resources=resources,
)
db_client
within get_asset_meta
jamie
05/19/2023, 8:56 PMmartin o leary
05/19/2023, 9:00 PM