Harpal
03/22/2022, 9:07 AM
I have used the AssetGroup() method to gather all of my tables, as described in the examples in this blog, and can now see the assets in the dagit UI. I would like to use these assets in an @op that writes them out as a .csv file, and then run that op as a @job.
Better yet, I could write a sensor that listens for the asset key generated by this job. But how do I actually pull the PostgreSQL table into dagster from the metadata surrounding the asset?
Are there any relevant docs available? I could not find anything relevant under asset materialisation, software-defined assets, advanced materialization, or anywhere else on the dagster website.
Thanks in advance!
(See comments for a code snippet)
dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, select="+sector_cls_train_set")
sector_cls_train_test = AssetGroup(
    dbt_assets,
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILE_DIR,
                "vars": {"test_split": 0.2},
            }
        ),
    },
)
jamie
03/22/2022, 1:48 PM
owen
03/22/2022, 4:23 PM
Harpal
03/22/2022, 6:34 PM
gcloud sql export csv moonfire-01 gs://moonfire-training-data/sector_cls/test.csv --database=moonfire --query="SELECT * FROM public.sector_cls_test_set"
I can’t say I know much about IOManagers, but I’ve managed to cobble together a proposed workaround.
I SHOULD be able to listen for the assets output by the dagster-dbt job with a dagster @sensor and then trigger a @job that runs the handy gcloud one-liner.
This will essentially move the relevant tables from postgres to GCS.
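A minimal sketch of how the gcloud one-liner above might be wrapped in Python so that a job could call it once per table. The helper names `build_export_command` and `export_table` are hypothetical, and naming each file `{table_name}.csv` is an assumption; the instance, bucket, and database values are copied from the command above. Passing the arguments as a list avoids shell-quoting problems with the `--query` string.

```python
import subprocess
from typing import List

# Values copied from the one-liner above; adjust for your own project.
INSTANCE = "moonfire-01"
DATABASE = "moonfire"
BUCKET_PREFIX = "gs://moonfire-training-data/sector_cls"


def build_export_command(table_name: str) -> List[str]:
    """Build the `gcloud sql export csv` argument list for one table (hypothetical helper)."""
    return [
        "gcloud", "sql", "export", "csv",
        INSTANCE,
        f"{BUCKET_PREFIX}/{table_name}.csv",  # assumed file naming: one CSV per table
        f"--database={DATABASE}",
        f"--query=SELECT * FROM public.{table_name}",
    ]


def export_table(table_name: str) -> None:
    """Run the export; check=True raises CalledProcessError on a non-zero exit code."""
    subprocess.run(build_export_command(table_name), check=True)
```

Calling `export_table("sector_cls_test_set")` from inside an op or asset body would then run the same export as the command above, assuming gcloud is installed and authenticated on the machine that executes the run.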
What I don’t know is:
• Whether this proposed workaround is practical, or whether I would be missing out on some awesome dagster optimisations.
• How feasible this solution is given the experimental status of some of the methods.
I hope that helps clarify things! Looking forward to your thoughts and suggestions :dagsir:
owen
03/22/2022, 9:19 PM
AssetGroup. The way I would conceptualize this is that each .csv file on GCS is in fact its own separate asset (distinct from the table in your database). You could manually specify these csv assets, or you could automatically generate one for each asset created in your dbt project. I threw together some code that should basically do that (but haven't tested it, so buyer beware 😛):
dbt_assets = load_assets_from_dbt_project(...)
def csv_assets_for_dbt_assets(dbt_assets):
    ret = []
    for asset_key in dbt_assets[0].asset_keys:
        table_name = asset_key.path[-1]
        @asset(
            name=table_name,
            namespace="gcs",
            non_argument_deps={asset_key},
            compute_kind="gcs"
        )
        def _asset():
            # run one-liner for this table name
            pass
        ret.append(_asset)
    return ret
csv_assets = csv_assets_for_dbt_assets(dbt_assets)
asset_group = AssetGroup(
    dbt_assets + csv_assets, ...
)
Your downstream training jobs (depending on how they're implemented) might also make sense to be written as a sequence of assets that depends on these csv assets, but it's possible that it makes more sense to keep those in job-style syntax.
Harpal
03/23/2022, 9:17 PM
ret variable returning a list of _asset objects.
2022-03-23 21:18:18 +0000 - dagster.daemon.SensorDaemon - WARNING - Could not load location repo.py to check for sensors due to the following error: dagster.check.CheckError: Member of list mismatches type. Expected <class 'dagster.core.asset_defs.asset.AssetsDefinition'>. Got [<dagster.core.asset_defs.asset.AssetsDefinition object at 0x2a0014880>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x2a0014eb0>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2220>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e23d0>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2580>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2730>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e28e0>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2a90>, <dagster.core.asset_defs.asset.AssetsDefinition object at 0x16f0e2c40>] of type <class 'list'>.
Stack Trace:
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/server.py", line 212, in __init__
self._loaded_repositories = LoadedRepositories(
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/server.py", line 97, in __init__
loadable_targets = get_loadable_targets(
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/utils.py", line 27, in get_loadable_targets
else loadable_targets_from_python_file(python_file, working_directory)
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 18, in loadable_targets_from_python_file
loaded_module = load_python_file(python_file, working_directory)
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 79, in load_python_file
return import_module_from_path(module_name, python_file)
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/seven/__init__.py", line 47, in import_module_from_path
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 850, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "moonfire_dagster/repo.py", line 13, in <module>
from moonfire_dagster.sector_classification import sector_cls_all_dbt_assets
File "/Users/hdot/vs_code/machine-learning/moonfire_dagster/sector_classification/sector_cls_all_dbt_assets.py", line 34, in <module>
asset_group = AssetGroup(
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/asset_defs/asset_group.py", line 90, in __new__
check.list_param(assets, "assets", of_type=AssetsDefinition)
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/check/__init__.py", line 368, in list_param
return _check_list_items(obj_list, of_type)
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/check/__init__.py", line 449, in _check_list_items
raise CheckError(
import subprocess
from dagster import AssetGroup, asset
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
from moonfire_dagster.sector_classification.gcloud_setup import setup_gcloud
DBT_PROJECT_DIR = "./dbt"
DBT_PROFILE_DIR = "./dbt/config"
setup_gcloud()
dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, select="+sector_cls_train_set")
def csv_assets_for_dbt_assets(dbt_assets):
    ret = []
    for asset_key in dbt_assets[0].asset_keys:
        table_name = asset_key.path[-1]
        @asset(name=table_name, namespace="gcs", non_argument_deps={asset_key}, compute_kind="gcs")
        def _asset():
            gcl_export_table_comm = f'gcloud sql export csv moonfire-01 gs://moonfire-training-data/sector_cls/automatic/{table_name}.csv --database=moonfire --query="SELECT * FROM public.{table_name}"'
        ret.append(_asset)
    return ret
csv_assets = csv_assets_for_dbt_assets(dbt_assets)
asset_group = AssetGroup(
    [dbt_assets + csv_assets],
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILE_DIR,
                "vars": {"test_split": 0.2},
            }
        )
    },
)
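One thing that may be worth checking in the snippet above, separately from the error: Python closures look up a loop variable when they are called, not when they are defined, so every `_asset` body created in the loop could end up seeing the last `table_name`. The sketch below shows the effect with plain functions and one possible fix, a small factory function. `make_exporter` is a hypothetical name and no Dagster APIs are involved.

```python
def exporters_late_binding(table_names):
    """Mirror the loop above: each closure reads `table_name` when it is called."""
    ret = []
    for table_name in table_names:
        def _export():
            return f"SELECT * FROM public.{table_name}"
        ret.append(_export)
    return ret


def make_exporter(table_name):
    """Factory (hypothetical helper): each call gets its own `table_name` binding."""
    def _export():
        return f"SELECT * FROM public.{table_name}"
    return _export


def exporters_with_factory(table_names):
    """Build one closure per table through the factory."""
    return [make_exporter(name) for name in table_names]


tables = ["train_set", "test_set"]
late = [f() for f in exporters_late_binding(tables)]
fixed = [f() for f in exporters_with_factory(tables)]
print(late)   # both queries refer to the last table in the list
print(fixed)  # one query per table
```

In the Dagster version, the same idea would mean moving the `@asset`-decorated function into such a factory so that each asset body keeps its own table name.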
owen
03/23/2022, 9:25 PM
dbt_assets + csv_assets not [dbt_assets + csv_assets]
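The difference is easy to see with plain lists: `a + b` concatenates into one flat list, while `[a + b]` wraps that result in another list, so the outer list has a single element of type `list`. That matches the "Member of list mismatches type ... of type <class 'list'>" message in the CheckError above. A small sketch, using stand-in strings rather than real AssetsDefinition objects:

```python
# Stand-ins for the real AssetsDefinition lists (assumed values for illustration).
dbt_assets = ["dbt_asset_a", "dbt_asset_b"]
csv_assets = ["csv_asset_a"]

flat = dbt_assets + csv_assets       # one flat list of three items
nested = [dbt_assets + csv_assets]   # a list whose only element is a list

print(len(flat), type(flat[0]).__name__)
print(len(nested), type(nested[0]).__name__)
```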
Harpal
03/23/2022, 9:25 PM
owen
03/23/2022, 9:27 PM
Harpal
03/23/2022, 9:31 PM
DagsterInvalidDefinitionError.
Sorry, but the error message doesn’t give me much to go by.
2022-03-23 21:30:34 +0000 - dagster.daemon.SensorDaemon - WARNING - Could not load location repo.py to check for sensors due to the following error: dagster.core.errors.DagsterInvalidDefinitionError: Bad return value from repository construction function: all elements of list must be of type JobDefinition, GraphDefinition, PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, or SensorDefinition. Got value of type <class 'module'> at index 25.
Stack Trace:
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/server.py", line 212, in __init__
self._loaded_repositories = LoadedRepositories(
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/server.py", line 97, in __init__
loadable_targets = get_loadable_targets(
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/grpc/utils.py", line 27, in get_loadable_targets
else loadable_targets_from_python_file(python_file, working_directory)
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 18, in loadable_targets_from_python_file
loaded_module = load_python_file(python_file, working_directory)
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 79, in load_python_file
return import_module_from_path(module_name, python_file)
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/seven/__init__.py", line 47, in import_module_from_path
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 850, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "moonfire_dagster/repo.py", line 32, in <module>
def repo():
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/definitions/decorators/repository.py", line 242, in repository
return _Repository()(name)
File "/Users/hdot/.pyenv/versions/3.9.8/envs/machine-learning-3.9.8/lib/python3.9/site-packages/dagster/core/definitions/decorators/repository.py", line 65, in __call__
raise DagsterInvalidDefinitionError(
File "moonfire_dagster/repo.py", line 32, in <module>
def repo():
owen
03/23/2022, 9:34 PM
import some_local_filename at the top of repo.py and then inside repo() you have `some_local_filename` as a member of that list
Harpal
03/23/2022, 10:02 PM