#ask-ai

SUBHADEEP ROY

03/12/2024, 3:08 PM
@Scout Using Software-defined Assets: Here, we define a subclass of Config holding a single string value representing the name of a user. We can access the config through the config parameter in the asset body.
from dagster import asset, Config

class MyAssetConfig(Config):
    person_name: str

@asset
def greeting(config: MyAssetConfig) -> str:
    return f"hello {config.person_name}"

Scout

03/12/2024, 3:08 PM
It looks like you're defining a software-defined asset in Dagster that takes a configuration parameter. Your `MyAssetConfig` class, which inherits from `Config`, specifies a schema for the asset's configuration. This schema includes a single field, `person_name`, which is of type `str`. The `greeting` asset function takes a `config` parameter of type `MyAssetConfig`. Inside the function, you're using the `person_name` from the config to return a greeting string. Your snippet is syntactically valid as written, including the `->` return type annotation. Here's the code:
python
from dagster import asset, Config

class MyAssetConfig(Config):
    person_name: str

@asset
def greeting(config: MyAssetConfig) -> str:
    return f"hello {config.person_name}"
With this asset definition, you can provide a configuration value for `person_name` when you run the asset, and it will output a greeting message using the provided name. The configuration can be set at runtime, for example, through the Dagster UI's Launchpad or via a run configuration file.
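As a rough illustration of what such a run configuration looks like, here is a minimal sketch of the nested mapping for this asset. The helper name `asset_run_config` and the value `"Alice"` are made up for the example; the `ops` / asset name / `config` nesting follows the run config shape used later in this thread.

```python
import json


def asset_run_config(asset_name: str, **config_values) -> dict:
    """Build a run-config mapping for one asset (hypothetical helper)."""
    return {"ops": {asset_name: {"config": dict(config_values)}}}


# Config for the `greeting` asset; "Alice" is only a placeholder value.
run_config = asset_run_config("greeting", person_name="Alice")

# Print the structure so the nesting is easy to inspect.
print(json.dumps(run_config, indent=2))
```

A dictionary like this can be passed as `run_config=` when executing in process, and the same structure written as YAML is what the Launchpad editor expects.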

SUBHADEEP ROY

03/12/2024, 3:09 PM
@Scout in reference to your above response, I have an asset job (below) which I trigger through result = my_repo.get_job("all_assets_job").execute_in_process(). How can I pass schema_list_for_query as a parameter to the dwh_metadata_creation asset using the above config? Here is the script:
import os

# from dagster_snowflake import SnowflakeResource
from dagster import asset, job, repository
from sqlalchemy import create_engine
from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import (
    metadata_resource,
)
from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import (
    patch_cloud_storage_selector,
)
from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import (
    setup_logger,
)
from dmax_app.orchestration.orchestration_enums import (
    ENUM_DB_CREDENTIAL_DICT,
    ENUM_DW_CREDENTIAL_DICT,
)
from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import (
    warehouse_resource_selector,
)

l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"]
l_logger = setup_logger()
l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"]
dir_path = os.path.dirname(os.path.realpath(__file__))
dw_obj = warehouse_resource_selector()


def engine_creation():
    database_name = ENUM_DB_CREDENTIAL_DICT["database"]
    user = ENUM_DB_CREDENTIAL_DICT["user"]
    password = ENUM_DB_CREDENTIAL_DICT["password"]
    host = ENUM_DB_CREDENTIAL_DICT["host"]
    port = ENUM_DB_CREDENTIAL_DICT["port"]
    # l_logger.info(database_name,user,password,host,port)
    engine = create_engine(
        f"postgresql://{user}:{password}@{host}:{port}/{database_name}"
    )
    return engine


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def show_schema_asset(context):
    # wh_ress = warehouse_resource_selector()
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    # result = context.resources.db_obj.execute_query_as_dataframe(sql_query=l_query)
    df_schema = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_schema.to_sql(
        "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def show_table_asset(context):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW TABLES IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_table = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_table.to_sql(
        "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def show_col_asset(context):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_col = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    # l_logger.info(df_col)
    df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True)
    df_col["ordinal_position"] = df_col.index
    engine = engine_creation()
    df_col.to_sql(
        "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def show_view_asset(context):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_view = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_view.to_sql(
        "crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def show_keys_asset(context):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_keys.to_sql(
        "crawler_constraints",
        engine,
        schema=l_db_schema,
        if_exists="replace",
        index=False,
    )


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def show_pk_keys_asset(context):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_pk_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    engine = engine_creation()
    df_pk_keys.to_sql(
        "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False
    )


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def copy_dmax_tables_to_rds(context):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES"
    l_logger.info(l_query)
    df_dmax_tables = dw_obj.execute_query_as_dataframe(sql_query=l_query)
    l_logger.info(df_dmax_tables)
    engine = engine_creation()
    df_dmax_tables.to_sql(
        "crawler_prod_tables",
        engine,
        schema=l_db_schema,
        if_exists="replace",
        index=False,
    )


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def dwh_metadata_creation(
    context,
    show_schema_asset,
    show_table_asset,
    show_col_asset,
    show_view_asset,
    show_keys_asset,
    show_pk_keys_asset,
    copy_dmax_tables_to_rds,
):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query_dwh_metadata = dir_path + "/delta_generation_scripts/dwh_metadata.sql"
    l_query_dwh = open(l_query_dwh_metadata).read().format(schema=l_db_schema)
    l_query_dwh_metadata = """drop table if exists DWH_METADATA;CREATE TABLE DWH_METADATA AS {query}""".format(
        query=l_query_dwh
    )
    db_obj.execute_query(l_query_dwh_metadata)


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def rds_metadata_creation(context):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query_rds_metadata = dir_path + "/delta_generation_scripts/rds_metadata.sql"
    l_query_rds = open(l_query_rds_metadata).read().format(schema=l_db_schema)
    l_query_rds_metadata = """drop table if exists RDS_METADATA;CREATE TABLE RDS_METADATA AS {query}""".format(
        query=l_query_rds
    )
    db_obj.execute_query(l_query_rds_metadata)


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def insert_delta_asset(context, dwh_metadata_creation, rds_metadata_creation):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query_insert_delta = (
        dir_path + "/delta_generation_scripts/crawler_identify_inserts.sql"
    )
    l_query_insert = open(l_query_insert_delta).read().format(schema=l_db_schema)
    l_query_insert_delta = """drop table if exists crawler_identify_inserts; CREATE TABLE crawler_identify_inserts AS {query}""".format(
        query=l_query_insert
    )
    db_obj.execute_query(l_query_insert_delta)


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def delete_delta_asset(context, dwh_metadata_creation, rds_metadata_creation):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query_delete_delta = (
        dir_path + "/delta_generation_scripts/crawler_identify_deletes.sql"
    )
    l_query_delete = open(l_query_delete_delta).read().format(schema=l_db_schema)
    l_query_delete_delta = """drop table if exists crawler_identify_deletes; CREATE TABLE crawler_identify_deletes AS {query}""".format(
        query=l_query_delete
    )
    db_obj.execute_query(l_query_delete_delta)


@asset(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": dw_obj}
    )
)
def update_delta_asset(context, dwh_metadata_creation, rds_metadata_creation):
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query_update_delta = (
        dir_path + "/delta_generation_scripts/crawler_identify_updates.sql"
    )
    l_query_update = open(l_query_update_delta).read().format(schema=l_db_schema)
    l_query_update_delta = """drop table if exists crawler_identify_updates; CREATE TABLE crawler_identify_updates AS {query}""".format(
        query=l_query_update
    )
    db_obj.execute_query(l_query_update_delta)


@job(
    resource_defs=patch_cloud_storage_selector(
        {"db_obj": metadata_resource, "dw_obj": warehouse_resource_selector()}
    )
)
def all_assets_job():
    show_schema_asset()
    show_table_asset()
    show_schema_asset()
    show_table_asset()
    show_col_asset()
    show_view_asset()
    show_keys_asset()
    show_pk_keys_asset()
    copy_dmax_tables_to_rds()
    dwh_metadata_creation(
        show_schema_asset(),
        show_table_asset(),
        show_col_asset(),
        show_view_asset(),
        show_keys_asset(),
        show_pk_keys_asset(),
        copy_dmax_tables_to_rds(),
    )
    rds_metadata_creation()
    insert_delta_asset(
        dwh_metadata_creation(
            show_schema_asset(),
            show_table_asset(),
            show_col_asset(),
            show_view_asset(),
            show_keys_asset(),
            show_pk_keys_asset(),
            copy_dmax_tables_to_rds(),
        ),
        rds_metadata_creation(),
    )
    delete_delta_asset(
        dwh_metadata_creation(
            show_schema_asset(),
            show_table_asset(),
            show_col_asset(),
            show_view_asset(),
            show_keys_asset(),
            show_pk_keys_asset(),
            copy_dmax_tables_to_rds(),
        ),
        rds_metadata_creation(),
    )
    update_delta_asset(
        dwh_metadata_creation(
            show_schema_asset(),
            show_table_asset(),
            show_col_asset(),
            show_view_asset(),
            show_keys_asset(),
            show_pk_keys_asset(),
            copy_dmax_tables_to_rds(),
        ),
        rds_metadata_creation(),
    )


# Repository definition that includes the assets and the job
@repository
def my_repo():
    return [
        show_schema_asset,
        show_table_asset,
        show_col_asset,
        show_view_asset,
        show_keys_asset,
        show_pk_keys_asset,
        copy_dmax_tables_to_rds,
        dwh_metadata_creation,
        rds_metadata_creation,
        insert_delta_asset,
        delete_delta_asset,
        update_delta_asset,
        all_assets_job,
    ]


# Execute the job in process
def crawler_asset_job(p_in_db_obj, l_db_schema, schema_list_for_query):
    try:
        db_obj = p_in_db_obj
        dir_path = os.path.dirname(os.path.realpath(__file__))
        drop_DWH_META_query = """ drop table if exists {schema}.DWH_METADATA;""".format(
            schema=l_db_schema
        )
        db_obj.execute_query(drop_DWH_META_query)
        # Asset job triggers
        result = my_repo.get_job("all_assets_job").execute_in_process()
        if not result.success:
            raise Exception("Asset job execution failed")
        return result
    except Exception as e:
        print("Asset job execution failed")


if __name__ == "__main__":
    crawler_asset_job()

Scout

03/12/2024, 3:09 PM
To pass `schema_list_for_query` as a parameter to the `dwh_metadata_creation` asset within the `all_assets_job`, you can use the `OpExecutionContext` to access run-time configuration. However, the code you've provided does not show how `schema_list_for_query` is intended to be used within the `dwh_metadata_creation` asset. If `schema_list_for_query` is meant to be a dynamic configuration that changes per execution, you would typically pass it through the run config when executing the job. Here's a general approach to pass parameters to an asset via run config:
1. Define a config schema for the asset that requires the parameter.
2. When executing the job, provide the run config with the necessary parameters.
However, in the code you've provided, there is no config schema defined for the assets, and the assets do not take any parameters directly. To pass `schema_list_for_query` to `dwh_metadata_creation`, you would need to modify the asset to accept a config parameter and then provide that config when you execute the job. Here's a simplified example of how you might define a config schema for an asset and then execute the job with a config:
python
from dagster import asset, Field, String, job, repository

# Define a config schema for the asset
@asset(config_schema={"schema_list_for_query": Field(String)})
def dwh_metadata_creation(context):
    schema_list_for_query = context.op_config["schema_list_for_query"]
    # Use schema_list_for_query in your asset logic
    ...

# Define the job that includes the asset
@job
def all_assets_job():
    dwh_metadata_creation()

# Repository definition that includes the job
@repository
def my_repo():
    return [all_assets_job]

# Execute the job in process with the config
if __name__ == "__main__":
    result = my_repo.get_job("all_assets_job").execute_in_process(
        run_config={
            "ops": {
                "dwh_metadata_creation": {
                    "config": {
                        "schema_list_for_query": "your_schema_list_here"
                    }
                }
            }
        }
    )
In this example, `dwh_metadata_creation` is defined with a config schema that expects a `schema_list_for_query` parameter. When executing the job, you provide the `schema_list_for_query` value in the run config under the `ops` key, in an entry named after the asset. Please note that the actual implementation may vary based on how `schema_list_for_query` is used within your asset and the structure of your project. You would need to adjust the example to fit your specific use case. If `schema_list_for_query` is not meant to be part of the asset's config, you would need to find another way to pass it to the asset, such as through resources or by modifying the asset's logic to retrieve it from an external source.

SUBHADEEP ROY

03/12/2024, 3:14 PM
@Scout in reference to your above response, this is my whole code (the same script as in my previous message). Can you please tell me what changes to make so that I can pass schema_list_for_query as a parameter to the dwh_metadata_creation asset?

Scout

03/12/2024, 3:14 PM
To pass `schema_list_for_query` as a parameter to the `dwh_metadata_creation` asset, you will need to modify the asset to accept a configuration parameter. Here's how you can do it:
1. Define a config schema for the `dwh_metadata_creation` asset.
2. Modify the `dwh_metadata_creation` asset to access the config parameter.
3. Provide the config parameter when executing the job.
Here's how you can implement these steps:
Step 1: Define a config schema for the `dwh_metadata_creation` asset.
python
from dagster import Field, String

@asset(
    config_schema={"schema_list_for_query": Field(String)},
    resource_defs=patch_cloud_storage_selector(
        {
            "db_obj": metadata_resource,
            "dw_obj": dw_obj,
        }
    )
)
def dwh_metadata_creation(context, ...):  # other parameters
    schema_list_for_query = context.op_config["schema_list_for_query"]
    # Use schema_list_for_query in your asset logic
    ...
Step 2: Modify the `dwh_metadata_creation` asset to access the config parameter. The asset definition above already includes the line that reads the config parameter, so you can now use `schema_list_for_query` within your asset logic.
Step 3: Provide the config parameter when executing the job. When you execute the job, you need to provide the `schema_list_for_query` parameter in the run config. Since you're using `execute_in_process`, you can pass the run config directly as an argument:
python
def crawler_asset_job(p_in_db_obj, l_db_schema, schema_list_for_query):
    try:
        db_obj = p_in_db_obj
        dir_path = os.path.dirname(os.path.realpath(__file__))
        drop_DWH_META_query = f"drop table if exists {l_db_schema}.DWH_METADATA;"
        db_obj.execute_query(drop_DWH_META_query)

        # Asset job triggers with run config
        result = my_repo.get_job("all_assets_job").execute_in_process(
            run_config={
                "ops": {
                    "dwh_metadata_creation": {
                        "config": {
                            "schema_list_for_query": schema_list_for_query
                        }
                    }
                }
            }
        )

        if not result.success:
            raise Exception("Asset job execution failed")

        return result
    except Exception as e:
        print(f"Asset job execution failed: {e}")
Make sure to replace `...` with the actual parameters that `dwh_metadata_creation` takes, if any. Please note that the `execute_in_process` method is typically used for testing and development. For production scenarios, you would use the Dagster instance to run the job, and the run config would be provided through the Dagster UI or a YAML configuration file. Also, ensure that `schema_list_for_query` is a string that matches the expected format for your asset logic. If it's a list or another type, you'll need to adjust the `Field` type accordingly.
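One thing to watch for with the job in the question: it invokes `dwh_metadata_creation` four times, and each extra invocation appears to get its own aliased name (the config error later in this thread lists `dwh_metadata_creation_2`, `dwh_metadata_creation_3` and `dwh_metadata_creation_4`). Under that assumption, each alias needs its own config entry. Here is a minimal sketch; `config_for_invocations` is a hypothetical helper, and the `_N` suffix pattern is taken from that error message rather than confirmed against the Dagster docs.

```python
def config_for_invocations(op_name: str, invocations: int, config: dict) -> dict:
    """Repeat one config block for an op and its aliased re-invocations.

    Assumes the first invocation keeps the plain name and later ones are
    suffixed _2, _3, ... (as seen in the error message in this thread).
    """
    names = [op_name] + [f"{op_name}_{i}" for i in range(2, invocations + 1)]
    # Copy the config per entry so the entries do not share one dict object.
    return {"ops": {name: {"config": dict(config)} for name in names}}


run_config = config_for_invocations(
    "dwh_metadata_creation",
    4,
    {"schema_list_for_query": "your_schema_list_here"},
)
print(sorted(run_config["ops"]))
```

Invoking the asset once in the job and reusing its output for the three delta assets would likely avoid the aliases altogether, but that is a change to the job itself.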

SUBHADEEP ROY

03/12/2024, 3:39 PM
@Scout the schema_list_for_query is in format like ('abc','asd') so instead of Field(String) what should it be

Scout

03/12/2024, 3:39 PM
If `schema_list_for_query` is a tuple of strings, you can represent it as a list of strings in the config schema. In Dagster, you would use `Field(List[String])` to represent a list of strings. However, since you mentioned the format `('abc', 'asd')`, which is a Python tuple, you'll need to handle it as a list in the config and then convert it to a tuple within your asset if necessary. Here's how you can define the config schema for a list of strings:
python
from dagster import Field, List, String

@asset(
    config_schema={"schema_list_for_query": Field(List[String])},
    resource_defs=patch_cloud_storage_selector(
        {
            "db_obj": metadata_resource,
            "dw_obj": dw_obj,
        }
    )
)
def dwh_metadata_creation(context, ...):  # other parameters
    schema_list_for_query = tuple(context.op_config["schema_list_for_query"])
    # Now schema_list_for_query is a tuple, and you can use it in your asset logic
    ...
When you execute the job, you will provide the list of strings in the run config:
python
result = my_repo.get_job("all_assets_job").execute_in_process(
    run_config={
        "ops": {
            "dwh_metadata_creation": {
                "config": {
                    "schema_list_for_query": ["abc", "asd"]  # List of strings
                }
            }
        }
    }
)
In the asset function, `context.op_config["schema_list_for_query"]` will retrieve the list, and `tuple(...)` will convert it to a tuple, which matches the format you specified. Make sure to replace `...` with the actual parameters that `dwh_metadata_creation` takes, if any.

SUBHADEEP ROY

03/12/2024, 3:43 PM
@Scout in reference to your above response getting this error E dagster._core.errors.DagsterInvalidDefinitionError: Cannot use List in the context of config. Please use a python list (e.g. [int]) or dagster.Array (e.g. Array(int)) instead.

Scout

03/12/2024, 3:43 PM
The error message you're encountering indicates that Dagster's `List` type (the one imported from `dagster` in the previous snippet) cannot be used in a config schema. Instead, you should use a Python list (e.g., `[str]`) or `dagster.Array` (e.g., `Array(str)`) for defining a list of items in the config schema. Here's how you can define the config schema using `Array` for a list of strings:
python
from dagster import Field, Array, String

@asset(
    config_schema={"schema_list_for_query": Field(Array(String))},
    resource_defs=patch_cloud_storage_selector(
        {
            "db_obj": metadata_resource,
            "dw_obj": dw_obj,
        }
    )
)
def dwh_metadata_creation(context, ...):  # other parameters
    schema_list_for_query = tuple(context.op_config["schema_list_for_query"])
    # Now schema_list_for_query is a tuple, and you can use it in your asset logic
    ...
When you execute the job, you will provide the list of strings in the run config as before:
python
result = my_repo.get_job("all_assets_job").execute_in_process(
    run_config={
        "ops": {
            "dwh_metadata_creation": {
                "config": {
                    "schema_list_for_query": ["abc", "asd"]  # List of strings
                }
            }
        }
    }
)
This should resolve the error you're encountering. The `Array` type is used to define a list of items in Dagster's config system, and `String` specifies that each item in the array is a string. The `Field` wrapper is used to define a field in the config schema. When you retrieve the config value in your asset, it will be a list, which you can then convert to a tuple if needed.
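To make the tuple-to-list round trip concrete, here is a small sketch in plain Python with no Dagster calls. The helper names `to_config_list` and `from_config_list` are hypothetical; the idea is to convert the tuple to a list before it goes into the run config, and back to a tuple once it is read inside the asset.

```python
from typing import Iterable, List, Tuple

CONFIG_KEY = "schema_list_for_query"


def to_config_list(schemas: Iterable[str]) -> List[str]:
    """Turn a tuple (or list) of schema names into a plain list for run config."""
    if isinstance(schemas, str):
        # A bare string would otherwise be split into single characters.
        raise TypeError("expected a tuple or list of names, not a single string")
    items = list(schemas)
    if not all(isinstance(item, str) for item in items):
        raise TypeError("every schema name must be a string")
    return items


def from_config_list(items: Iterable[str]) -> Tuple[str, ...]:
    """Inside the asset: turn the configured list back into a tuple."""
    return tuple(items)


schemas = ("abc", "asd")
run_config = {
    "ops": {
        "dwh_metadata_creation": {"config": {CONFIG_KEY: to_config_list(schemas)}}
    }
}
restored = from_config_list(
    run_config["ops"]["dwh_metadata_creation"]["config"][CONFIG_KEY]
)
print(restored)
```

Rejecting a bare string up front is a design choice for this sketch, so that a value like `"abc"` is not silently expanded into `["a", "b", "c"]`.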

SUBHADEEP ROY

03/12/2024, 3:52 PM
@Scout in reference to your above response the schema_list_for_query is in format like ('abc','asd') and not a list . pls reply with correct answer

Scout

03/12/2024, 3:52 PM
I apologize for the confusion. If `schema_list_for_query` is expected to be a string that represents a tuple, such as `"('abc', 'asd')"`, you can define it as a single string in the config schema and then parse it as needed within your asset. Here's how you can define the config schema using a single string:
python
from dagster import Field, String

@asset(
    config_schema={"schema_list_for_query": Field(String)},
    resource_defs=patch_cloud_storage_selector(
        {
            "db_obj": metadata_resource,
            "dw_obj": dw_obj,
        }
    )
)
def dwh_metadata_creation(context, ...):  # other parameters
    # Retrieve the string from the config
    schema_list_for_query_str = context.op_config["schema_list_for_query"]
    # Parse the string to get the tuple, assuming the string is formatted like "('abc', 'asd')"
    schema_list_for_query = eval(schema_list_for_query_str)
    # Now schema_list_for_query is a tuple, and you can use it in your asset logic
    ...
When you execute the job, you will provide the string that represents a tuple in the run config:
```python
result = my_repo.get_job("all_assets_job").execute_in_process(
    run_config={
        "ops": {
            "dwh_metadata_creation": {
                "config": {
                    "schema_list_for_query": "('abc', 'asd')"  # String that represents a tuple
                }
            }
        }
    }
)
```
Please note that using `eval` can be risky if the input is not controlled or sanitized, as it will execute arbitrary code. If you have control over the format of `schema_list_for_query`, ensure it is always in a safe, expected format. If the input comes from an untrusted source, consider using a safer parsing method to convert the string to a tuple.
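One safer parsing method is `ast.literal_eval` from the standard library, which accepts only Python literals and does not call functions. The sketch below is one way to use it; `parse_schema_tuple` is a hypothetical helper name, and the extra type checks are an assumption about what the asset should accept.

```python
import ast


def parse_schema_tuple(raw: str) -> tuple:
    """Parse a string such as "('abc', 'asd')" into a tuple of strings."""
    value = ast.literal_eval(raw)  # literals only; function calls are rejected
    if isinstance(value, str):
        value = (value,)  # "('abc')" is just a parenthesised string
    if not isinstance(value, tuple) or not all(isinstance(v, str) for v in value):
        raise ValueError(f"expected a tuple of strings, got {raw!r}")
    return value


parsed = parse_schema_tuple("('abc', 'asd')")
print(parsed)

try:
    parse_schema_tuple("__import__('os').getcwd()")
    rejected = False
except ValueError:
    # literal_eval refuses anything that is not a plain literal.
    rejected = True
print("rejected:", rejected)
```

Inside the asset, this would take the place of the `eval(schema_list_for_query_str)` call.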

SUBHADEEP ROY

03/12/2024, 3:58 PM
@Scout getting this error now 'Error in config for job\n Error 1: Invalid scalar at path rootopsdwh_metadata_creationconfigschema_list_for_query. Value "(\'abc\', \'bcd\')" of type "<class \'tuple\'>" is not valid for expected type "String".\n Error 2: Invalid scalar at path rootopsdwh_metadata_creation_2configschema_list_for_query. Value "(\'abc\', \'bcd\')" of type "<class \'tuple\'>" is not valid for expected type "String".\n Error 3: Invalid scalar at path rootopsdwh_metadata_creation_3configschema_list_for_query. Value "(\'abc\', \'bcd\')" of type "<class \'tuple\'>" is not valid for expected type "String".\n Error 4: Invalid scalar at path rootopsdwh_metadata_creation_4configschema_list_for_query. Value "(\'abc\', \'bcd\')" of type "<class \'tuple\'>" is not valid for expected type "String".'

Scout

03/12/2024, 3:58 PM
The error message you're seeing indicates that Dagster is expecting a string for the `schema_list_for_query` config field, but it's receiving a tuple instead. In other words, the value placed in the run config is a Python tuple rather than a Python `str`. To resolve this issue, ensure that the value you provide for `schema_list_for_query` in the run config is a string: the tuple-like text must be enclosed in quotes so that it is a string literal. Here's how you should format the run config:
```python
result = my_repo.get_job("all_assets_job").execute_in_process(
    run_config={
        "ops": {
            "dwh_metadata_creation": {
                "config": {
                    # Ensure the value is a string literal, not a tuple
                    "schema_list_for_query": "('abc', 'bcd')"
                }
            }
        }
    }
)
```
In this run config, the value for `schema_list_for_query` is a string that looks like a tuple, but it's still a string because it's enclosed in quotes. This should match the expected type "String" in the config schema. If you continue to encounter issues, please double-check that the run config is being provided correctly in the format expected by Dagster, and that no additional parsing or transformation is occurring that might change the string into a tuple before it's passed to Dagster.
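The error message lists four ops (`dwh_metadata_creation` and its `_2` to `_4` aliases), and each one needs the same string-typed value. As a rough sketch, a small helper can build the `ops` section for all of them and convert a tuple to a string along the way. `build_run_config` is a hypothetical name, and the helper only builds a plain dictionary; it does not call Dagster.

```python
from typing import Dict, Iterable


def build_run_config(op_names: Iterable[str], schema_value: object) -> Dict:
    """Give every listed op the same string-typed schema_list_for_query."""
    # str() turns a tuple such as ('abc', 'bcd') into "('abc', 'bcd')";
    # a value that is already a string is passed through unchanged.
    as_text = schema_value if isinstance(schema_value, str) else str(schema_value)
    return {
        "ops": {
            name: {"config": {"schema_list_for_query": as_text}}
            for name in op_names
        }
    }


# Op names copied from the error message.
op_names = ["dwh_metadata_creation"] + [
    f"dwh_metadata_creation_{i}" for i in (2, 3, 4)
]
run_config = build_run_config(op_names, ("abc", "bcd"))
print(run_config["ops"]["dwh_metadata_creation_2"])
```

The resulting dictionary can then be passed as `run_config=` to `execute_in_process`.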

SUBHADEEP ROY

03/12/2024, 4:32 PM
@Scout i have stored the values in the variable like this schema_list_for_query = ('abc','bcd') , how can i pass the variable in run config so that it doesnt errors out

Scout

03/12/2024, 4:32 PM
If you have a Python variable `schema_list_for_query` that is a tuple and you want to pass it as a string in the run config, you need to convert the tuple to a string that represents the tuple. You can do this by using the `str()` function in Python, which will give you a string representation of the tuple. Here's how you can convert the tuple to a string and use it in the run config:
```python
# Your tuple
schema_list_for_query = ('abc', 'bcd')

# Convert the tuple to a string that represents the tuple
schema_list_for_query_str = str(schema_list_for_query)

# Now pass the string representation to the run config
result = my_repo.get_job("all_assets_job").execute_in_process(
    run_config={
        "ops": {
            "dwh_metadata_creation": {
                "config": {
                    # Pass the string representation of the tuple
                    "schema_list_for_query": schema_list_for_query_str
                }
            }
        }
    }
)
```
In this code, `schema_list_for_query_str` will be a string that looks like `"('abc', 'bcd')"`. When you pass this string to the run config, it should not error out because it matches the expected type "String" in the config schema. Make sure that when you define the config schema for the `dwh_metadata_creation` asset, you expect a string:
```python
@asset(
    config_schema={"schema_list_for_query": Field(String)},
    # ... other parameters
)
def dwh_metadata_creation(context):  # upstream asset parameters omitted here
    # ... asset implementation
    ...
```
And within the asset, if you need to use it as a tuple, you can convert it back:
```python
def dwh_metadata_creation(context):  # upstream asset parameters omitted here
    schema_list_for_query_str = context.op_config["schema_list_for_query"]
    schema_list_for_query = eval(schema_list_for_query_str)
    # Now schema_list_for_query is a tuple again
    # ... use the tuple in your asset logic
```
Please be cautious with the use of `eval()`, as it can execute arbitrary code. It should only be used if you are certain of the safety of the input. If the input is coming from an untrusted source, consider using a safer parsing method.
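A different technique that avoids `eval()` altogether is to send the values as a JSON string and decode them inside the asset. This is only a sketch of that alternative, not what the script above does, and it uses nothing beyond the standard `json` module.

```python
import json

schema_list_for_query = ("abc", "bcd")

# Caller side: encode the tuple as a JSON array string for the String config field.
encoded = json.dumps(list(schema_list_for_query))

# Asset side: decode it without executing anything, then restore the tuple.
decoded = tuple(json.loads(encoded))

print(encoded)
print(decoded)
```

The JSON text is not valid SQL on its own, so the decoded tuple would still need to be rendered into the `in (...)` condition before it is interpolated into a query.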

SUBHADEEP ROY

03/13/2024, 2:56 PM
@Scout i am getting KeyError(' l_wh_schema_ignore_list ') in the below asset job code pls tell the solution @asset( resource_defs=patch_cloud_storage_selector( { "db_obj": metadata_resource, "dw_obj": dw_obj, } ) ) def copy_dmax_tables_to_rds(context): db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES" l_logger.info(l_query) df_dmax_tables = dw_obj.execute_query_as_dataframe(sql_query=l_query) l_logger.info(df_dmax_tables) engine = engine_creation() df_dmax_tables.to_sql( "crawler_prod_tables", engine, schema=l_db_schema, if_exists="replace", index=False, ) @asset( config_schema={"schema_list_for_query": Field(String)}, resource_defs=patch_cloud_storage_selector( { "db_obj": metadata_resource, "dw_obj": dw_obj, "db_query_obj": metadata_query_resource, } ), ) def dwh_metadata_creation( context, show_schema_asset, show_table_asset, show_col_asset, show_view_asset, show_keys_asset, show_pk_keys_asset, copy_dmax_tables_to_rds, ): db_obj = context.resources.db_obj db_query_obj = context.resources.db_query_obj l_global_var_dict = db_query_obj.get_global_variable() l_wh_schema_ignore_list = l_global_var_dict["wh_schema_ignore_list"] schema_list_for_query_str = context.op_config["schema_list_for_query"] if len(schema_list_for_query_str): schema_where_cond = f' and lower(ds."name") in {schema_list_for_query_str}' fk_schema_where_cond = ( f' and lower(dp."schema_name") in {schema_list_for_query_str}' ) else: schema_where_cond = "" fk_schema_where_cond = "" l_query_dwh_metadata = dir_path + "/delta_generation_scripts/dwh_metadata.sql" l_query_dwh = ( open(l_query_dwh_metadata) .read() .format( schema_list=schema_where_cond, fk_schema_list=fk_schema_where_cond, l_wh_schema_ignore_list=l_wh_schema_ignore_list, ) ) l_query_dwh_metadata = """drop table if exists DWH_METADATA;CREATE TABLE DWH_METADATA AS {query}""".format( query=l_query_dwh ) db_obj.execute_query(l_query_dwh_metadata) @asset( 
config_schema={"schema_list_for_query": Field(String)}, resource_defs=patch_cloud_storage_selector( { "db_obj": metadata_resource, "dw_obj": dw_obj, "db_query_obj": metadata_query_resource, } ), ) def rds_metadata_creation(context): db_obj = context.resources.db_obj db_query_obj = context.resources.db_query_obj l_global_var_dict = db_query_obj.get_global_variable() l_wh_schema_ignore_list = l_global_var_dict["wh_schema_ignore_list"] schema_list_for_query_str = context.op_config["schema_list_for_query"] l_wh_schema_ignore_list_str = f" ({l_wh_schema_ignore_list})" if len(schema_list_for_query_str): schema_where_cond = f' and lower("schema_name") in {schema_list_for_query_str}' else: schema_where_cond = "" l_query_rds_metadata = dir_path + "/delta_generation_scripts/rds_metadata.sql" l_query_rds = ( open(l_query_rds_metadata) .read() .format( schema_list=schema_where_cond, l_wh_schema_ignore_list=l_wh_schema_ignore_list_str, ) ) l_query_rds_metadata = """drop table if exists RDS_METADATA;CREATE TABLE RDS_METADATA AS {query}""".format( query=l_query_rds ) db_obj.execute_query(l_query_rds_metadata) @asset( resource_defs=patch_cloud_storage_selector( { "db_obj": metadata_resource, "dw_obj": dw_obj, } ) ) def insert_delta_asset(context, dwh_metadata_creation, rds_metadata_creation): db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query_insert_delta = ( dir_path + "/delta_generation_scripts/crawler_identify_inserts.sql" ) l_query_insert = open(l_query_insert_delta).read().format(schema=l_db_schema) l_query_insert_delta = """drop table if exists crawler_identify_inserts; CREATE TABLE crawler_identify_inserts AS {query}""".format( query=l_query_insert ) db_obj.execute_query(l_query_insert_delta) @asset( resource_defs=patch_cloud_storage_selector( { "db_obj": metadata_resource, "dw_obj": dw_obj, } ) ) def delete_delta_asset(context, dwh_metadata_creation, rds_metadata_creation): db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj 
l_query_delete_delta = ( dir_path + "/delta_generation_scripts/crawler_identify_deletes.sql" ) l_query_delete = open(l_query_delete_delta).read().format(schema=l_db_schema) l_query_delete_delta = """drop table if exists crawler_identify_deletes; CREATE TABLE crawler_identify_deletes AS {query}""".format( query=l_query_delete ) db_obj.execute_query(l_query_delete_delta) @asset( resource_defs=patch_cloud_storage_selector( { "db_obj": metadata_resource, "dw_obj": dw_obj, } ) ) def update_delta_asset(context, dwh_metadata_creation, rds_metadata_creation): db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query_update_delta = ( dir_path + "/delta_generation_scripts/crawler_identify_updates.sql" ) l_query_update = open(l_query_update_delta).read().format(schema=l_db_schema) l_query_update_delta = """drop table if exists crawler_identify_updates; CREATE TABLE crawler_identify_updates AS {query}""".format( query=l_query_update ) db_obj.execute_query(l_query_update_delta) @job( resource_defs=patch_cloud_storage_selector( { "db_obj": metadata_resource, "dw_obj": warehouse_resource_selector(), "db_query_obj": metadata_query_resource, } ) ) def all_assets_job(): """ This function is named
all_assets_job
and likely performs a task related to handling all assets. """ show_schema_asset() show_table_asset() show_schema_asset() show_table_asset() show_col_asset() show_view_asset() show_keys_asset() show_pk_keys_asset() copy_dmax_tables_to_rds() dwh_metadata_creation( show_schema_asset(), show_table_asset(), show_col_asset(), show_view_asset(), show_keys_asset(), show_pk_keys_asset(), copy_dmax_tables_to_rds(), ) rds_metadata_creation() insert_delta_asset( dwh_metadata_creation( show_schema_asset(), show_table_asset(), show_col_asset(), show_view_asset(), show_keys_asset(), show_pk_keys_asset(), copy_dmax_tables_to_rds(), ), rds_metadata_creation(), ) delete_delta_asset( dwh_metadata_creation( show_schema_asset(), show_table_asset(), show_col_asset(), show_view_asset(), show_keys_asset(), show_pk_keys_asset(), copy_dmax_tables_to_rds(), ), rds_metadata_creation(), ) update_delta_asset( dwh_metadata_creation( show_schema_asset(), show_table_asset(), show_col_asset(), show_view_asset(), show_keys_asset(), show_pk_keys_asset(), copy_dmax_tables_to_rds(), ), rds_metadata_creation(), ) # Repository definition that includes the assets and the job @repository def my_repo(): return [ show_schema_asset, show_table_asset, show_col_asset, show_view_asset, show_keys_asset, show_pk_keys_asset, copy_dmax_tables_to_rds, dwh_metadata_creation, rds_metadata_creation, insert_delta_asset, delete_delta_asset, update_delta_asset, all_assets_job, ] # Execute the job in process def crawler_asset_job(p_in_db_obj, l_db_schema, schema_list_for_query): try: db_obj = p_in_db_obj dir_path = os.path.dirname(os.path.realpath(file)) drop_DWH_META_query = """ drop table if exists {schema}.DWH_METADATA;""".format( schema=l_db_schema ) db_obj.execute_query(drop_DWH_META_query) # Asset job triggers schema_list_for_query = f"('int_call_act','dwh_call_act')" schema_list_for_query_str = str(schema_list_for_query) result = my_repo.get_job("all_assets_job").execute_in_process( run_config={ "ops": { 
"dwh_metadata_creation": { "config": {"schema_list_for_query": schema_list_for_query_str} }, "dwh_metadata_creation_2": { "config": {"schema_list_for_query": schema_list_for_query_str} }, "dwh_metadata_creation_3": { "config": {"schema_list_for_query": schema_list_for_query_str} }, "dwh_metadata_creation_4": { "config": {"schema_list_for_query": schema_list_for_query_str} }, "rds_metadata_creation": { "config": {"schema_list_for_query": schema_list_for_query_str} }, "rds_metadata_creation_2": { "config": {"schema_list_for_query": schema_list_for_query_str} }, "rds_metadata_creation_3": { "config": {"schema_list_for_query": schema_list_for_query_str} }, "rds_metadata_creation_4": { "config": {"schema_list_for_query": schema_list_for_query_str} }, } } ) if not result.success: raise Exception("Asset job execution failed") return result except Exception as e: print("Asset job execution failed") if name == "__main__": crawler_asset_job()

Scout

03/13/2024, 2:56 PM
The `KeyError(' l_wh_schema_ignore_list ')` may mean that the key `wh_schema_ignore_list` is not found in the dictionary `l_global_var_dict` when the code attempts to access it. This could happen if the key does not exist in the dictionary returned by `db_query_obj.get_global_variable()`. Note, however, that the key named in your error is ` l_wh_schema_ignore_list ` (with the `l_` prefix and surrounding spaces), which matches the keyword passed to `.format()` rather than the dictionary key, so it is also worth checking how that placeholder is written in `dwh_metadata.sql` and `rds_metadata.sql`.

To rule out the dictionary lookup, you should ensure that the key `wh_schema_ignore_list` is indeed present in the dictionary returned by `get_global_variable()`. Here are a few steps you can take to debug and fix the problem:

1. Verify that the `get_global_variable()` method of the `db_query_obj` resource is returning a dictionary that contains the key `wh_schema_ignore_list`. You can add a logging statement to print out the dictionary to confirm its contents.
2. If the key is supposed to be optional, you can provide a default value when accessing it using the `get()` method of the dictionary. For example:
   ```python
   l_wh_schema_ignore_list = l_global_var_dict.get("wh_schema_ignore_list", default_value)
   ```
   Replace `default_value` with the value you want to use if `wh_schema_ignore_list` is not found.
3. If the key is required and should always be present, you need to ensure that the logic that populates the `l_global_var_dict` dictionary includes this key. This might involve checking the configuration or the logic of the `metadata_query_resource`.
4. If the `get_global_variable()` method is supposed to fetch data from an external source (like a database or a configuration file), make sure that the data source contains the required key and that the method is correctly retrieving it.
5. If the key is dynamically determined and may change, you may need to adjust your logic to handle cases where the key is not present.

Once you've confirmed that the key is present in the dictionary or handled its absence appropriately, the `KeyError` should be resolved. If you continue to face issues, you may need to provide more context or review the implementation of the `metadata_query_resource` and its `get_global_variable()` method.
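Python's `str.format` treats whitespace inside the braces as part of the field name, so a placeholder written with spaces never matches the keyword argument. The sketch below reproduces a `KeyError` of the same shape. The template strings and the `'some_schema'` value are hypothetical stand-ins, since the contents of the SQL files are not shown in this thread.

```python
# Hypothetical one-line stand-ins for the SQL template; the real files are not shown.
template_with_spaces = "where schema_name not in ({ l_wh_schema_ignore_list })"
template_without_spaces = "where schema_name not in ({l_wh_schema_ignore_list})"

try:
    template_with_spaces.format(l_wh_schema_ignore_list="'some_schema'")
    error_text = None
except KeyError as exc:
    # The spaces are part of the field name, so the keyword is never matched.
    error_text = repr(exc)

print(error_text)

# Without the spaces, the keyword argument is found and substituted.
formatted = template_without_spaces.format(l_wh_schema_ignore_list="'some_schema'")
print(formatted)
```

If the SQL files do contain `{ l_wh_schema_ignore_list }` with spaces, removing the spaces inside the braces would be the fix on the template side.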

SUBHADEEP ROY

03/13/2024, 3:02 PM
@Scout i am getting this value in l_wh_schema_ignore_list = "'dmax_change_version'"

Scout

03/13/2024, 3:02 PM
If you are receiving the value `"'dmax_change_version'"` (including the single quotes) in the variable `l_wh_schema_ignore_list`, it suggests that the value is being interpreted as a string that contains single quotes, rather than just the string `dmax_change_version`. This could potentially cause issues if you are using this value in SQL queries or other contexts where the single quotes are not expected or required. To address this, you can strip the single quotes from the string before using it. Here's how you can modify the value to remove the single quotes:
```python
# Original value with single quotes included
l_wh_schema_ignore_list = "'dmax_change_version'"

# Remove single quotes from the string
l_wh_schema_ignore_list = l_wh_schema_ignore_list.strip("'")

# Now l_wh_schema_ignore_list should be 'dmax_change_version' without the single quotes
```
After stripping the single quotes, you can use `l_wh_schema_ignore_list` in your asset logic where it is expected to be a string without single quotes. Keep the quotes, however, wherever the value is interpolated directly into a SQL list, as `rds_metadata_creation` does with `({l_wh_schema_ignore_list})`, because SQL string literals need them.

If `l_wh_schema_ignore_list` is supposed to be a list of strings and you are receiving it as a single string, you might need to parse it into a list. For example, if `l_wh_schema_ignore_list` should be `["dmax_change_version"]`, you would need to adjust the code that sets this value to ensure it's in the correct format.

Make sure to review the context in which `l_wh_schema_ignore_list` is used and adjust the handling of this variable accordingly. If the value is coming from an external source, such as a configuration file or environment variable, you may also need to check that source to ensure the value is being provided in the correct format.
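If the value does need to become a list, one possible approach is to split on commas and strip the quotes from each item. This is a minimal sketch; `parse_quoted_names` is a hypothetical helper, and `'other_schema'` is a made-up second entry used only to show the multi-value case.

```python
def parse_quoted_names(raw: str) -> list:
    """Split "'a', 'b'" into ['a', 'b'], dropping quotes and whitespace."""
    names = []
    for part in raw.split(","):
        # Remove surrounding whitespace first, then any single or double quotes.
        name = part.strip().strip("'\"")
        if name:
            names.append(name)
    return names


one = parse_quoted_names("'dmax_change_version'")
two = parse_quoted_names("'dmax_change_version', 'other_schema'")
print(one)
print(two)
```

This assumes schema names never contain commas or quote characters themselves.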