# integration-dbt
r
hey folks - from this thread, it seems that the tables referenced in my `sources.yml` should be assigned a Dagster group according to the `name` in the file. this isn't working for me. my `sources.yml` looks like this:
```yaml
version: 2

sources:
  - name: bronze
    schema: bronze
    tables:
      - name: my_source_table_1
      - name: my_source_table_2
```
in the Dagit UI, they are not assigned a group. they do not come up at http://127.0.0.1:3000/locations/{repo name}/asset-groups/bronze. the asset prefix is being assigned properly, but not the group. I'm not overriding `node_info_to_group_fn` anywhere. is there some other configuration that's required? thanks!
r
Do they show up in any group?
r
no - they come up as "Ungrouped Assets" at http://127.0.0.1:3000/locations/{repo name}/assets
r
I might have misspoken in the original thread: we use dbt's fully qualified name, not just the name in the `sources.yml`. https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py?L64-70 cc @owen if you know the correct interaction here. I think we should be using the `schema` field here, but if the `sources.yml` isn't located at the top level of the `models` folder, there might be some unexpected behaviors here?
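(a possible workaround, sketched under the assumption that you want groups keyed off the schema - this overrides the `node_info_to_group_fn` hook mentioned above, and the project path is illustrative:)
```python
from dagster_dbt import load_assets_from_dbt_project

# Sketch: derive the Dagster group from each dbt node's schema rather than
# relying on the fqn-derived default grouping.
dbt_assets = load_assets_from_dbt_project(
    project_dir="path/to/dbt_project",  # illustrative path
    node_info_to_group_fn=lambda node_info: node_info.get("schema"),
)
```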
r
the `schema` is being used as the asset prefix, I'm pretty sure. they come up under http://127.0.0.1:3000/assets/bronze/. I changed `sources.yml` to
```yaml
version: 2

sources:
  - name: bronze
    schema: iron
    tables:
      - name: my_source_table_1
      - name: my_source_table_2
```
these are listed under http://127.0.0.1:3000/assets/iron/ but remain ungrouped at http://127.0.0.1:3000/locations/warehouse/assets. on my end, `sources.yml` is directly under `models`. @rex what do you mean by "dbt's fully qualified name"? I don't see these getting put into any groups
r
We’re looking into this for you. When we construct software-defined assets in our dbt integration, we inspect the dbt manifest, which has a representation of all the objects in your project. As part of this schema, we use the fqn (fully qualified name) that dbt produces for the object.
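For reference, a source entry in `manifest.json` carries the fqn roughly like this (illustrative shape only, assuming a project named `my_project` with `sources.yml` directly under `models/`; exact fields vary by dbt version):
```json
"source.my_project.bronze.my_source_table_1": {
  "fqn": ["my_project", "bronze", "my_source_table_1"],
  "source_name": "bronze",
  "schema": "bronze",
  "name": "my_source_table_1"
}
```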
D 1
👍 1
r
great thank you @rex
@rex this worked, thank you! quick question ... right now, my `sources.yml` has all available source tables, many of which are not currently being used in dbt models. it's nice to keep them there so folks can see what's available. is there an easy way in Dagster to filter to only the sources that are referenced by the dbt models as I'm building the `SourceAsset`s?
r
Are you using a manifest to load your dbt assets? You could use a combination of the `manifest` and `sources.yml` to generate only the `SourceAsset`s that are being used. In the manifest, there's a `child_map` that you could make use of. For sources that are not currently being used, I'd imagine that it will be empty for that resource's id.
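A sketch of that filtering, assuming a compiled `target/manifest.json` (the asset key shape here is illustrative):
```python
import json

from dagster import AssetKey, SourceAsset

with open("target/manifest.json") as f:
    manifest = json.load(f)

# child_map maps each unique_id to the unique_ids that depend on it; an
# empty (or missing) entry for a source means no model references it.
child_map = manifest["child_map"]

source_assets = [
    SourceAsset(key=AssetKey([node["schema"], node["name"]]))
    for unique_id, node in manifest["sources"].items()
    if child_map.get(unique_id)
]
```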
r
no manifest .. using `load_assets_from_dbt_project`. I think I can pull the sources off using `keys_by_input_name` and regex match based on the prefix of `source_{repo name}_bronze_*`
just confirming - I have a solution working. thank you @rex & team!
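(for anyone finding this later, a rough sketch of that approach - not the exact working code; `source_my_repo_bronze_` stands in for the real `source_{repo name}_bronze_` prefix:)
```python
import re

from dagster import SourceAsset
from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(project_dir="path/to/dbt_project")

# Keep only the upstream source keys the dbt assets actually reference,
# matching on the input-name prefix described above.
pattern = re.compile(r"^source_my_repo_bronze_")

used_source_keys = {
    key
    for assets_def in dbt_assets
    for input_name, key in assets_def.keys_by_input_name.items()
    if pattern.match(input_name)
}

source_assets = [SourceAsset(key=key) for key in used_source_keys]
```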
🚶 1
m
Hi @Rob Sicurelli - I have this same issue, which I'd just pushed down my list of priorities, so I'm glad you asked this here 🙂 @rex is it possible to get the schema (like with a model) attached to the asset for visibility in the Dagit UI using this approach?
❤️ 1
OK got it - this method gives me what I wanted:
```python
from typing import Any, Literal

from dagster import (
    TableColumn,
    TableColumnConstraints,
    TableConstraints,
    TableSchema,
    TableSchemaMetadataValue,
)


def make_asset_metadata_from_schema(
    schema: dict[str, Any],
) -> dict[Literal["table_schema"], TableSchemaMetadataValue]:
    """Creates a table schema to show in the Dagit UI under the Metadata heading from a table source entry.

    Args:
        schema: Dictionary of parsed yaml from a sources / model file

    Returns:
        A dictionary of the form {"table_schema": TableSchemaMetadataValue}
    """
    if "columns" not in schema:
        return {}

    columns = [
        TableColumn(
            name=column.get("name"),
            type=str(column.get("type") or "?"),
            description=column.get("description") or "No description provided.",
            constraints=TableColumnConstraints(
                nullable="not_null" not in column.get("tests", []),
                unique="unique" in column.get("tests", []),
                other=[t for t in column.get("tests", []) if t not in ("not_null", "unique")],
            ),
        )
        for column in schema["columns"]
    ]

    table_schema = TableSchema(
        columns=columns,
        constraints=TableConstraints(other=schema.get("constraints", [])),
    )

    return {"table_schema": TableSchemaMetadataValue(schema=table_schema)}
```
Just threw this together so it's not battle-tested yet, but it gets me what I want and might be useful to someone else!
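In case it helps, hypothetical usage might look like this (the yaml path and asset key shape are made up for the example):
```python
import yaml

from dagster import AssetKey, SourceAsset

with open("models/sources.yml") as f:
    sources_yaml = yaml.safe_load(f)

# Attach the generated table-schema metadata to each SourceAsset.
source_assets = [
    SourceAsset(
        key=AssetKey([source["name"], table["name"]]),
        metadata=make_asset_metadata_from_schema(table),
    )
    for source in sources_yaml["sources"]
    for table in source["tables"]
]
```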
For future me / anyone else interested - this doesn't handle `{{ doc("...") }}` blocks as it reads directly from the raw yaml file. I had to add more supporting functions to merge the manifest data with the source data in order to make this work, but it does work nicely now.
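A rough sketch of the merging idea (assuming a compiled `target/manifest.json`; dbt renders `{{ doc("...") }}` blocks into the manifest's descriptions, though field names may vary by dbt version):
```python
import json
from typing import Any


def merge_manifest_descriptions(
    table_entry: dict[str, Any],
    source_name: str,
    manifest_path: str = "target/manifest.json",
) -> dict[str, Any]:
    """Overlay rendered column descriptions from the dbt manifest onto a raw
    sources.yml table entry, so doc() references come through resolved."""
    with open(manifest_path) as f:
        manifest = json.load(f)

    # Source nodes live under manifest["sources"], keyed like
    # "source.<project>.<source_name>.<table_name>".
    for node in manifest["sources"].values():
        if node["source_name"] == source_name and node["name"] == table_entry.get("name"):
            rendered = {
                name: col.get("description")
                for name, col in node.get("columns", {}).items()
            }
            for column in table_entry.get("columns", []):
                if rendered.get(column.get("name")):
                    column["description"] = rendered[column["name"]]
    return table_entry
```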