Julius
06/30/2023, 8:21 AM
I'm using DagsterInstance() inside my partition_key_to_vars_fn function, when running Dagster dbt, to fetch all dynamic partitions on Dagster Cloud Serverless. But it returns an empty list.
# assets\__init__.py
dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    select=f"tag:{source}",
    op_name=f"{dbt_ops}",
    partitions_def=partitions_dict[f"fivetran_{source}"],
    key_prefix=source,
    partition_key_to_vars_fn=date_from_partition_key,  # <-- function that turns the partition key into dbt vars
    use_build_command=True,
    display_raw_sql=True,
)
And the partition_key_to_vars_fn function:
# date_from_partition_key
from dagster import DagsterInstance

def date_from_partition_key(input_str: str):
    source, comment, date, code = partition_key_split(input_str)
    print(date)
    instance = DagsterInstance.get()  # <-- DagsterInstance used to get all dynamic partitions
    all_partition = instance.get_dynamic_partitions(
        partitions_def_name=f"fivetran_{source}")
    print(all_partition)  # <-- this returns an empty list
    index = all_partition.index(input_str)
    if code != "INIT":
        _, _, last_incremental_load_end_date, _ = partition_key_split(all_partition[index - 1])
    incremental_load_end_date = date
    dbt_vars = {
        "last_incremental_load_end_date": last_incremental_load_end_date,
        "incremental_load_end_date": incremental_load_end_date  # <-- used for vars
    }
    return dbt_vars
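The lookup logic in the function above can be separated from the instance call so it can be checked without Dagster. The following is only a sketch under assumptions: the "|" delimiter and this partition_key_split are hypothetical stand-ins (the real helper is not shown), and vars_from_partition_key is an illustrative name. It raises a clear error when the partition list is empty or does not contain the key, and avoids wrapping around to the last partition when the key is at index 0.

```python
from typing import Dict, List, Optional, Tuple

DELIMITER = "|"  # hypothetical; the real partition key format is not shown


def partition_key_split(key: str) -> Tuple[str, str, str, str]:
    """Hypothetical stand-in: split 'source|comment|date|code' into its parts."""
    source, comment, date, code = key.split(DELIMITER)
    return source, comment, date, code


def vars_from_partition_key(
    key: str, all_partitions: List[str]
) -> Dict[str, Optional[str]]:
    """Build dbt vars from a partition key and an explicit list of partition keys."""
    _, _, date, code = partition_key_split(key)
    if key not in all_partitions:
        # An empty list (the symptom described here) ends up in this branch.
        raise ValueError(
            f"partition {key!r} not found among {len(all_partitions)} known partitions"
        )
    index = all_partitions.index(key)
    previous_end: Optional[str] = None
    # Only look back when there is a previous partition; index - 1 would
    # otherwise wrap around to the last element of the list.
    if code != "INIT" and index > 0:
        _, _, previous_end, _ = partition_key_split(all_partitions[index - 1])
    return {
        "last_incremental_load_end_date": previous_end,
        "incremental_load_end_date": date,
    }
```

With this split, the wrapper passed as partition_key_to_vars_fn only has to fetch the partition list and delegate, and the date logic can be tested with plain lists.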
It runs fine on my local machine, but when I test on Dagster Cloud Serverless, all_partition is an empty list.
I also have to set DAGSTER_HOME = "/opt/dagster/app/dagster_home". If I don't set the DAGSTER_HOME environment variable, I get this error:
dagster._core.errors.DagsterHomeNotSetError: The environment variable $DAGSTER_HOME is not set.
Dagster requires this environment variable to be set to an existing directory in your filesystem. This directory is used to store metadata across sessions, or load the dagster.yaml file which can configure storing metadata in an external database.
You can resolve this error by exporting the environment variable. For example, you can run the following command in your shell or include it in your shell configuration file:
export DAGSTER_HOME=~"/dagster_home"
or PowerShell
$env:DAGSTER_HOME = ($home + '\dagster_home')
or batch
set DAGSTER_HOME=%UserProfile%/dagster_home
Alternatively, DagsterInstance.ephemeral() can be used for a transient instance.
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 369, in core_dagster_event_sequence_for_step
for user_event in check.generator(
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 90, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 192, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 161, in _yield_compute_results
for event in iterate_with_context(
File "/usr/local/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 445, in iterate_with_context
next_output = next(iterator)
File "/usr/local/lib/python3.9/site-packages/dagster_dbt/asset_defs.py", line 300, in _dbt_op
kwargs["vars"] = partition_key_to_vars_fn(context.partition_key)
File "/opt/dagster/app/dbt_vault/utils/dbt_utils.py", line 23, in date_from_partition_key
instance = DagsterInstance.get()
File "/usr/local/lib/python3.9/site-packages/dagster/_core/instance/__init__.py", line 484, in get
raise DagsterHomeNotSetError(
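Since the error message says DAGSTER_HOME is where dagster.yaml is loaded from, one thing worth checking inside the running step is what that directory actually contains. The sketch below uses only the standard library; describe_dagster_home is an illustrative name, not a Dagster API. If the directory has no dagster.yaml, the instance loaded from it would presumably use its own local default storage rather than the deployment's storage, which would be consistent with an empty partition list, but that is an assumption to verify.

```python
import os
from pathlib import Path
from typing import Dict, Mapping, Optional


def describe_dagster_home(env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Report whether DAGSTER_HOME is set, exists, and contains a dagster.yaml."""
    env = os.environ if env is None else env
    home = env.get("DAGSTER_HOME")
    if not home:
        return {"home": None, "exists": False, "has_dagster_yaml": False}
    path = Path(home)
    return {
        "home": home,
        "exists": path.is_dir(),
        "has_dagster_yaml": (path / "dagster.yaml").is_file(),
    }
```

Calling print(describe_dagster_home()) at the top of date_from_partition_key, both locally and on Serverless, would show whether the two environments are reading from comparable instance directories.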