Rasmus Bonnevie
01/18/2023, 4:41 PM
`runtime_metadata_fn` receives - for some reason it doesn't get the `node_info`?

owen
01/18/2023, 11:15 PM
`dbt compile` is executed, or is it not known until `dbt run`?
If the schema is known at compile time, then I think the best course of action would be to make the asset key that's generated for each model depend on the schema. That would look like:

```python
from dagster import AssetKey
from dagster_dbt import load_assets_from_dbt_manifest


def get_node_asset_key(node_info):
    # Prefix each model's asset key with the schema dbt compiled it into.
    return AssetKey([node_info["schema"], node_info["name"]])


load_assets_from_dbt_manifest(
    ...,
    node_info_to_asset_key=get_node_asset_key,
)
```
Then, in the IOManager, you could have

```python
schema_name = context.asset_key.path[-2]
table_name = context.asset_key.path[-1]
```

and go from there.

Rasmus Bonnevie
01/19/2023, 9:10 AM
compile, I'm not doing anything too obscure. I follow the dbt guidelines where I have separate development and production schemas encoded in `profiles.yml`, i.e. `dev_rasmus` for me, and then I use `+schema: foo` for some directories in `dbt_project.yml` to add some structure, which gets compiled to a schema of `dev_rasmus_foo` for those models.
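The `dev_rasmus_foo` name comes from dbt's default `generate_schema_name` macro: a model without a custom schema is built in the target schema from `profiles.yml`, and a model with `+schema: foo` is built in `<target schema>_foo`. A minimal Python sketch of that default rule, assuming the project does not override the macro (the Python function is only an illustration, not part of dbt or dagster):

```python
from typing import Optional


def generate_schema_name(target_schema: str, custom_schema: Optional[str] = None) -> str:
    """Mirror dbt's default generate_schema_name macro (assumes it is not overridden)."""
    if custom_schema is None:
        # No +schema config: the model is built in the target schema from profiles.yml.
        return target_schema
    # With +schema: foo, dbt appends the trimmed custom name to the target schema.
    return f"{target_schema}_{custom_schema.strip()}"


print(generate_schema_name("dev_rasmus", "foo"))  # dev_rasmus_foo
print(generate_schema_name("dev_rasmus"))         # dev_rasmus
```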
I'd rather not have my asset keys depend on my user schema, and it's also a bit inconvenient to have to rebuild the schema with all suffixes manually for each table, e.g. if I'm generating them in a loop. My user schema even depends on an environment variable, so I'd have to parse that too if I were to, e.g., read `profiles.yml` in as a resource/asset.

`runtime_metadata_fn` is actually the necessary metadata/configuration, but even when I add e.g. database and schema to the dbt asset as metadata, it is not accessible in the IO `input_manager` I've added, at least not on `upstream_output.metadata`, which is empty.

owen
01/19/2023, 10:52 PM
`input_context.metadata` field references metadata that is baked into the definition of the output, rather than anything that's generated at runtime (the runtime metadata is more for bookkeeping purposes).
As for a workaround, I'd recommend the (somewhat painful) approach of replicating the schema name logic when configuring your IOManager.
Right now, the asset key for each model will depend on the configured schema (but not the base schema). So if you have `model_a` with `+schema: foo` applied to it, it should get the asset key `["foo", "model_a"]`, whereas `model_b`, with no schema configured for it, will just be `["model_b"]`.
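In dbt's `manifest.json`, each model node records the custom `+schema` value under `config.schema` (empty when none is set) next to the fully compiled `schema`. A minimal sketch of the keying rule described above, using trimmed-down dicts in place of real manifest nodes; `asset_key_path` is a hypothetical helper, not the dagster-dbt implementation:

```python
from typing import Any, Dict, List


def asset_key_path(node: Dict[str, Any]) -> List[str]:
    """Key a dbt model by its custom schema (if any) and name, never by the base schema."""
    custom_schema = node.get("config", {}).get("schema")
    if custom_schema:
        return [custom_schema, node["name"]]
    return [node["name"]]


# Manifest-style nodes with only the fields used above (hypothetical example data).
model_a = {"name": "model_a", "schema": "dev_rasmus_foo", "config": {"schema": "foo"}}
model_b = {"name": "model_b", "schema": "dev_rasmus", "config": {"schema": None}}

print(asset_key_path(model_a))  # ['foo', 'model_a']
print(asset_key_path(model_b))  # ['model_b']
```

Because the compiled `schema` (e.g. `dev_rasmus_foo`) is never consulted, the same key comes out for dev and prod targets.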
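You'll then want to have an IOManager that can be configured with a `base_schema` property, e.g.:

```python
from dagster import IOManager, io_manager


class MyIOManager(IOManager):
    def __init__(self, base_schema: str):
        self.base_schema = base_schema

    def _get_schema(self, context):
        # Keys like ["foo", "model_a"] come from models with +schema: foo,
        # which dbt builds in "<base_schema>_foo".
        if len(context.asset_key.path) == 2:
            return f"{self.base_schema}_{context.asset_key.path[0]}"
        # Single-element keys have no custom schema, so they live in the base schema.
        return self.base_schema

    ...  # handle_output / load_input omitted


@io_manager(config_schema={"base_schema": str})
def my_io_manager(context):
    return MyIOManager(base_schema=context.resource_config["base_schema"])
```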
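The IOManager then needs the full relation, i.e. the schema plus the table name from the last key element. A stdlib-only sketch of that inverse mapping, kept outside dagster so it can be tested on its own; `relation_for_key` is a hypothetical helper and assumes keys only ever have one or two elements, as in the examples in this thread:

```python
from typing import Sequence, Tuple


def relation_for_key(path: Sequence[str], base_schema: str) -> Tuple[str, str]:
    """Map an asset key path to (schema, table), rebuilding dbt's default schema naming."""
    if len(path) == 2:
        custom_schema, table = path
        return f"{base_schema}_{custom_schema}", table
    if len(path) == 1:
        return base_schema, path[0]
    # Any other shape was not produced by the keying scheme; fail loudly.
    raise ValueError(f"unexpected asset key path: {list(path)}")


print(relation_for_key(["foo", "model_a"], "dev_rasmus"))  # ('dev_rasmus_foo', 'model_a')
print(relation_for_key(["model_b"], "dev_rasmus"))         # ('dev_rasmus', 'model_b')
```

Inside `MyIOManager`, something like `relation_for_key(context.asset_key.path, self.base_schema)` could replace `_get_schema` plus a separate table-name lookup.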
As you note, this would require you to duplicate some config between `profiles.yml` and dagster, i.e.

```python
import os

dev_io_manager = my_io_manager.configured({"base_schema": os.getenv("...")})
prod_io_manager = my_io_manager.configured({"base_schema": "something"})
```

but it should at least unblock you.

Rasmus Bonnevie
01/20/2023, 7:58 AM
`profiles.yml` so that you can access the schema? But the nicest option would still be to read it from the dbt table asset directly.

Rob Martorano
02/09/2023, 9:57 PM