Harpal
07/20/2022, 11:26 AM
... AssetGroup() and @multi_asset methods to using config_schema, like with @ops.
The purpose of this is so that I can use and vary these parameters in the Dagit UI.
Is there another way to pass parameters from the Dagit UI that can be used in the AssetGroup() and @multi_asset methods?
See the comment section for my env vars in a code snippet 😄
Harpal
07/20/2022, 11:27 AM
... multi_asset scope (line 46) AND the AssetGroup scope (line 63)?
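import os
from ulid import ULID  # assuming the python-ulid package

DATASET_TYPE = os.environ.get("DATASET_TYPE", "hold")  # can be "hold" or "arb"
# os.environ.get returns a string when the variable is set, so cast to float
TEST_SPLIT = float(os.environ.get("TEST_SPLIT", 0.1))
TRAIN_SPLIT = float(os.environ.get("TRAIN_SPLIT", 0.8))
DIR_ULID = str(ULID())
DBT_PROJECT_DIR = "./dbt"
DBT_PROFILE_DIR = "./dbt/config"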
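Since environment variables always arrive as strings, the values above can be parsed and validated once at import time. This is a minimal sketch using only the standard library; the helper names `env_float`, `env_choice`, and `load_settings` are hypothetical and not part of Dagster or of the original snippet.

```python
import os


def env_float(name: str, default: float) -> float:
    """Read an environment variable as a float, falling back to a default."""
    raw = os.environ.get(name)
    return default if raw is None else float(raw)


def env_choice(name: str, default: str, allowed: tuple) -> str:
    """Read an environment variable and check it against the allowed values."""
    value = os.environ.get(name, default)
    if value not in allowed:
        raise ValueError(f"{name} must be one of {allowed}, got {value!r}")
    return value


def load_settings() -> dict:
    """Collect the typed settings used by the snippet above (hypothetical helper)."""
    return {
        "dataset_type": env_choice("DATASET_TYPE", "hold", ("hold", "arb")),
        "test_split": env_float("TEST_SPLIT", 0.1),
        "train_split": env_float("TRAIN_SPLIT", 0.8),
    }
```

Calling `load_settings()` once and passing the resulting dict around keeps the type conversion in one place and rejects a misspelled DATASET_TYPE early.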
Harpal
07/20/2022, 2:17 PM
... AssetGroup() is of a different scope from multi_asset.
Would it be possible to run the above job with configurable "vars" in the Dagit UI (and push certain files to GCS) if we stopped using AssetGroup()?
What would be the cons of dropping this? Editing these files in the Dagit UI would be a super nice feature 🙂
owen
07/20/2022, 4:44 PM
... config (just like traditional op-based jobs).
It seems to me that the main change here would be having all of your dbt assets available to you at once (rather than only loading "arb" or "hold" at one time). Then, you could have two separate asset jobs (one for rematerializing arb, and another for rematerializing hold). From there, you could choose which job you wanted to run in the UI.
from dagster import repository, define_asset_job, AssetSelection, with_resources
from dagster_dbt import load_assets_from_dbt_manifest

# ...

# dbt models tagged "hold", plus the CSV assets derived from them
dbt_hold_assets = load_assets_from_dbt_manifest(
    manifest_json=manifest_json,
    select="tag:hold",
)
csv_hold_assets = csv_assets_for_dbt_assets(dbt_hold_assets)

# dbt models tagged "arb" (the selector here must be "tag:arb", not "tag:hold")
dbt_arb_assets = load_assets_from_dbt_manifest(
    manifest_json=manifest_json,
    select="tag:arb",
)
csv_arb_assets = csv_assets_for_dbt_assets(dbt_arb_assets)

# one job per dataset type, so you can pick which one to launch in the UI
hold_job = define_asset_job("hold", AssetSelection.assets(*(dbt_hold_assets + csv_hold_assets)))
arb_job = define_asset_job("arb", AssetSelection.assets(*(dbt_arb_assets + csv_arb_assets)))

all_assets = with_resources(
    dbt_hold_assets + csv_hold_assets + dbt_arb_assets + csv_arb_assets,
    resource_defs=...,  # your resource defs
)

@repository
def my_repo():
    # with_resources returns a list, so unpack it into the repository contents
    return [*all_assets, hold_job, arb_job]
owen
07/20/2022, 4:46 PM