# ask-community
Harpal:
Hey all. Does anyone know how to save the assets loaded from DBT to a .csv file? I’m using software-defined assets and the AssetGroup() method to gather all of my tables, as described in the examples in this blog, and can now see the assets in the dagit UI. I would like to use these assets in an @op to write them as a .csv file, and then run this as a @job. Better yet, I can write a sensor to listen to the asset key that is generated by this job. But how do I actually pull the postgresql table into dagster from the metadata surrounding the asset? Are there any relevant docs available? I could not find anything under asset materialisation, software defined assets, advanced materialization, or anywhere else on the dagster website. Thanks in advance! (See comments for a code snippet)
dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, select="+sector_cls_train_set")
sector_cls_train_test = AssetGroup(
    dbt_assets,
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILE_DIR,
                "vars": {"test_split": 0.2},
            }
        ),
    },
)
@owen
owen:
hi @Harpal ! Let me know if I'm misunderstanding your question, but is the basic end result you're trying to achieve: 1) run dbt to update a bunch of tables, 2) copy the contents of those updated tables into .csv files, 3) do some sort of further processing on those .csv files? I think the key abstraction you need here is the IOManager. The IOManager defines how dagster will store in-memory outputs from assets/ops, as well as how dagster will load those stored values as inputs to downstream assets/ops. For things produced by dbt, by default, nothing additional needs to be stored by dagster for each output (because the data is already stored in the database). As for loading, there are a bunch of different ways to load the contents of a database table as an in-memory object. We have a very lightweight example in the modern_data_stack_assets example. It implements the protocol of "to store dbt output, do nothing; to load dbt output as input, fire a query against the configured database and load the result as a dataframe". For your use case, depending on your needs, you might implement that same protocol, or something like "to store dbt output, write the table to a .csv file; to load it as input, load that .csv file (as a dataframe?)"
there's an io_manager_key parameter to the load_assets_from_dbt_project function, which you can map to an io manager that you define in your resource_defs on your asset group
I also don't see an immediate need to break this out into a group of assets + a separate job. my first instinct would be to create python assets (instead of ops) that depend on the dbt assets (there's an example of that in the repo I linked above)
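The "load dbt output as a dataframe" half of the protocol owen describes might look something like the sketch below (hypothetical helper names; assumes pandas and any DB-API connection such as psycopg2 or sqlite3 — inside a real IOManager's load_input the connection would come from a configured resource):

```python
import pandas as pd


def load_table_as_dataframe(conn, table_name):
    """Load the contents of a database table into an in-memory DataFrame.

    `conn` is any DB-API connection; in an IOManager this would come from
    the configured database resource.
    """
    return pd.read_sql(f"SELECT * FROM {table_name}", conn)


def store_table_as_csv(conn, table_name, csv_path):
    """The 'store dbt output as .csv' variant: query the table and dump it."""
    load_table_as_dataframe(conn, table_name).to_csv(csv_path, index=False)
```

The same two functions map directly onto an IOManager's load_input/handle_output pair if you go that route.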
Harpal:
Hiya @owen Thanks again for your deeply insightful responses! Allow me to clarify my use case with a bit of useful context.
• I have lots of tables in a postgres database hosted by Google Cloud SQL.
• I have a DBT DAG that transforms that data.
• Each of the tables created in the multi-stage transformation (all written in SQL/DBT) is shown as an asset thanks to the code snippet above.
• I want to take these materialised tables, which simultaneously exist in my postgres db and are displayed in the Dagit UI (take the tables by their asset key), and store them as .csv files in Google Cloud Storage (GCS).
• Then use those .csv files (in GCS) in downstream training jobs.
I’ve done some snooping around and have discovered that the gcloud CLI has a handy one-liner solution to move a postgres table from Cloud SQL to GCS as a .csv file.
gcloud sql export csv moonfire-01 gs://moonfire-training-data/sector_cls/test.csv --database=moonfire --query="SELECT * FROM public.sector_cls_test_set"
I can’t say I know much about IOManagers, but I’ve managed to carve together a proposed workaround. I SHOULD be able to listen for the assets outputted from the dagster-dbt job with a dagster @sensor and then trigger a @job that runs the handy gcloud one-liner. This will essentially move the relevant tables from postgres to GCS. What I don’t know is:
• Whether this proposed workaround is practical, or whether I’d be missing out on some awesome dagster optimisations?
• How feasible this solution is given the experimental status of some of the methods.
I hope that helps clarify things! Looking forward to your thoughts and suggestions
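For reference, the one-liner wraps in Python straightforwardly (a hedged sketch: instance, bucket, and database names are copied from the command above; the gcloud invocation itself is untested here):

```python
import subprocess


def export_table_to_gcs(table_name: str) -> list:
    """Build the gcloud argv that exports a Postgres table to a GCS .csv.

    Instance, bucket, and database names come from the one-liner above;
    swap in your own. Returning the argv list keeps it easy to inspect
    or pass to subprocess.run.
    """
    return [
        "gcloud", "sql", "export", "csv", "moonfire-01",
        f"gs://moonfire-training-data/sector_cls/{table_name}.csv",
        "--database=moonfire",
        f"--query=SELECT * FROM public.{table_name}",
    ]


def run_export(table_name: str) -> None:
    # Shell out to gcloud; check=True raises if the export exits non-zero.
    subprocess.run(export_table_to_gcs(table_name), check=True)
```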
owen:
ah I see! the solution you're describing would work, but I think it would be more ergonomic in the long run to write everything as assets and keep things in a single AssetGroup. The way I would conceptualize this is that each .csv file on GCS is in fact its own separate asset (distinct from the table in your database). You could manually specify these csv assets, or you could automatically generate one for each asset created in your dbt project. I threw together some code that should basically do that (but haven't tested it, so buyer beware 😛):
dbt_assets = load_assets_from_dbt_project(...)

def csv_assets_for_dbt_assets(dbt_assets):

    ret = []

    for asset_key in dbt_assets[0].asset_keys:

        table_name = asset_key.path[-1]
        @asset(
            name=table_name,
            namespace="gcs",
            non_argument_deps={asset_key},
            compute_kind="gcs"
        )
        def _asset():
            # run one-liner for this table name
            ...

        ret.append(_asset)

    return ret

csv_assets = csv_assets_for_dbt_assets(dbt_assets)

asset_group = AssetGroup(
    dbt_assets + csv_assets, ...
)
Your downstream training jobs (depending on how they're implemented) might also make sense to be written as a sequence of assets that depends on these csv assets, but it's possible that it makes more sense to keep those in job-style syntax.
Anyway, the main benefit of this sort of solution over your proposed solution is that the csv files will be linked (both in the metadata that Dagster tracks and in the UI) to the dbt models that they depend on, which makes it easier to reason about what's going on (rather than depending on an implicit link defined by an asset sensor). The csv files will also be treated as a separate, consumable entity (for potential downstream assets), and it will be much easier to notice issues of the form "dbt ran successfully, but the csv conversion failed".
Harpal:
@owen OK I’m convinced. Your solution is definitely the way forward!
I’ve adapted your pseudo code, and I think Dagster is unhappy with the ret variable returning a list of _asset objects.
2022-03-23 21:18:18 +0000 - dagster.daemon.SensorDaemon - WARNING - Could not load location repo.py to check for sensors due to the following error: dagster.check.CheckError: Member of list mismatches type. Expected <class 'dagster.core.asset_defs.asset.AssetsDefinition'>. Got [<dagster.core.asset_defs.asset.AssetsDefinition object at 0x2a0014880>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x2a0014eb0>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2220>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e23d0>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2580>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2730>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e28e0>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2a90>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2c40>] of type <class 'list'>.
The stack trace seems to agree, throwing the following error:
Stack Trace:
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/server.py", line 212, in __init__
    self._loaded_repositories = LoadedRepositories(
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/server.py", line 97, in __init__
    loadable_targets = get_loadable_targets(
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/utils.py", line 27, in get_loadable_targets
    else loadable_targets_from_python_file(python_file, working_directory)
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 18, in loadable_targets_from_python_file
    loaded_module = load_python_file(python_file, working_directory)
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 79, in load_python_file
    return import_module_from_path(module_name, python_file)
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/seven/__init__.py", line 47, in import_module_from_path
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "moonfire_dagster/repo.py", line 13, in <module>
    from moonfire_dagster.sector_classification import sector_cls_all_dbt_assets
  File "/Users/hdot/vs_code/machine-learning/moonfire_dagster/sector_classification/sector_cls_all_dbt_assets.py", line 34, in <module>
    asset_group = AssetGroup(
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/asset_defs/asset_group.py", line 90, in __new__
    check.list_param(assets, "assets", of_type=AssetsDefinition)
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/check/__init__.py", line 368, in list_param
    return _check_list_items(obj_list, of_type)
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/check/__init__.py", line 449, in _check_list_items
    raise CheckError(
Ooops! Almost forgot the adapted code:
import subprocess

from dagster import AssetGroup, asset
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
from moonfire_dagster.sector_classification.gcloud_setup import setup_gcloud

DBT_PROJECT_DIR = "./dbt"
DBT_PROFILE_DIR = "./dbt/config"

setup_gcloud()

dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, select="+sector_cls_train_set")


def csv_assets_for_dbt_assets(dbt_assets):

    ret = []

    for asset_key in dbt_assets[0].asset_keys:

        table_name = asset_key.path[-1]

        @asset(name=table_name, namespace="gcs", non_argument_deps={asset_key}, compute_kind="gcs")
        def _asset():
            gcl_export_table_comm = f'gcloud sql export csv moonfire-01 gs://moonfire-training-data/sector_cls/automatic/{table_name}.csv --database=moonfire --query="SELECT * FROM public.{table_name}"'
            subprocess.run(gcl_export_table_comm, shell=True, check=True)

        ret.append(_asset)

    return ret


csv_assets = csv_assets_for_dbt_assets(dbt_assets)

asset_group = AssetGroup(
    [dbt_assets + csv_assets],
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILE_DIR,
                "vars": {"test_split": 0.2},
            }
        )
    },
)
owen:
ah yeah it should just be dbt_assets + csv_assets, not [dbt_assets + csv_assets]
Harpal:
Ahhh my bad! I was following the example in these docs
owen:
ah yeah there's some trickiness where some functions return lists of AssetsDefinitions and regular python assets are just singular AssetsDefinition objects. Leads to some inconsistent-looking code / confusion
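That inconsistency can be hidden behind a small normalizing helper (a hypothetical convenience, plain duck-typing so it works with single AssetsDefinition objects and lists of them alike):

```python
def flatten_assets(*items):
    """Accept a mix of single definitions and lists/tuples of definitions,
    and return one flat list suitable for passing to AssetGroup."""
    flat = []
    for item in items:
        if isinstance(item, (list, tuple)):
            flat.extend(item)
        else:
            flat.append(item)
    return flat
```

With a helper like this, AssetGroup(flatten_assets(dbt_assets, csv_assets), ...) works whether each argument is one asset or a list of them.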
Harpal:
Gotcha! I’m almost there. Just need to figure out what’s causing this DagsterInvalidDefinitionError. Sorry but the error message doesn’t give me much to go by.
2022-03-23 21:30:34 +0000 - dagster.daemon.SensorDaemon - WARNING - Could not load location repo.py to check for sensors due to the following error: dagster.core.errors.DagsterInvalidDefinitionError: Bad return value from repository construction function: all elements of list must be of type JobDefinition, GraphDefinition, PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, or SensorDefinition. Got value of type <class 'module'> at index 25.

Stack Trace:
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/server.py", line 212, in __init__
    self._loaded_repositories = LoadedRepositories(
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/server.py", line 97, in __init__
    loadable_targets = get_loadable_targets(
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/utils.py", line 27, in get_loadable_targets
    else loadable_targets_from_python_file(python_file, working_directory)
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 18, in loadable_targets_from_python_file
    loaded_module = load_python_file(python_file, working_directory)
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 79, in load_python_file
    return import_module_from_path(module_name, python_file)
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/seven/__init__.py", line 47, in import_module_from_path
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "moonfire_dagster/repo.py", line 32, in <module>
    def repo():
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/definitions/decorators/repository.py", line 242, in repository
    return _Repository()(name)
  File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/definitions/decorators/repository.py", line 65, in __call__
    raise DagsterInvalidDefinitionError(
It seems to be upset with repo.py?
File "moonfire_dagster/repo.py", line 32, in <module>
    def repo():
owen:
do you mind sharing what your repo function looks like? I agree that the error message isn't all that illuminating
seems like you might have an import some_local_filename at the top of repo.py, and then inside repo() you have some_local_filename as a member of that list
Harpal:
Oooooooooooooh it works!