<@U0667DNC02Y> i have asset job import pandas as p...
# ask-ai
@Scout i have asset job

import pandas as pd
from dagster import asset, job, repository
import os
# from dagster_snowflake import SnowflakeResource
from dagster import asset
from sqlalchemy import create_engine
from dagster import asset
from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource
from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import patch_cloud_storage_selector
from dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import (
    SnowflakeCrawlConnection,
)
from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import (
    setup_logger,
)
from dmax_app.orchestration.orchestration_enums import (
    ENUM_DB_CREDENTIAL_DICT,
    ENUM_DW_CREDENTIAL_DICT,
)
from dmax_app.orchestration.orchestration_enums import (
    ENUM_DW_CREDENTIAL_DICT,
)
from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector

# wh_res = SnowflakeCrawlConnection()
l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"]
l_logger = setup_logger()
l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"]
dir_path = os.path.dirname(os.path.realpath(__file__))


def engine_creation():
    database_name = ENUM_DB_CREDENTIAL_DICT["database"]
    user = ENUM_DB_CREDENTIAL_DICT["user"]
    password = ENUM_DB_CREDENTIAL_DICT["password"]
    host = ENUM_DB_CREDENTIAL_DICT["host"]
    port = ENUM_DB_CREDENTIAL_DICT["port"]
    # l_logger.info(database_name,user,password,host,port)
    engine = create_engine(
        f"postgresql//{user}{password}@{host}:{port}/{database_name}"
    )
    return engine


dw_obj = warehouse_resource_selector()


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def show_schema_asset(context) -> pd.DataFrame:
    # wh_ress = warehouse_resource_selector()
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    # result = context.resources.db_obj.execute_query_as_dataframe(sql_query=l_query)
    df_schema = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_schema.to_sql(
        "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def show_table_asset(context) -> pd.DataFrame:
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW TABLES IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_table = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_table.to_sql(
        "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def show_col_asset(context) -> pd.DataFrame:
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_col = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    # l_logger.info(df_col)
    df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True)
    df_col["ordinal_position"] = df_col.index
    engine = engine_creation()
    df_col.to_sql(
        "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def show_view_asset(context) -> pd.DataFrame:
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_view = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_view.to_sql(
        "crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def show_keys_asset(context) -> pd.DataFrame:
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_keys.to_sql(
        "crawler_constraints",
        engine,
        schema=l_db_schema,
        if_exists="replace",
        index=False,
    )


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def show_pk_keys_asset(context) -> pd.DataFrame:
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_pk_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_pk_keys.to_sql(
        "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def show_prod_dmax_tables(context) -> pd.DataFrame:
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES"
    l_logger.info(l_query)
    df_dmax_tables = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    l_logger.info(df_dmax_tables)
    engine = engine_creation()
    df_dmax_tables.to_sql(
        "crawler_prod_tables",
        engine,
        schema=l_db_schema,
        if_exists="replace",
        index=False,
    )


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def dwh_metadata_creation(
    context,
    show_schema_asset,
    show_table_asset,
    show_col_asset,
    show_view_asset,
    show_keys_asset,
    show_pk_keys_asset,
    show_prod_dmax_tables,
) -> pd.DataFrame:
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query_dwh_metadata = dir_path + "/delta_generation_scripts/dwh_metadata.sql"
    l_query_dwh = open(l_query_dwh_metadata).read().format(schema=l_db_schema)
    l_query_dwh_metadata = """CREATE TABLE DWH_METADATA AS {query}""".format(
        query=l_query_dwh
    )
    db_obj.execute_query(l_query_dwh_metadata)


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def insert_delta_asset(context, dwh_metadata_creation):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query_insert_delta = (
        dir_path + "/delta_generation_scripts/crawler_identify_inserts.sql"
    )
    l_query_insert = open(l_query_insert_delta).read().format(schema=l_db_schema)
    l_query_insert_delta = """drop table if exists crawler_identify_inserts;
    CREATE TABLE crawler_identify_inserts AS {query}""".format(
        query=l_query_insert
    )
    db_obj.execute_query(l_query_insert_delta)


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def delete_delta_asset(context, dwh_metadata_creation):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query_delete_delta = (
        dir_path + "/delta_generation_scripts/crawler_identify_deletes.sql"
    )
    l_query_delete = open(l_query_delete_delta).read().format(schema=l_db_schema)
    l_query_delete_delta = """drop table if exists crawler_identify_ideletes;
    CREATE TABLE crawler_identify_ideletes AS {query}""".format(
        query=l_query_delete
    )
    db_obj.execute_query(l_query_delete_delta)


@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": dw_obj,
    }))
def update_delta_asset(context, dwh_metadata_creation):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query_update_delta = (
        dir_path + "/delta_generation_scripts/crawler_identify_updates.sql"
    )
    l_query_update = open(l_query_update_delta).read().format(schema=l_db_schema)
    l_query_update_delta = """drop table if exists crawler_identify_updates;
    CREATE TABLE crawler_identify_updates AS {query}""".format(
        query=l_query_update
    )
    db_obj.execute_query(l_query_update_delta)


@job(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": warehouse_resource_selector(),
    })
)
def all_assets_job():
    show_schema_asset()
    show_table_asset()
    show_schema_asset()
    show_table_asset()
    show_col_asset()
    show_view_asset()
    show_keys_asset()
    show_pk_keys_asset()
    show_prod_dmax_tables()
    dwh_metadata_creation(
        show_schema_asset(),
        show_table_asset(),
        show_col_asset(),
        show_view_asset(),
        show_keys_asset(),
        show_pk_keys_asset(),
        show_prod_dmax_tables(),
    )
    insert_delta_asset(dwh_metadata_creation())
    delete_delta_asset(dwh_metadata_creation())
    update_delta_asset(dwh_metadata_creation())


# Repository definition that includes the assets and the job
@repository
def my_repo():
    return [
        show_schema_asset,
        show_table_asset,
        show_col_asset,
        show_view_asset,
        show_keys_asset,
        show_pk_keys_asset,
        show_prod_dmax_tables,
        dwh_metadata_creation,
        insert_delta_asset,
        delete_delta_asset,
        update_delta_asset,
        all_assets_job,
    ]


# Execute the job in process
def crawler_asset_job(p_in_db_obj, l_db_schema):
    try:
        db_obj = p_in_db_obj
        # schema = l_db_schema
        dir_path = os.path.dirname(os.path.realpath(__file__))
        drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format(
            schema=l_db_schema
        )
        db_obj.execute_query(drop_view_query)
        # Asset job triggers
        result = my_repo.get_job("all_assets_job").execute_in_process()
        if not result.success:
            raise Exception("Asset job execution failed")
        return result
    except Exception as e:
        print("Asset job execution failed")

where dwh_metadata_creation depends on other assets, is my syntax correct
Based on the documentation, there are a few things to consider when defining assets and asset jobs in Dagster:
1. Asset Dependencies: When an asset depends on other assets, you specify those dependencies by passing the assets as inputs to the dependent asset, i.e. by referencing them in the function parameters.
2. Resource Definitions: You are correctly using resource_defs to provide resources to your assets. However, ensure that the resources you are providing match the resource keys the assets expect.
3. Job Definition: When defining a job that orchestrates multiple assets, pass the assets to the job definition; the job then constructs a graph of these assets, respecting their dependencies.
4. Repository Definition: Your repository definition should return a list of all the assets and jobs that you want to be part of the repository.
5. Execution: To execute a job, you can use the execute_in_process method, which is suitable for testing and development purposes.
Looking at your code, there are a few potential issues and improvements that can be made:
- The dwh_metadata_creation asset depends on the outputs of other assets, and listing those assets in its function parameters (as you have done) is the right way to express that. Dagster wires the dependencies from those parameters; you should not additionally call the assets as functions to pass their outputs around.
- The all_assets_job definition calls the assets as functions, and several of them (show_schema_asset, show_table_asset, dwh_metadata_creation) are invoked more than once, which creates duplicate steps. Invoke each asset once and reuse the returned value, or build the job directly from the asset definitions.
- The crawler_asset_job function executes the job it retrieves with my_repo.get_job("all_assets_job"). That accessor is available on a repository; get_all_jobs() is an alternative that returns every job definition, from which you can pick the one you need by name.
Here is a revised example based on the documentation:
@asset
def asset_one(context):
    # Your logic here
    pass

@asset
def asset_two(context, asset_one_output):
    # Your logic here, using asset_one_output
    pass

@job
def my_job():
    asset_two(asset_one())

@repository
def my_repo():
    return [my_job]
In this example, asset_two depends on the output of asset_one, and this dependency is expressed by passing asset_one as an input to asset_two within the job definition. Please review your code and adjust the asset definitions and job orchestration to correctly express dependencies and resource definitions. If you encounter specific errors or issues, you may need to consult the documentation or seek further assistance with debugging.
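For reference, a minimal sketch of how the same wiring could be expressed without calling assets inside the @job body, assuming a Dagster release that provides define_asset_job (the dependency graph is then inferred from each asset function's parameter names, e.g. dwh_metadata_creation(context, show_schema_asset, ...)); the asset names are the ones from your module:

from dagster import define_asset_job, repository

all_assets = [
    show_schema_asset,
    show_table_asset,
    show_col_asset,
    show_view_asset,
    show_keys_asset,
    show_pk_keys_asset,
    show_prod_dmax_tables,
    dwh_metadata_creation,
    insert_delta_asset,
    delete_delta_asset,
    update_delta_asset,
]

# Selects every asset in the repository; upstream/downstream order is derived
# from the parameter names of each @asset function, so no manual calls are needed.
all_assets_job = define_asset_job(name="all_assets_job", selection="*")

@repository
def my_repo():
    return [*all_assets, all_assets_job]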
@Scout in reference to your above response
@asset
def asset_one(context):
    # Your logic here
    pass

@asset
def asset_two(context, asset_one_output):
    # Your logic here, using asset_one_output
    pass
@asset
def asset_three(context, asset_one_output,asset_two_output):
    # Your logic here, using asset_one_output
    pass
@job
def my_job():
    asset_two(asset_one())

@repository
def my_repo():
    return [my_job]
here i have a 3rd asset which depends on two asset output how to define it?
In Dagster, when you have an asset that depends on the outputs of multiple other assets, you can define it by simply listing those assets as parameters in the dependent asset's function. Here's how you can define asset_three, which depends on the outputs of both asset_one and asset_two:
@asset
def asset_one(context):
    # Your logic here
    pass

@asset
def asset_two(context, asset_one_output):
    # Your logic here, using asset_one_output
    pass

@asset
def asset_three(context, asset_one_output, asset_two_output):
    # Your logic here, using asset_one_output and asset_two_output
    pass

@job
def my_job():
    one = asset_one()
    asset_three(one, asset_two(one))

@repository
def my_repo():
    return [my_job]
In the my_job definition, asset_one is invoked once and its output is reused: asset_two is called with that output as its input, and asset_three is called with both the asset_one output and the asset_two output as its inputs. This way, Dagster knows to execute asset_one and asset_two before asset_three, and it passes their outputs to asset_three as arguments. When you define a job in this way, Dagster constructs a graph of assets, respecting their dependencies based on the parameters you've provided. The execution plan for the job will ensure that assets are executed in the correct order, with the outputs of upstream assets being passed to the downstream assets that depend on them.
@Scout i have an asset materialization job given below , i want to pass the schema_list_for_query as a parameter to all the assets, how can i do that
To pass a parameter such as schema_list_for_query to all assets in a job, you can use the config system in Dagster to supply runtime configuration to your assets. Each asset can be configured to accept a schema_list_for_query parameter, and you can then provide the value for this parameter when you launch the job. Here's an example of how you can modify your assets to accept a schema_list_for_query parameter and how to provide that parameter when launching the job:
1. Modify each asset to accept a config schema:
from dagster import asset, Field, String

@asset(config_schema={"schema_list_for_query": Field(String)})
def my_asset(context):
    schema_list_for_query = context.op_config["schema_list_for_query"]
    # Use schema_list_for_query in your asset logic
    pass

# Repeat for other assets as needed
2. Define your job as usual:
@job
def my_asset_job():
    my_asset()  # And other assets as needed
3. When you launch the job, provide the schema_list_for_query parameter in the config:
result = my_asset_job.execute_in_process(
    run_config={
        "ops": {
            "my_asset": {
                "config": {
                    "schema_list_for_query": "your_schema_list_value"
                }
            },
            # Repeat for other assets as needed
        }
    }
)
In this example, my_asset is an asset that now accepts a config schema with a field named schema_list_for_query. When you execute the job, you provide the value for schema_list_for_query in the run_config dictionary under the corresponding asset's config. Please note that the execute_in_process method is typically used for testing and development. For production scenarios, you would provide the config when you launch the job through the Dagster UI or when you trigger the job via the Dagster API. If you have many assets and you want to avoid repeating the config for each asset, you can consider using a shared resource that provides the schema_list_for_query to all assets that require it. This way, you can centralize the configuration and make it easier to manage.
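A rough sketch of that shared-resource idea, assuming the legacy @resource / required_resource_keys APIs used elsewhere in this thread and a Dagster 1.x release that provides materialize. The resource key schema_list and the asset names are illustrative, and Array is Dagster's config type for lists (a list value is what the later part of this thread settles on):

from dagster import Array, Field, asset, materialize, resource

# Hypothetical resource that simply hands back the configured list, so every
# asset that requires it reads the same value.
@resource(config_schema={"schema_list_for_query": Field(Array(str))})
def schema_list_resource(init_context):
    return init_context.resource_config["schema_list_for_query"]

@asset(required_resource_keys={"schema_list"})
def my_specific_asset(context):
    schema_list_for_query = context.resources.schema_list
    # Use schema_list_for_query in your asset logic
    ...

@asset(required_resource_keys={"schema_list"})
def my_other_asset(context):
    schema_list_for_query = context.resources.schema_list
    ...

# The value is supplied once, under "resources", instead of per-asset op config.
result = materialize(
    [my_specific_asset, my_other_asset],
    resources={"schema_list": schema_list_resource},
    run_config={
        "resources": {
            "schema_list": {"config": {"schema_list_for_query": ["staging"]}}
        }
    },
)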
@Scout in reference to your above response i want the schema_list_for_query for a particular asset so the config also should be for that particular asset or shall i add for all
If you want the schema_list_for_query to be specific to a particular asset, you should define the config schema only for that asset. You do not need to add the config schema to all assets if they do not require it. Here's how you can define the config schema for a single asset:
from dagster import asset, Field, String

@asset(config_schema={"schema_list_for_query": Field(String)})
def my_specific_asset(context):
    schema_list_for_query = context.op_config["schema_list_for_query"]
    # Use schema_list_for_query in your asset logic
    pass

# Other assets without the config_schema
@asset
def my_other_asset(context):
    # Asset logic that does not require schema_list_for_query
    pass

# Repeat for other assets as needed
And when you launch the job, you only provide the schema_list_for_query parameter in the config for the asset that requires it:
result = my_asset_job.execute_in_process(
    run_config={
        "ops": {
            "my_specific_asset": {
                "config": {
                    "schema_list_for_query": "your_schema_list_value"
                }
            },
            # No need to provide config for other assets that don't require it
        }
    }
)
In this example, only my_specific_asset has a config_schema that includes schema_list_for_query. When you execute the job, you provide the value for schema_list_for_query only for my_specific_asset. Other assets that do not require this configuration are not affected and do not need to have a config_schema defined for schema_list_for_query.
@Scout pls rewrite this syntax config_schema={"schema_list_for_query": Field(String)} since schema_list_for_query is a list of strings
To define a config schema for an asset where schema_list_for_query is a list of strings, you can use the List type from Python's typing module. Here's how you can define the config schema for an asset that accepts a list of strings:
from dagster import asset, Field
from typing import List

@asset(config_schema={"schema_list_for_query": Field(List[str])})
def my_specific_asset(context):
    schema_list_for_query = context.op_config["schema_list_for_query"]
    # Use schema_list_for_query in your asset logic
    pass

# Other assets without the config_schema
@asset
def my_other_asset(context):
    # Asset logic that does not require schema_list_for_query
    pass

# Repeat for other assets as needed
When you launch the job, you provide the schema_list_for_query parameter as a list of strings in the config for the asset that requires it:
result = my_asset_job.execute_in_process(
    run_config={
        "ops": {
            "my_specific_asset": {
                "config": {
                    "schema_list_for_query": ["schema1", "schema2", "schema3"]
                }
            },
            # No need to provide config for other assets that don't require it
        }
    }
)
In this example, schema_list_for_query is expected to be a list of strings, and you provide an actual list of strings when you execute the job. This configuration is specific to my_specific_asset, and other assets do not require this configuration.
@Scout in reference to your above response i am getting this error E dagster._core.errors.DagsterInvalidDefinitionError: You have passed in typing.List[str] to the config system. Types from the typing module in python are not allowed in the config system. You must use types that are imported from dagster or primitive types such as bool, int, etc.
The error message you're encountering indicates that Dagster's config system does not accept the typing.List type directly. Instead, you should use Dagster's own Array type for lists. Here's how you can define the config schema for an asset that accepts a list of strings using Dagster's Array type:
from dagster import asset, Field, Array, String

@asset(config_schema={"schema_list_for_query": Field(Array[String])})
def my_specific_asset(context):
    schema_list_for_query = context.op_config["schema_list_for_query"]
    # Use schema_list_for_query in your asset logic
    pass

# Other assets without the config_schema
@asset
def my_other_asset(context):
    # Asset logic that does not require schema_list_for_query
    pass

# Repeat for other assets as needed
When you launch the job, you provide the schema_list_for_query parameter as a list of strings in the config for the asset that requires it:
result = my_asset_job.execute_in_process(
    run_config={
        "ops": {
            "my_specific_asset": {
                "config": {
                    "schema_list_for_query": ["schema1", "schema2", "schema3"]
                }
            },
            # No need to provide config for other assets that don't require it
        }
    }
)
In this revised example, Field(Array[String]) is used to define a list of strings in the config schema, which is compatible with Dagster's config system. This should resolve the error you're encountering.
@Scout in reference to your above response i am getting this error E TypeError: 'type' object is not subscriptable
The error message you're encountering, "TypeError: 'type' object is not subscriptable," suggests that you're trying to use subscript notation (e.g., Array[String]) with a type that doesn't support it. Dagster's Array is a plain class rather than a generic type, so it cannot be subscripted. In Dagster, to define a list of strings in the config schema, you should instantiate Array with str as the inner type, i.e. Array(str), rather than writing Array[String]. Here's the correct way to define the config schema for an asset that accepts a list of strings:
from dagster import asset, Field, Array

@asset(config_schema={"schema_list_for_query": Field(Array(str))})
def my_specific_asset(context):
    schema_list_for_query = context.op_config["schema_list_for_query"]
    # Use schema_list_for_query in your asset logic
    pass

# Other assets without the config_schema
@asset
def my_other_asset(context):
    # Asset logic that does not require schema_list_for_query
    pass

# Repeat for other assets as needed
When you launch the job, you provide the schema_list_for_query parameter as a list of strings in the config for the asset that requires it:
result = my_asset_job.execute_in_process(
    run_config={
        "ops": {
            "my_specific_asset": {
                "config": {
                    "schema_list_for_query": ["schema1", "schema2", "schema3"]
                }
            },
            # No need to provide config for other assets that don't require it
        }
    }
)
In this corrected example, Field(Array(str)) is used to define a list of strings in the config schema. This should resolve the "TypeError: 'type' object is not subscriptable" error you're encountering.
@Scout in reference to your above response, i am getting this error 'Error in config for job\n Error 1: Missing required config entries [\'show_col_asset_2\', \'show_col_asset_3\', \'show_col_asset_4\', \'show_col_asset_5\'] at path root:ops. Sample config for missing entries: {\'show_col_asset_2\': {\'config\': {\'schema_list_for_query\': []}}, \'show_col_asset_3\': {\'config\': {\'schema_list_for_query\': []}}, \'show_col_asset_4\': {\'config\': {\'schema_list_for_query\': []}}, \'show_col_asset_5\': {\'config\': {\'schema_list_for_query\': []}}}\n Error 2: Value at path root:ops:show_col_asset:config:schema_list_for_query must be list. Expected: "[String]"'
The error message you're encountering indicates two issues:
1. Missing required config entries for show_col_asset_2, show_col_asset_3, show_col_asset_4, and show_col_asset_5. These assets expect a config entry named schema_list_for_query, which has not been provided. (Numbered entries like show_col_asset_2 typically appear when the same asset is invoked more than once in the job body; each invocation becomes a separate node that needs its own config.)
2. The value provided for schema_list_for_query for show_col_asset is not a list, as expected by the config schema.
To resolve these issues, you need to ensure that:
- Each asset that requires the schema_list_for_query config entry has it provided in the run_config.
- The value for schema_list_for_query is indeed a list of strings.
Here's how you can structure your run_config to provide the necessary config entries for each asset:
result = my_asset_job.execute_in_process(
    run_config={
        "ops": {
            "show_col_asset": {
                "config": {
                    "schema_list_for_query": ["schema1", "schema2", "schema3"]
                }
            },
            "show_col_asset_2": {
                "config": {
                    "schema_list_for_query": ["schema1", "schema2", "schema3"]
                }
            },
            "show_col_asset_3": {
                "config": {
                    "schema_list_for_query": ["schema1", "schema2", "schema3"]
                }
            },
            "show_col_asset_4": {
                "config": {
                    "schema_list_for_query": ["schema1", "schema2", "schema3"]
                }
            },
            "show_col_asset_5": {
                "config": {
                    "schema_list_for_query": ["schema1", "schema2", "schema3"]
                }
            },
            # Repeat for other assets as needed
        }
    }
)
Make sure that each asset that requires the schema_list_for_query config has an entry under ops in the run_config, and that the value for schema_list_for_query is a list. If you have many assets that require the same config, and you want to avoid repeating the config for each asset, you might consider using a shared resource or leveraging Dagster's configured API to create a new version of the asset with the config baked in. However, this approach would depend on the specifics of your use case and how you've structured your Dagster assets and resources.
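If the same value simply has to be repeated for several assets, one low-tech option (plain Python, no extra Dagster API) is to build the ops section of the run_config in a loop; the asset names below are the ones from your error message and the value is illustrative:

schema_list_value = ["schema1", "schema2", "schema3"]

# Names taken from the error message; extend this list with any other
# ops/assets in the job that declare the same config_schema.
assets_needing_config = [
    "show_col_asset",
    "show_col_asset_2",
    "show_col_asset_3",
    "show_col_asset_4",
    "show_col_asset_5",
]

run_config = {
    "ops": {
        name: {"config": {"schema_list_for_query": schema_list_value}}
        for name in assets_needing_config
    }
}

result = my_asset_job.execute_in_process(run_config=run_config)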
@Scout in reference to your above response the value of schema_list_for_query in this format ('staging'), pls refactor accordingly
If the schema_list_for_query is expected to be a list with a single string element like 'staging', you should provide it as a list with one item in your run_config. Here's how you can refactor the run_config so that the ('staging') value is supplied as a one-element list:
result = my_asset_job.execute_in_process(
    run_config={
        "ops": {
            "show_col_asset": {
                "config": {
                    "schema_list_for_query": ["staging"]
                }
            },
            "show_col_asset_2": {
                "config": {
                    "schema_list_for_query": ["staging"]
                }
            },
            "show_col_asset_3": {
                "config": {
                    "schema_list_for_query": ["staging"]
                }
            },
            "show_col_asset_4": {
                "config": {
                    "schema_list_for_query": ["staging"]
                }
            },
            "show_col_asset_5": {
                "config": {
                    "schema_list_for_query": ["staging"]
                }
            },
            # Repeat for other assets as needed
        }
    }
)
In this configuration, schema_list_for_query is provided as a list with a single string element ["staging"] for each asset that requires it. This should match the expected config schema for the assets and resolve the error you're encountering.
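One additional note on the ('staging') format: in Python, parentheses without a trailing comma do not create a tuple, so ('staging') is just the string 'staging'. Either way, the config schema expects a list, so wrap the value:

value = ('staging')    # parentheses only group here: this is the str 'staging'
tup = ('staging',)     # the trailing comma is what makes a one-element tuple

# Either form can be normalised to the list the config schema expects:
schema_list_for_query = [value]    # ["staging"]
schema_list_for_query = list(tup)  # ["staging"]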