https://dagster.io/ logo
#ask-ai
Title
# ask-ai
s

SUBHADEEP ROY

03/09/2024, 11:12 AM
@Scout i have an asset job import pandas as pd from dagster import asset, job, repository import os # from dagster_snowflake import SnowflakeResource from dagster import asset from sqlalchemy import create_engine from dagster import asset from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import patch_cloud_storage_selector from dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import ( SnowflakeCrawlConnection, ) from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import ( setup_logger, ) from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_CREDENTIAL_DICT, ENUM_DW_CREDENTIAL_DICT, ) from dmax_app.orchestration.orchestration_enums import ( ENUM_DW_CREDENTIAL_DICT, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector # wh_res = SnowflakeCrawlConnection() l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"] l_logger = setup_logger() l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"] dir_path = os.path.dirname(os.path.realpath(__file__)) def engine_creation(): database_name = ENUM_DB_CREDENTIAL_DICT["database"] user = ENUM_DB_CREDENTIAL_DICT["user"] password = ENUM_DB_CREDENTIAL_DICT["password"] host = ENUM_DB_CREDENTIAL_DICT["host"] port = ENUM_DB_CREDENTIAL_DICT["port"] # l_logger.info(database_name,user,password,host,port) engine = create_engine( f"postgresql//{user}{password}@{host}:{port}/{database_name}" ) return engine dw_obj =warehouse_resource_selector() @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def show_schema_asset(context) -> pd.DataFrame: # wh_ress = warehouse_resource_selector() db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};" l_logger.info(l_query) # result = 
context.resources.db_obj.execute_query_as_dataframe(sql_query=l_query) df_schema = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_schema.to_sql( "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def show_table_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query = f"SHOW TABLES IN DATABASE {l_dw_database};" l_logger.info(l_query) df_table = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_table.to_sql( "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def show_col_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_col = dw_obj.execute_query_as_dataframe(sql_query=l_query) # l_logger.info(df_col) df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True) df_col["ordinal_position"] = df_col.index engine = engine_creation() df_col.to_sql( "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def show_view_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_view = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_view.to_sql( "crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def show_keys_asset(context) -> 
pd.DataFrame: db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_keys.to_sql( "crawler_constraints", engine, schema=l_db_schema, if_exists="replace", index=False, ) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def show_pk_keys_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_pk_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_pk_keys.to_sql( "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def show_prod_dmax_tables(context) -> pd.DataFrame: db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES" l_logger.info(l_query) df_dmax_tables = dw_obj.execute_query_as_dataframe(sql_query=l_query) l_logger.info(df_dmax_tables) engine = engine_creation() df_dmax_tables.to_sql( "crawler_prod_tables", engine, schema=l_db_schema, if_exists="replace", index=False, ) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def dwh_metadata_creation(context,show_schema_asset,show_table_asset,show_col_asset,show_view_asset,show_keys_asset,show_pk_keys_asset,show_prod_dmax_tables) -> pd.DataFrame: db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query_dwh_metadata = dir_path + "/delta_generation_scripts/dwh_metadata.sql" l_query_dwh = open(l_query_dwh_metadata).read().format(schema=l_db_schema) l_query_dwh_metadata = """CREATE TABLE DWH_METADATA AS {query}""".format( query=l_query_dwh ) 
db_obj.execute_query(l_query_dwh_metadata) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def insert_delta_asset(context,dwh_metadata_creation): db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query_insert_delta = ( dir_path + "/delta_generation_scripts/crawler_identify_inserts.sql" ) l_query_insert = open(l_query_insert_delta).read().format(schema=l_db_schema) l_query_insert_delta = """drop table if exists crawler_identify_inserts; CREATE TABLE crawler_identify_inserts AS {query}""".format( query=l_query_insert ) db_obj.execute_query(l_query_insert_delta) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def delete_delta_asset(context,dwh_metadata_creation): db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query_delete_delta = ( dir_path + "/delta_generation_scripts/crawler_identify_deletes.sql" ) l_query_delete = open(l_query_delete_delta).read().format(schema=l_db_schema) l_query_delete_delta = """drop table if exists crawler_identify_ideletes; CREATE TABLE crawler_identify_ideletes AS {query}""".format( query=l_query_delete ) db_obj.execute_query(l_query_delete_delta) @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": dw_obj, })) def update_delta_asset(context,dwh_metadata_creation): db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query_update_delta = ( dir_path + "/delta_generation_scripts/crawler_identify_updates.sql" ) l_query_update = open(l_query_update_delta).read().format(schema=l_db_schema) l_query_update_delta = """drop table if exists crawler_identify_updates; CREATE TABLE crawler_identify_updates AS {query}""".format( query=l_query_update ) db_obj.execute_query(l_query_update_delta) @job( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": warehouse_resource_selector(), }) ) def all_assets_job(): 
show_schema_asset() show_table_asset() show_schema_asset() show_table_asset() show_col_asset() show_view_asset() show_keys_asset() show_pk_keys_asset() show_prod_dmax_tables() dwh_metadata_creation(show_schema_asset(),show_table_asset(),show_col_asset(),show_view_asset(),show_keys_asset(),show_pk_keys_asset(),show_prod_dmax_tables()) insert_delta_asset(dwh_metadata_creation()) delete_delta_asset(dwh_metadata_creation()) update_delta_asset(dwh_metadata_creation()) # Repository definition that includes the assets and the job @repository def my_repo(): return [ show_schema_asset, show_table_asset, show_col_asset, show_view_asset, show_keys_asset, show_pk_keys_asset, show_prod_dmax_tables, dwh_metadata_creation, insert_delta_asset, delete_delta_asset, update_delta_asset, all_assets_job ] # Execute the job in process def crawler_asset_job(p_in_db_obj, l_db_schema): try: db_obj = p_in_db_obj # schema = l_db_schema dir_path = os.path.dirname(os.path.realpath(__file__)) drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format( schema=l_db_schema ) db_obj.execute_query(drop_view_query) # Asset job triggers result = my_repo.get_job("all_assets_job").execute_in_process() if not result.success: raise Exception("Asset job execution failed") return result except Exception as e: print("Asset job execution failed") if __name__ == "__main__": crawler_asset_job() i am getting this error 'Missing required config entry "ops" at the root. 
Sample config for missing entry: {\'ops\': {\'dwh_metadata_creation_2\': {\'inputs\': {\'show_col_asset\': \'<selector>\', \'show_keys_asset\': \'<selector>\', \'show_pk_keys_asset\': \'<selector>\', \'show_prod_dmax_tables\': \'<selector>\', \'show_schema_asset\': \'<selector>\', \'show_table_asset\': \'<selector>\', \'show_view_asset\': \'<selector>\'}}, \'dwh_metadata_creation_3\': {\'inputs\': {\'show_col_asset\': \'<selector>\', \'show_keys_asset\': \'<selector>\', \'show_pk_keys_asset\': \'<selector>\', \'show_prod_dmax_tables\': \'<selector>\', \'show_schema_asset\': \'<selector>\', \'show_table_asset\': \'<selector>\', \'show_view_asset\': \'<selector>\'}}, \'dwh_metadata_creation_4\': {\'inputs\': {\'show_col_asset\': \'<selector>\', \'show_keys_asset\': \'<selector>\', \'show_pk_keys_asset\': \'<selector>\', \'show_prod_dmax_tables\': \'<selector>\', \'show_schema_asset\': \'<selector>\', \'show_table_asset\': \'<selector>\', \'show_view_asset\': \'<selector>\'}}}}' solve this issue
s

Scout

03/09/2024, 11:12 AM
The error message you're encountering indicates that there is a missing configuration for the "ops" key in the run configuration when executing the job. In Dagster, the run configuration specifies how each op should be configured when the job is executed. Although your functions are decorated with `@asset`, `all_assets_job` is defined with `@job`, which composes an op graph. Each asset you call inside its body is treated as a plain op invocation, which is why the error refers to "ops" rather than assets. It is therefore important to check both that your assets are correctly defined and that the job body wires them together the way you intend. Here are some steps you can take to debug and potentially solve this issue: 1. Check Asset Dependencies: Ensure that the dependencies between your assets are correctly defined. In your
dwh_metadata_creation
asset, you are passing the results of other assets as inputs. Make sure that these dependencies are correctly specified. 2. Run Configuration: When executing the job in process with
execute_in_process
, you can pass a
run_config
dictionary that specifies the configuration for resources and assets. If your assets require specific configuration, make sure to include it in the
run_config
. 3. Resource Definitions: Verify that the resources required by your assets are correctly defined and passed to the
resource_defs
parameter of the
@asset
decorator. 4. Repository and Job Definition: Ensure that your repository and job are correctly set up to include all the necessary assets and that the job is constructed to execute these assets in the correct order based on their dependencies. 5. Execution: When calling
execute_in_process
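, ensure that you're not passing any incorrect parameters that might be causing Dagster to look for an "ops" configuration.

In this case, however, the cause is visible in the job body itself. The error asks for inputs to `dwh_metadata_creation_2`, `dwh_metadata_creation_3` and `dwh_metadata_creation_4`. Those are the extra nodes created by `insert_delta_asset(dwh_metadata_creation())`, `delete_delta_asset(dwh_metadata_creation())` and `update_delta_asset(dwh_metadata_creation())`.

Every call inside a `@job` body adds a new, aliased invocation. These three are called with no arguments, so their seven inputs are not connected to any upstream output, and Dagster asks for them in run config instead.

The standalone `show_*()` calls at the top of the job body also add duplicate nodes that rerun the same queries. This happens because the call inside `dwh_metadata_creation(...)` invokes each of them again.

Call each upstream once and reuse its output:

```python
@job(resource_defs=...)  # keep your existing resource_defs
def all_assets_job():
    metadata = dwh_metadata_creation(
        show_schema_asset(), show_table_asset(), show_col_asset(),
        show_view_asset(), show_keys_asset(), show_pk_keys_asset(),
        show_prod_dmax_tables(),
    )
    insert_delta_asset(metadata)
    delete_delta_asset(metadata)
    update_delta_asset(metadata)
```

Alternatively, drop the hand-written `@job` and build the job from the asset graph with `define_asset_job("all_assets_job", selection=AssetSelection.all())`. This resolves dependencies from the assets' parameter names.

A few smaller issues in the pasted code are also worth checking:
- The `show_*` assets are annotated `-> pd.DataFrame` but return nothing, which Dagster's output type check may reject. Either return the DataFrame or annotate `-> None`.
- Under `if __name__ == "__main__":`, `crawler_asset_job()` is called without the two arguments it requires.
- The `except Exception` inside `crawler_asset_job` only prints a generic message, which hides the real error.
- The connection URL as pasted reads `postgresql//{user}{password}@...`, whereas SQLAlchemy expects `postgresql://{user}:{password}@...`.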
The error message you're encountering indicates that there is a missing configuration for the "ops" key in your job's run configuration. This typically happens when you have a job that expects certain inputs to be provided, but the necessary configuration has not been supplied. In Dagster, when you define a job that includes assets with dependencies, you need to provide the run configuration that specifies how these dependencies are resolved. The error message is suggesting that you need to provide selectors for the inputs of the
dwh_metadata_creation
asset within the job. However, since you are using assets and not ops, it's possible that the error message is misleading or that there is a mismatch in how your assets are being used within the job. With assets, you typically do not need to provide an "ops" configuration, as assets are designed to handle their dependencies automatically. Here are some steps you can take to debug and resolve this issue: 1. Ensure that all assets are correctly defined with their dependencies. For example,
dwh_metadata_creation
depends on other assets like
show_schema_asset
,
show_table_asset
, etc. These dependencies should be correctly specified in the asset definition. 2. Check that the
resource_defs
provided to each asset are correctly configured and that the resources they depend on are available and correctly implemented. 3. When executing the job in process with
execute_in_process
, you typically do not need to provide a run configuration for assets. However, if you have a complex setup or are using selectors, you might need to provide a configuration. Make sure that if a configuration is needed, it is correctly structured and provided to the
execute_in_process
method. 4. Review the job definition
all_assets_job
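to ensure that it is correctly composed of the assets, that each asset is invoked exactly once, and that each asset's output is passed to the assets that depend on it. This is where the error comes from: inside a `@job` body every call creates a new node. As a result, `insert_delta_asset(dwh_metadata_creation())`, `delete_delta_asset(dwh_metadata_creation())` and `update_delta_asset(dwh_metadata_creation())` add three extra copies named `dwh_metadata_creation_2`, `dwh_metadata_creation_3` and `dwh_metadata_creation_4`. Those copies are called without arguments, so their inputs are unconnected, and Dagster asks for them under `ops` in the run config. To fix it, store the result of the single fully-wired call (`metadata = dwh_metadata_creation(show_schema_asset(), ...)`) and pass `metadata` to the three delta assets. Also remove the standalone `show_*()` calls before it, because each of them adds a duplicate node. Alternatively, replace the hand-written `@job` with `define_asset_job("all_assets_job", selection=AssetSelection.all())`, which wires dependencies from the asset graph. 5. If you have recently migrated from using ops to assets, ensure that there are no remnants of the old op-based configuration that might be causing confusion. 6. If the issue persists, you may want to simplify your job and repository setup to the minimum viable configuration and incrementally add complexity, testing at each step to identify where the configuration issue is introduced. Because `@job` composes ops, an "ops" key in this error is expected and does not point to a version mismatch or a bug in Dagster. Staying on a current stable Dagster release is still good practice, but upgrading alone will not resolve this error. If you continue to face difficulties, you may want to reach out to the Dagster community or support channels with specific details about your setup and the error message for further assistance.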