Harpal
03/28/2022, 10:32 AM
@assets with the MultiprocessExecutor and instead wait for one operation to finish before launching the next?
I’m running a for loop over my asset_keys and uploading a file for each of them to GCS with the synchronous gcloud sql export csv ... command. But because of the synchronous nature of the command, I end up uploading only the first asset while all the others fail silently 😞
ANY suggestions on how to make this implementation work in series/synchronously would be much appreciated.
Daniel Mosesson
03/28/2022, 10:34 AM
Harpal
03/28/2022, 10:40 AM
Daniel Mosesson
03/28/2022, 10:43 AM
@job(executor_def=in_process_executor)
https://docs.dagster.io/_apidocs/execution#dagster.in_process_executor
Harpal
03/28/2022, 10:44 AM
import subprocess

from dagster import AssetGroup, asset
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
from moonfire_dagster.sector_classification.gcloud_setup import setup_gcloud

DBT_PROJECT_DIR = "./dbt"
DBT_PROFILE_DIR = "./dbt/config"

dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, select="+sector_cls_train_set")


def csv_assets_for_dbt_assets(dbt_assets):
    def make_csv_asset(asset_key):
        # Build each asset in its own function scope so its body sees this
        # table_name, not the loop variable's final value (late-binding closure)
        table_name = asset_key.path[-1]

        @asset(name=table_name, namespace="gcs", non_argument_deps={asset_key}, compute_kind="gcs")
        def _asset():
            setup_gcloud()  # need this to make sure auth works
            # Same error
            # ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
            gcl_export_table_comm = f'gcloud sql export csv moonfire-01 gs://moonfire-training-data/sector_cls/automatic/{table_name}.csv --database=moonfire --query="SELECT * FROM public.{table_name}"'
            # Note: subprocess.call returns the exit status, which is ignored here
            subprocess.call(gcl_export_table_comm, shell=True)

        return _asset

    return [make_csv_asset(asset_key) for asset_key in dbt_assets[0].asset_keys]


csv_assets = csv_assets_for_dbt_assets(dbt_assets)

sector_cls_all_dbt_train_test = AssetGroup(
    dbt_assets + csv_assets,
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILE_DIR,
                "vars": {"test_split": 0.2},
            }
        )
    },
).build_job("sector_cls_ALL_dbt_train_test")
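A side note on building one asset per dbt model inside a loop: a Python closure captures the loop variable itself, not its value at that iteration. A body that reads `table_name` when it runs therefore sees the last value the loop assigned. The exception is a body created by a factory function that receives the value as a parameter. The plain-Python sketch below reproduces both behaviours. It does not involve Dagster, and the function names are made up for illustration.

```python
from typing import Callable, List


def make_bodies_naive(table_names: List[str]) -> List[Callable[[], str]]:
    bodies = []
    for table_name in table_names:
        def body() -> str:
            # table_name is looked up when body() is called, after the loop ended
            return f"export {table_name}"
        bodies.append(body)
    return bodies


def make_bodies_factory(table_names: List[str]) -> List[Callable[[], str]]:
    def make_body(table_name: str) -> Callable[[], str]:
        # table_name is a parameter here, so each body keeps its own value
        def body() -> str:
            return f"export {table_name}"
        return body
    return [make_body(name) for name in table_names]


tables = ["sector_cls_train_set", "sector_cls_test_set", "sector_cls_unk"]
print([b() for b in make_bodies_naive(tables)])
# → ['export sector_cls_unk', 'export sector_cls_unk', 'export sector_cls_unk']
print([b() for b in make_bodies_factory(tables)])
# → ['export sector_cls_train_set', 'export sector_cls_test_set', 'export sector_cls_unk']
```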
# Error Logs
Exporting Cloud SQL instance...
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2910 - gcs__sector_cls_train_set - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_train_set - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_train_set/result
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2910 - gcs__sector_cls_train_set - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_train_set.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2910 - gcs__sector_cls_train_set - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2910 - gcs__sector_cls_train_set - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_train_set" in 3.63s.
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2911 - gcs__sector_cls_test_set - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_test_set - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_test_set/result
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2911 - gcs__sector_cls_test_set - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_test_set.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2911 - gcs__sector_cls_test_set - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2911 - gcs__sector_cls_test_set - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_test_set" in 4.02s.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2905 - gcs__sector_cls_unk - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_unk - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_unk/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2912 - gcs__sector_cls_gcal - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_gcal - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_gcal/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2903 - gcs__sector_cls_wak - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2913 - gcs__sector_cls_crunchbase_data - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2905 - gcs__sector_cls_unk - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_unk.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_crunchbase_data - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_crunchbase_data/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2907 - gcs__sector_cls_caf - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2905 - gcs__sector_cls_unk - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_caf - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_caf/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_wak - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_wak/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2905 - gcs__sector_cls_unk - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_unk" in 4.66s.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2912 - gcs__sector_cls_gcal - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_gcal.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2913 - gcs__sector_cls_crunchbase_data - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_crunchbase_data.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2912 - gcs__sector_cls_gcal - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2913 - gcs__sector_cls_crunchbase_data - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2912 - gcs__sector_cls_gcal - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_gcal" in 4.61s.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2907 - gcs__sector_cls_caf - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_caf.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2913 - gcs__sector_cls_crunchbase_data - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_crunchbase_data" in 4.66s.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2907 - gcs__sector_cls_caf - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2907 - gcs__sector_cls_caf - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_caf" in 4.69s.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2903 - gcs__sector_cls_wak - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_wak.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2903 - gcs__sector_cls_wak - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2903 - gcs__sector_cls_wak - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_wak" in 4.21s.
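In the log above, every gcs step ends in STEP_SUCCESS even though gcloud printed the 409 error several times. That fits `subprocess.call`, which only returns the command's exit status; nothing raises unless the caller checks it. Below is a minimal sketch of the difference. It assumes gcloud exits with a non-zero status on that error, and the Python interpreter stands in for the failing command.

```python
import subprocess
import sys

# Stand-in for a CLI call that prints an error and exits with a non-zero status
failing_cmd = [sys.executable, "-c", "import sys; sys.exit('ERROR: simulated 409')"]

# subprocess.call just returns the exit status; nothing fails unless it is checked
status = subprocess.call(failing_cmd, stderr=subprocess.DEVNULL)
print(status)  # → 1

# check=True turns a non-zero exit status into an exception
raised = False
try:
    subprocess.run(failing_cmd, check=True, stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError as exc:
    raised = True
    print("export failed with exit code", exc.returncode)  # → export failed with exit code 1
```

Raised inside an asset body, the `CalledProcessError` would fail the step instead of letting it report success.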
Daniel Mosesson
03/28/2022, 10:53 AM
setup_gcloud() and the following functions mean that you might benefit from having a @resource that handles that for you. This would then be instantiated per op.
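Here is a rough sketch of what such a resource could hand to each asset. It is written in plain Python so it runs on its own. The object runs the auth setup once and builds the `gcloud sql export csv` call as an argument list instead of a shell string. The class name, its parameters, and the injectable `setup`/`runner` hooks are hypothetical, and the actual Dagster `@resource` wiring is left out.

```python
import subprocess
from typing import Callable, List


class GcloudSqlExporter:
    """Hypothetical helper: runs auth setup once, then exports tables one by one."""

    def __init__(self, instance: str, database: str, bucket_prefix: str,
                 setup: Callable[[], None],
                 runner: Callable[..., object] = subprocess.run) -> None:
        self.instance = instance
        self.database = database
        self.bucket_prefix = bucket_prefix.rstrip("/")
        self._setup = setup    # e.g. the thread's setup_gcloud()
        self._runner = runner  # injectable so the sketch can be tested without gcloud
        self._ready = False

    def command_for(self, table_name: str) -> List[str]:
        # Argument list, so no shell quoting is needed around the query
        return [
            "gcloud", "sql", "export", "csv", self.instance,
            f"{self.bucket_prefix}/{table_name}.csv",
            f"--database={self.database}",
            f"--query=SELECT * FROM public.{table_name}",
        ]

    def export(self, table_name: str) -> None:
        if not self._ready:
            self._setup()  # run auth setup only before the first export
            self._ready = True
        self._runner(self.command_for(table_name), check=True)


# Usage with fake hooks that only record what would have run
setups = []
calls = []
exporter = GcloudSqlExporter(
    instance="moonfire-01",
    database="moonfire",
    bucket_prefix="gs://moonfire-training-data/sector_cls/automatic",
    setup=lambda: setups.append(1),
    runner=lambda argv, check: calls.append(argv),
)
exporter.export("sector_cls_train_set")
exporter.export("sector_cls_test_set")
```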
It also seems weird to me that all the assets are created in the same but, to be honest, I am just getting started with assets, so I have no idea if that is the "right" way, or if you would have better luck using an op factory or something similar for this.
Harpal
03/28/2022, 12:07 PM
).build_job("sector_cls_ALL_dbt_train_test", executor_def=ExecutorDefinition("in_process_executor"))
didn’t change the output in any meaningful way…
@owen Is it possible to run these assets in series (waiting for one thread to finish before kicking off another), or is it not yet possible to restrict the number of workers/threads in this way?
owen
03/28/2022, 5:49 PM
from dagster import in_process_executor, and then ).build_job("...", executor_def=in_process_executor). The main drawback of this approach is that if you ever add more assets to this group, they'll also be bound by this parallelism constraint. If that's not something you foresee doing, then this is probably the best option.
multi_asset (although the APIs might shift a bit by Dagster 1.0), which is essentially multiple assets mapped to a single op. The body of that op could be a loop that iterates through the CLI commands one by one. This would mean that you wouldn't have to mess around with the executor.
Harpal
03/28/2022, 6:01 PM
).build_job("sector_cls_ALL_dbt_train_test", executor_def=in_process_executor).
The problem I’m facing is that it’s still spawning all of the processes (with different PIDs and all) and running them at once. This is visible from the screenshot below and the accompanying error message.
What could be going wrong to cause it not to wait for the first process to complete?
ASSET_MATERIALIZATION - Materialized value gcs sector_cls_crunchbase_data.
2022-03-28 18:53:08 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a5580e24-8ee1-48ee-a5a3-e63b8cab8d67 - 36874 - gcs__sector_cls_crunchbase_data - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
DEBUG: https://sqladmin.googleapis.com:443 "POST /sql/v1beta4/projects/moonfire/instances/moonfire-01/export?alt=json HTTP/1.1" 409 None
DEBUG: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
Traceback (most recent call last):
File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/calliope/cli.py", line 987, in Execute
resources = calliope_command.Run(cli=self, args=args)
File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/calliope/backend.py", line 809, in Run
resources = command_instance.Run(args)
File "/Users/hdot/google-cloud-sdk/lib/surface/sql/export/csv.py", line 64, in Run
return export_util.RunCsvExportCommand(args, client)
File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/command_lib/sql/export_util.py", line 163, in RunCsvExportCommand
return RunExportCommand(args, client, csv_export_context)
File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/command_lib/sql/export_util.py", line 91, in RunExportCommand
result_operation = sql_client.instances.Export(export_request)
File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/third_party/apis/sqladmin/v1beta4/sqladmin_v1beta4_client.py", line 556, in Export
return self._RunMethod(
File "/Users/hdot/google-cloud-sdk/lib/third_party/apitools/base/py/base_api.py", line 737, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/Users/hdot/google-cloud-sdk/lib/third_party/apitools/base/py/base_api.py", line 743, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/Users/hdot/google-cloud-sdk/lib/third_party/apitools/base/py/base_api.py", line 609, in __ProcessHttpResponse
raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpConflictError: HttpError accessing <https://sqladmin.googleapis.com/sql/v1beta4/projects/moonfire/instances/moonfire-01/export?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'content-encoding': 'gzip', 'date': 'Mon, 28 Mar 2022 17:53:08 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'alt-svc': 'h3=":443"; ma=2592000,h3-29=":443"; ma=2592000,h3-Q050=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"', 'transfer-encoding': 'chunked', 'status': 409}>, content <{
"error": {
"code": 409,
"message": "Operation failed because another operation was already in progress. Try your request after the current operation is complete.",
"errors": [
{
"message": "Operation failed because another operation was already in progress. Try your request after the current operation is complete.",
"domain": "global",
"reason": "operationInProgress"
}
]
}
}
>
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
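The JSON body gives the reason as "operationInProgress": Cloud SQL rejected the export because the instance was still busy with another operation. Running the exports one at a time addresses the cause. As an extra safety net, a busy response can also be retried after a pause, and a generic retry-with-backoff sketch follows. Several parts are assumptions rather than anything gcloud or Dagster provides: `retry_while_busy` itself, the delay schedule, and how the busy case is detected. For the CLI, detection would presumably mean checking stderr for that reason string.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_while_busy(
    fn: Callable[[], T],
    is_busy: Callable[[Exception], bool],
    attempts: int = 5,
    base_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying with exponential backoff while is_busy(error) is true."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:
            # Re-raise anything that is not a "busy" error, or the final failure
            if not is_busy(exc) or attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)  # 2s, 4s, 8s, ... with the defaults
    raise RuntimeError("unreachable")


# Usage with a fake export that is "busy" twice before succeeding
outcomes = [RuntimeError("operationInProgress"), RuntimeError("operationInProgress"), "exported"]
delays = []


def fake_export() -> str:
    outcome = outcomes.pop(0)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


result = retry_while_busy(fake_export, lambda exc: "operationInProgress" in str(exc), sleep=delays.append)
print(result, delays)  # → exported [2.0, 4.0]
```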
owen
03/28/2022, 6:59 PM
import subprocess

from dagster import AssetKey, Out, Output, multi_asset
from moonfire_dagster.sector_classification.gcloud_setup import setup_gcloud


def csv_assets_for_dbt_assets(dbt_assets):
    # One output, and therefore one "gcs" asset, per dbt model
    outs = {}
    for asset_key in dbt_assets[0].asset_keys:
        table_name = asset_key.path[-1]
        outs[table_name] = Out(asset_key=AssetKey(["gcs", table_name]))

    @multi_asset(outs=outs, non_argument_deps=dbt_assets[0].asset_keys, compute_kind="gcs")
    def _assets():
        setup_gcloud()  # need this to make sure auth works
        # The exports run one after another inside this single op
        for table_name in outs.keys():
            gcl_export_table_comm = f'gcloud sql export csv moonfire-01 gs://moonfire-training-data/sector_cls/automatic/{table_name}.csv --database=moonfire --query="SELECT * FROM public.{table_name}"'
            subprocess.call(gcl_export_table_comm, shell=True)
            yield Output(table_name, table_name)  # value, output_name

    return [_assets]
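Both of owen's options boil down to never having two exports in flight at once. The in-process executor runs one step at a time, and the multi_asset body loops over the commands itself. The sketch below is a loose, stdlib-only analogy that uses a thread pool, not Dagster's executor machinery. It counts how many fake exports overlap with one worker versus several.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(n_tasks: int, max_workers: int) -> int:
    """Run n_tasks fake exports on a pool and return the most that overlapped."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def fake_export(_: int) -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for a slow Cloud SQL export
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(fake_export, range(n_tasks)))
    return peak


print(peak_concurrency(6, max_workers=1))  # → 1
print(peak_concurrency(6, max_workers=6))
```

With a single worker the peak is always 1, which is the property the Cloud SQL instance needs here.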
Harpal
03/28/2022, 7:05 PM
owen
03/28/2022, 7:06 PM
Harpal
03/28/2022, 9:04 PM
multi_asset
Error loading repository location repo.py:dagster.check.ParameterCheckError: Param "non_argument_deps" is not one of ['frozenset', 'set']. Got dict_keys([AssetKey(['sector_cls_haw']), AssetKey(['sector_cls_wak']), AssetKey(['sector_cls_unk']), AssetKey(['sector_cls_caf']), AssetKey(['sector_cls_train_set']), AssetKey(['sector_cls_test_set']), AssetKey(['sector_cls_gcal']), AssetKey(['sector_cls_crunchbase_data'])]) which is type <class 'dict_keys'>.
non_argument_deps=set(dbt_assets[0].asset_keys),
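The check fails because asset_keys here is a `dict_keys` view. It behaves like a set for membership tests and set operators, but it is not an instance of `set` or `frozenset`, which is what the parameter check in the error message asks for. Wrapping it in `set(...)` produces a real set. Below is a plain-Python illustration with made-up dictionary contents.

```python
from typing import Dict, Set

models: Dict[str, None] = {"sector_cls_train_set": None, "sector_cls_test_set": None}
keys_view = models.keys()

# A keys view supports membership tests and set operators...
print("sector_cls_train_set" in keys_view)      # → True
print(type(keys_view).__name__)                 # → dict_keys
# ...but it is not an instance of set or frozenset
print(isinstance(keys_view, (set, frozenset)))  # → False

deps: Set[str] = set(keys_view)
print(isinstance(deps, set))                    # → True
```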
owen
03/28/2022, 9:08 PM
Harpal
03/28/2022, 9:20 PM
owen
03/28/2022, 10:37 PM
from dagster import AssetKey, Out, multi_asset


def csv_assets_for_dbt_assets(dbt_assets):
    outs = {}
    deps = {}
    for asset_key in dbt_assets[0].asset_keys:
        table_name = asset_key.path[-1]
        outs[table_name] = Out(asset_key=AssetKey(["gcs", table_name]))
        # The gcs output for this table depends only on its own dbt model
        deps[table_name] = {AssetKey(table_name)}

    @multi_asset(
        outs=outs,
        non_argument_deps=set(dbt_assets[0].asset_keys),
        compute_kind="gcs",
        internal_asset_deps=deps,
    )
    def _assets():
        ...  # body unchanged from the previous version

    return [_assets]

Basically, just defining that each gcs asset actually only depends on its specific dbt model, rather than on every single one of them.
Harpal
04/20/2022, 2:33 PM
repo.py? (asset groups and the like)
I seem to be having trouble getting a second job up and running and fear this may be why. Is it possible to create a second, similar job in this manner? If so, how do I go about executing it?
owen
04/20/2022, 9:42 PM
Harpal
04/20/2022, 11:02 PM
select argument to the function below allows one to pick precisely which dbt DAG to run for each ‘job’.
dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, select=f"tag:{DATASET_TYPE}")
Therefore I can run different dbt DAGs and save them to GCS simply by changing the value I provide for DATASET_TYPE. A global variable or an input argument to some wrapper function should do the trick, e.g.
DATASET_TYPE = os.environ.get("DATASET_TYPE", "DAG_1")
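Both approaches described here come down to computing the dbt select string before the assets are loaded. Below is a small sketch with hypothetical helper names (`dataset_type`, `dbt_select_for`). `DAG_1` is the default from the line above, and `DAG_2` is made up.

```python
import os
from typing import Mapping, Optional


def dataset_type(env: Optional[Mapping[str, str]] = None, default: str = "DAG_1") -> str:
    """Read DATASET_TYPE from the environment (or a given mapping), with a default."""
    source = os.environ if env is None else env
    return source.get("DATASET_TYPE", default)


def dbt_select_for(tag: str) -> str:
    # dbt selection by tag, matching select=f"tag:{DATASET_TYPE}" above
    return f"tag:{tag}"


print(dbt_select_for(dataset_type({})))                         # → tag:DAG_1
print(dbt_select_for(dataset_type({"DATASET_TYPE": "DAG_2"})))  # → tag:DAG_2

# Wrapper-function variant: one selection string per dataset type, side by side
print([dbt_select_for(t) for t in ("DAG_1", "DAG_2")])  # → ['tag:DAG_1', 'tag:DAG_2']
```

One design consideration is that an environment variable fixes one value per process, so a given repository load only contains one variant of the job. A wrapper function called once per dataset type could instead define several jobs side by side.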
Is this poor practice in Dagster? Because it seems to solve a lot of my problems without having to change any Dagster code.
owen
04/21/2022, 4:18 PM
Harpal
04/22/2022, 8:03 AM