# ask-community

Harpal

03/28/2022, 10:32 AM
Good morning all and happy Monday! 🙂 Does anyone here know how to stop Dagster from running `@op`s or `@asset`s with the MultiprocessExecutor and instead wait for one operation to finish before launching the next? I’m running a for loop over my asset_keys and exporting each one to GCS with the synchronous `gcloud sql export csv ...` command. But because of the synchronous nature of the command, only the first asset uploads while all the others fail silently 😞 Any suggestions on how to make this implementation run in series/synchronously would be much appreciated.

Daniel Mosesson

03/28/2022, 10:34 AM
You can use the in_process executor
separately, I think you might have an issue with your DAG if that is the issue you are running into

Harpal

03/28/2022, 10:40 AM
That’s a good point @Daniel Mosesson. Do you mean this `execute_in_process()` method? I haven’t used an executor before, but that’s likely the best way to resolve this. Are there any examples of your proposed solution I can have a look at on GitHub or elsewhere?

Daniel Mosesson

03/28/2022, 10:43 AM
Yes, exactly that. For your job, you do something like `@job(executor_def=in_process_executor)`. See https://docs.dagster.io/_apidocs/execution#dagster.in_process_executor

Harpal

03/28/2022, 10:44 AM
Nice one! I’ll give that a try. Thanks
Source code and error logs as promised:
# Code
import subprocess


from dagster import AssetGroup, asset
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
from moonfire_dagster.sector_classification.gcloud_setup import setup_gcloud

DBT_PROJECT_DIR = "./dbt"
DBT_PROFILE_DIR = "./dbt/config"


dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, select="+sector_cls_train_set")


def csv_assets_for_dbt_assets(dbt_assets):

    ret = []

    for asset_key in dbt_assets[0].asset_keys:

        table_name = asset_key.path[-1]

        @asset(name=table_name, namespace="gcs", non_argument_deps={asset_key}, compute_kind="gcs")
        def _asset():
            setup_gcloud()  # need this to make sure auth works
            # Same error
            # ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
            gcl_export_table_comm = f'gcloud sql export csv moonfire-01 gs://moonfire-training-data/sector_cls/automatic/{table_name}.csv --database=moonfire --query="SELECT * FROM public.{table_name}"'
            subprocess.call(gcl_export_table_comm, shell=True)

        ret.append(_asset)

    return ret


csv_assets = csv_assets_for_dbt_assets(dbt_assets)

sector_cls_all_dbt_train_test = AssetGroup(
    dbt_assets + csv_assets,
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILE_DIR,
                "vars": {"test_split": 0.2},
            }
        )
    },
).build_job("sector_cls_ALL_dbt_train_test")
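One thing that may be worth checking in the factory above: Python closures look up a loop variable when the function is called, not when it is defined, so every `_asset` body built in that loop would read the final value of `table_name`. The following is a minimal sketch of the pitfall and one common workaround (a small factory function). It does not use Dagster, and the `make_export_query` helper is hypothetical, introduced only for illustration.

```python
def build_late_bound(table_names):
    """Closures created in a loop all share the same loop variable."""
    funcs = []
    for table_name in table_names:
        def _export():
            # table_name is looked up when _export runs, after the loop ended
            return f"SELECT * FROM public.{table_name}"
        funcs.append(_export)
    return funcs


def make_export_query(table_name):
    """Hypothetical factory: each call gets its own table_name binding."""
    def _export():
        return f"SELECT * FROM public.{table_name}"
    return _export


def build_bound(table_names):
    return [make_export_query(name) for name in table_names]


tables = ["sector_cls_train_set", "sector_cls_test_set"]
late = [f() for f in build_late_bound(tables)]
bound = [f() for f in build_bound(tables)]
```

With the loop-defined closures, both queries name the last table; with the factory, each query names its own table.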
# Error Logs
Exporting Cloud SQL instance...
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2910 - gcs__sector_cls_train_set - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_train_set - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_train_set/result
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2910 - gcs__sector_cls_train_set - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_train_set.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2910 - gcs__sector_cls_train_set - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2910 - gcs__sector_cls_train_set - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_train_set" in 3.63s.
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2911 - gcs__sector_cls_test_set - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_test_set - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_test_set/result
ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2911 - gcs__sector_cls_test_set - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_test_set.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2911 - gcs__sector_cls_test_set - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2911 - gcs__sector_cls_test_set - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_test_set" in 4.02s.
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2905 - gcs__sector_cls_unk - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:23 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_unk - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_unk/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2912 - gcs__sector_cls_gcal - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_gcal - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_gcal/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2903 - gcs__sector_cls_wak - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2913 - gcs__sector_cls_crunchbase_data - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2905 - gcs__sector_cls_unk - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_unk.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_crunchbase_data - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_crunchbase_data/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2907 - gcs__sector_cls_caf - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2905 - gcs__sector_cls_unk - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_caf - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_caf/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - gcs__sector_cls_wak - Writing file at: /Users/hdot/vs_code/machine-learning/build/dagster_home/storage/a44b0ba2-4d15-460a-aa31-e00a87a51da1/gcs__sector_cls_wak/result
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2905 - gcs__sector_cls_unk - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_unk" in 4.66s.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2912 - gcs__sector_cls_gcal - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_gcal.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2913 - gcs__sector_cls_crunchbase_data - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_crunchbase_data.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2912 - gcs__sector_cls_gcal - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2913 - gcs__sector_cls_crunchbase_data - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2912 - gcs__sector_cls_gcal - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_gcal" in 4.61s.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2907 - gcs__sector_cls_caf - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_caf.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2913 - gcs__sector_cls_crunchbase_data - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_crunchbase_data" in 4.66s.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2907 - gcs__sector_cls_caf - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2907 - gcs__sector_cls_caf - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_caf" in 4.69s.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2903 - gcs__sector_cls_wak - ASSET_MATERIALIZATION - Materialized value gcs sector_cls_wak.
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2903 - gcs__sector_cls_wak - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-03-28 11:42:24 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a44b0ba2-4d15-460a-aa31-e00a87a51da1 - 2903 - gcs__sector_cls_wak - STEP_SUCCESS - Finished execution of step "gcs__sector_cls_wak" in 4.21s.
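The log above shows steps reporting STEP_SUCCESS even though gcloud printed an error. That is consistent with `subprocess.call`, which returns the exit code rather than raising, so a discarded return value hides the failure. Below is a minimal sketch, assuming the commands can be passed as argument lists, of running commands one at a time and failing loudly. The stand-in commands use the Python interpreter so the sketch runs without gcloud installed.

```python
import subprocess
import sys


def run_in_series(commands):
    """Run each command to completion before starting the next.

    Raises subprocess.CalledProcessError on the first non-zero exit code,
    so a failed export cannot be reported as a success.
    """
    completed = []
    for args in commands:
        # check=True turns a non-zero exit status into an exception;
        # passing a list (no shell=True) avoids shell quoting issues.
        result = subprocess.run(args, check=True, capture_output=True, text=True)
        completed.append(result.stdout.strip())
    return completed


# Stand-in commands (hypothetical) in place of the real gcloud invocation.
ok = [sys.executable, "-c", "print('exported')"]
bad = [sys.executable, "-c", "import sys; sys.exit(1)"]

outputs = run_in_series([ok, ok])
try:
    run_in_series([ok, bad, ok])
    failed = False
except subprocess.CalledProcessError:
    failed = True
```

Note that this only serializes commands issued from one process; it does not stop several step processes from each calling the export at the same moment.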

Daniel Mosesson

03/28/2022, 10:53 AM
Skimming a bit, my hunch is that your `setup_gcloud` and the following functions mean that you might benefit from having a `@resource` that handles that for you. This would then be instantiated per `op`. It also seems weird to me that all the assets are created in the same place, but to be honest, I am just getting started with assets, so I have no idea if that is the "right" way, or if you would have better luck using an Op Factory or similar for something like this.

Harpal

03/28/2022, 12:07 PM
All very solid suggestions! I’m also new to software defined assets but I’d love to hear what @owen thinks is the “right” way. If I can’t get this gcloud one-liner to work then I’ll have to download all of the assets as .csv files first and then upload them to GCS in a second step.
Unfortunately I haven’t been able to run the aforementioned loop of `@asset`s in series. They keep launching subprocesses via the MultiprocessExecutor (with their own pid and all; see screenshot below for more details). The input arguments for these assets are quite different from ops, so I can’t easily string them into a graph as I would with traditional `@op`s. Using `).build_job("sector_cls_ALL_dbt_train_test", executor_def=ExecutorDefinition("in_process_executor"))` didn’t change the output in any meaningful way… @owen Is it possible to run these assets in series (waiting for one to finish before kicking off another), or is it not yet possible to restrict the number of workers/threads in this way?

owen

03/28/2022, 5:49 PM
hi @Harpal! Daniel's suggestion is a reasonable one, and the way to implement it with asset groups would be `from dagster import in_process_executor`, and then `).build_job("...", executor_def=in_process_executor)`. The main drawback of this approach is that if you ever add more assets into this group, they'll also be bound by this parallelism constraint. If that's not something you foresee doing, then this is probably the best option.
the other option would be to take advantage of the fact that these asset definitions live on top of the underlying operational graph. dagster currently has the concept of a `multi_asset` (although the APIs might shift a bit by dagster 1.0), which is essentially multiple assets mapped to a single op. The body of that op could be a loop that iterates through each cli command one by one. This would mean that you wouldn't have to mess around with the executor.

Harpal

03/28/2022, 6:01 PM
Hiya @owen! Thanks for suggesting the proper syntax. I see what you mean. It should be fine to have these assets bound to the parallelism constraint enforced by `).build_job("sector_cls_ALL_dbt_train_test", executor_def=in_process_executor)`. The problem I’m facing is that it’s still spawning all of the processes (with different `pid` numbers and all) and running them at once. This is visible from the screenshot below and the accompanying error message. What could be going wrong to cause it to not wait for the first process to complete?
ASSET_MATERIALIZATION - Materialized value gcs sector_cls_crunchbase_data.
2022-03-28 18:53:08 +0100 - dagster - DEBUG - sector_cls_ALL_dbt_train_test - a5580e24-8ee1-48ee-a5a3-e63b8cab8d67 - 36874 - gcs__sector_cls_crunchbase_data - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
DEBUG: https://sqladmin.googleapis.com:443 "POST /sql/v1beta4/projects/moonfire/instances/moonfire-01/export?alt=json HTTP/1.1" 409 None
DEBUG: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.


Traceback (most recent call last):
  File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/calliope/cli.py", line 987, in Execute
    resources = calliope_command.Run(cli=self, args=args)
  File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/calliope/backend.py", line 809, in Run
    resources = command_instance.Run(args)
  File "/Users/hdot/google-cloud-sdk/lib/surface/sql/export/csv.py", line 64, in Run
    return export_util.RunCsvExportCommand(args, client)
  File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/command_lib/sql/export_util.py", line 163, in RunCsvExportCommand
    return RunExportCommand(args, client, csv_export_context)
  File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/command_lib/sql/export_util.py", line 91, in RunExportCommand
    result_operation = sql_client.instances.Export(export_request)
  File "/Users/hdot/google-cloud-sdk/lib/googlecloudsdk/third_party/apis/sqladmin/v1beta4/sqladmin_v1beta4_client.py", line 556, in Export
    return self._RunMethod(
  File "/Users/hdot/google-cloud-sdk/lib/third_party/apitools/base/py/base_api.py", line 737, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/Users/hdot/google-cloud-sdk/lib/third_party/apitools/base/py/base_api.py", line 743, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/Users/hdot/google-cloud-sdk/lib/third_party/apitools/base/py/base_api.py", line 609, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpConflictError: HttpError accessing <https://sqladmin.googleapis.com/sql/v1beta4/projects/moonfire/instances/moonfire-01/export?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'content-encoding': 'gzip', 'date': 'Mon, 28 Mar 2022 17:53:08 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'alt-svc': 'h3=":443"; ma=2592000,h3-29=":443"; ma=2592000,h3-Q050=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"', 'transfer-encoding': 'chunked', 'status': 409}>, content <{
  "error": {
    "code": 409,
    "message": "Operation failed because another operation was already in progress. Try your request after the current operation is complete.",
    "errors": [
      {
        "message": "Operation failed because another operation was already in progress. Try your request after the current operation is complete.",
        "domain": "global",
        "reason": "operationInProgress"
      }
    ]
  }
}
>


ERROR: (gcloud.sql.export.csv) HTTPError 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete.
I’m really hoping that I can get these running in series, as I’d hate to have to pay for a separate extraction tool (like Fivetran) just to do this one step in my DAG 😅
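The 409 response above says to retry once the current operation has finished. Independently of how the steps are scheduled, one possible fallback is to wrap the export in a retry loop with a growing delay. This is only a sketch under assumptions: `OperationInProgress` and `fake_export` are hypothetical stand-ins, and detecting the 409 from real gcloud output is left out.

```python
import time


class OperationInProgress(Exception):
    """Hypothetical marker for the 409 'operation in progress' response."""


def retry_with_backoff(func, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call func(); on OperationInProgress wait and retry, doubling the delay."""
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except OperationInProgress:
            if attempt == attempts:
                raise  # give up after the last attempt
            sleep(delay)
            delay *= 2


# Fake export that is busy twice, then succeeds (stands in for the real call).
calls = {"count": 0}


def fake_export():
    calls["count"] += 1
    if calls["count"] < 3:
        raise OperationInProgress("another operation was already in progress")
    return "done"


# Record the requested waits instead of actually sleeping.
waits = []
result = retry_with_backoff(fake_export, base_delay=0.5, sleep=waits.append)
```

Injecting `sleep` keeps the sketch fast to run; in real use the default `time.sleep` would apply.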

owen

03/28/2022, 6:59 PM
hm it's really strange that the executor def argument isn't being respected here. seems to indicate a bug on our side, although looking at the code I don't see an immediate way for that to occur. I'll dig into this more this afternoon, sorry that you're running into this
the multi_asset solution would also work, so I can throw together what that would look like as well
from dagster import AssetKey, Out, Output, multi_asset  # names this snippet relies on


def csv_assets_for_dbt_assets(dbt_assets):

    outs = {}

    for asset_key in dbt_assets[0].asset_keys:

        table_name = asset_key.path[-1]
        outs[table_name] = Out(asset_key=AssetKey(["gcs", table_name]))

    @multi_asset(outs=outs, non_argument_deps=dbt_assets[0].asset_keys, compute_kind="gcs")
    def _assets():
        setup_gcloud()  # need this to make sure auth works
        for table_name in outs.keys():
            gcl_export_table_comm = f'gcloud sql export csv moonfire-01 gs://moonfire-training-data/sector_cls/automatic/{table_name}.csv --database=moonfire --query="SELECT * FROM public.{table_name}"'
            subprocess.call(gcl_export_table_comm, shell=True)
            yield Output(table_name, table_name)

    return [_assets]
that would be something like this (might be some random syntax errors in there but that's basically it)

Harpal

03/28/2022, 7:05 PM
No problemo Owen. I'll keep beavering away from my end too! Thanks so much for the pseudocode 👍

owen

03/28/2022, 7:06 PM
this solution just maps all your gcs csv file assets to a single operation (which will do all of them in sequence)

Harpal

03/28/2022, 9:04 PM
I’ve almost cracked it
There seems to be an issue with the types accepted by `multi_asset`:
Error loading repository location repo.py:dagster.check.ParameterCheckError: Param "non_argument_deps" is not one of ['frozenset', 'set']. Got dict_keys([AssetKey(['sector_cls_haw']), AssetKey(['sector_cls_wak']), AssetKey(['sector_cls_unk']), AssetKey(['sector_cls_caf']), AssetKey(['sector_cls_train_set']), AssetKey(['sector_cls_test_set']), AssetKey(['sector_cls_gcal']), AssetKey(['sector_cls_crunchbase_data'])]) which is type <class 'dict_keys'>.
We cool. This may have fixed it:
non_argument_deps=set(dbt_assets[0].asset_keys),
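The parameter check quoted above rejected a `dict_keys` view because it is neither a `set` nor a `frozenset`, even though it behaves like one in many ways. A small sketch of why the `set(...)` wrapper helps, using plain tuples as stand-ins for `AssetKey` objects (an assumption made so the sketch runs without Dagster):

```python
# Plain tuples stand in for Dagster AssetKey objects (assumption for this sketch).
outputs_by_asset_key = {
    ("sector_cls_train_set",): "train_output",
    ("sector_cls_test_set",): "test_output",
}

keys_view = outputs_by_asset_key.keys()

# A dict view supports set-like operations but is neither a set nor a frozenset.
is_set_before = isinstance(keys_view, (set, frozenset))

# Copying the view into a real set satisfies a strict isinstance check.
keys_as_set = set(keys_view)
is_set_after = isinstance(keys_as_set, (set, frozenset))
```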

owen

03/28/2022, 9:08 PM
yep nice was just about to suggest that

Harpal

03/28/2022, 9:20 PM
And it works AGAIN. @owen you are an absolute superhero
🎉 2

owen

03/28/2022, 10:37 PM
@Harpal just saw your message in #dagster-feedback and right now we don't have a way to do that, but one thing that would clean up your diagram a bit is:
def csv_assets_for_dbt_assets(dbt_assets):

    outs = {}
    deps = {}

    for asset_key in dbt_assets[0].asset_keys:

        table_name = asset_key.path[-1]
        outs[table_name] = Out(asset_key=AssetKey(["gcs", table_name]))
        deps[table_name] = {AssetKey(table_name)}

    @multi_asset(outs=outs, non_argument_deps=set(dbt_assets[0].asset_keys), compute_kind="gcs", internal_asset_deps=deps)
basically, just defining that each gcs asset actually only depends on its specific dbt model, rather than every single one of them.
🎉 1

Harpal

04/20/2022, 2:33 PM
Hi @owen! I’ve designed another job that needs to upload assets to gcloud in a similar manner. Is it possible to have two instances of the above functionality within the same `repo.py` (asset groups and the like)? I seem to be having trouble getting a second job up and running and fear this may be why. Is creating a second similar job in this manner possible? If so, how do I go about executing this?

owen

04/20/2022, 9:42 PM
hi @Harpal! what issue are you running into exactly? if you just have multiple invocations of `csv_assets_for_dbt_assets()`, what happens?

Harpal

04/20/2022, 11:02 PM
Hiya @owen. Apologies, I kept it vague because I recall reading in the docs somewhere that it was not possible and was busy looking for a workaround. I found a pretty solid workaround which should tide me over for the time being 🤞 I discovered that changing the input to the `select` argument of the function below allows one to pick precisely which dbt DAG to run for each ‘job’.
dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, select=f"tag:{DATASET_TYPE}")
Therefore I can run different dbt DAGs and save them to GCS by simply changing the value I provide to DATASET_TYPE. A global variable or an input argument to some wrapper function should do the trick, e.g.
DATASET_TYPE = os.environ.get("DATASET_TYPE", "DAG_1")
Is this poor practice in Dagster? Because it seems to solve a lot of my problems without having to change any Dagster code.
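The wrapper-function variant mentioned above could look roughly like the sketch below. The `dbt_select_for` helper is hypothetical; it only builds the selector string that would be passed as `select=` and does not call Dagster or dbt.

```python
import os


def dbt_select_for(dataset_type=None, default="DAG_1"):
    """Build a dbt tag selector from an argument or the DATASET_TYPE env var."""
    if dataset_type is None:
        # Fall back to the environment, then to the default tag.
        dataset_type = os.environ.get("DATASET_TYPE", default)
    return f"tag:{dataset_type}"


# No env var set: the default tag is used.
os.environ.pop("DATASET_TYPE", None)
default_select = dbt_select_for()

# Env var set: it takes effect when no argument is given.
os.environ["DATASET_TYPE"] = "DAG_2"
env_select = dbt_select_for()

# An explicit argument wins over the environment.
explicit_select = dbt_select_for("DAG_1")
```

Passing the value explicitly makes it possible to build both selections in one process, whereas the environment variable fixes a single choice per process.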

owen

04/21/2022, 4:18 PM
hi @Harpal! I wouldn't call this bad practice exactly, but it does cut you off from being able to view the structure of both of these dbt dags in Dagit at the same time. in general, the biggest issues here would be UI concerns like that, where we want to provide metadata about the structure of your job, but the structure of your job is subject to change depending on the env var set.

Harpal

04/22/2022, 8:03 AM
Ahh I see what you mean. Fortunately enough, these two actually have one node in common, giving me one large DAG with two “paths” representing each tagged “job”. So the Dagit UI shows everything (albeit a little cluttered 😅). Clean as a whistle.