https://dagster.io/ logo
#ask-ai
Title
# ask-ai
s

SUBHADEEP ROY

03/09/2024, 6:29 AM
@Scout i have an asset job given below in which i am passing some resources in the job so that assets can use it but i am unable to and getting error E dagster._core.errors.DagsterInvalidDefinitionError: resource with key 'sio' required by op 'show_schema_asset' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'sio', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager'] , here is the asset job import pandas as pd import psycopg2 from dagster import Definitions, asset, job, repository, resource import os # from dagster_snowflake import SnowflakeResource from dagster import asset from psycopg2.extensions import connection from sqlalchemy import create_engine from typing import Dict, Any from dagster import Definitions, asset from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dagster import job from dmax_app.orchestration.orchestration_manager.hooks.status_notif_hook import ( update_inprogress_status_hook, update_completed_status_hook, update_failed_status_hook, ) from dmax_app.orchestration.orchestration_manager.jobs.crawler.op_initiate_sync_crawler_job import ( initiate_sync_crawler_job, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_manager.resources.socketio_resource import socket_resource from dmax_app.orchestration.orchestration_manager.resources.metadata_query_resource import ( metadata_query_resource, ) from dmax_app.orchestration.orchestration_manager.resources.file_storage_repo_resource import ( file_storage_repo_selector, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import ( warehouse_resource_selector, ) from 
dmax_app.orchestration.orchestration_manager.resources.warehouse_query_provider_resource import ( warehouse_query_resource_selector, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import ( SnowflakeCrawlConnection, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import ( setup_logger, ) from dmax_app.orchestration.orchestration_manager.resources.op_config_tag_selector import ( op_tag_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_CREDENTIAL_DICT, ENUM_DB_DATABRICKS, ENUM_DW_CREDENTIAL_DICT, ) from dmax_app.orchestration.orchestration_config import ENUM_DW_DB from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_DATABRICKS, ENUM_DB_SNOWFLAKE, ENUM_DB_REDSHIFT, ENUM_DW_CREDENTIAL_DICT, ENUM_DWH_MASTER_DB_PASSWORD, ENUM_DWH_MASTER_DB_USER, ENUM_WAREHOUSE_SF_ROLE_MASTER, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector wh_res = SnowflakeCrawlConnection() l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"] l_logger = setup_logger() l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"] dir_path = os.path.dirname(os.path.realpath(file)) # db_obj = metadata_resource def engine_creation(): database_name 
= ENUM_DB_CREDENTIAL_DICT["database"] user = ENUM_DB_CREDENTIAL_DICT["user"] password = ENUM_DB_CREDENTIAL_DICT["password"] host = ENUM_DB_CREDENTIAL_DICT["host"] port = ENUM_DB_CREDENTIAL_DICT["port"] # l_logger.info(database_name,user,password,host,port) engine = create_engine( f"postgresql//{user}{password}@{host}:{port}/{database_name}" ) return engine @asset(required_resource_keys={"sio", "db_obj", "db_query_obj", "dw_obj", "dw_query_obj"},) def show_schema_asset(context) -> pd.DataFrame: # wh_ress = warehouse_resource_selector() db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};" l_logger.info(l_query) # result = context.resources.db_obj.execute_query_as_dataframe_crawl(sql_query=l_query) df_schema = dw_obj.execute_query_as_dataframe(sql_query=l_query) return df_schema @asset def show_table_asset(context) -> pd.DataFrame: l_query = f"SHOW TABLES IN DATABASE {l_dw_database};" l_logger.info(l_query) df_table = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) return df_table @asset def show_col_asset(context) -> pd.DataFrame: l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_col = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) # l_logger.info(df_col) df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True) df_col["ordinal_position"] = df_col.index return df_col @asset def show_view_asset(context) -> pd.DataFrame: l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_view = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) return df_view @asset def show_keys_asset(context) -> pd.DataFrame: l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_keys = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) return df_keys @asset def show_pk_keys_asset(context) -> pd.DataFrame: l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};" 
l_logger.info(l_query) df_pk_keys = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) return df_pk_keys @asset def show_prod_dmax_tables(context) -> pd.DataFrame: l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES" l_logger.info(l_query) df_dmax_tables = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) l_logger.info(df_dmax_tables) return df_dmax_tables @asset(required_resource_keys={"sio", "db_obj", "db_query_obj", "dw_obj", "dw_query_obj"},) def dmax_schema_asset(context,show_schema_asset): engine = engine_creation() show_schema_asset.to_sql( "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_table_asset(show_table_asset): engine = engine_creation() show_table_asset.to_sql( "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_col_asset(show_col_asset): engine = engine_creation() show_col_asset.to_sql( "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_view_asset(show_view_asset): engine = engine_creation() show_view_asset.to_sql( "crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_key_asset(show_keys_asset): engine = engine_creation() show_keys_asset.to_sql( "crawler_constraints", engine, schema=l_db_schema, if_exists="replace", index=False, ) @asset def dmax_pk_key_asset(show_pk_keys_asset): engine = engine_creation() show_pk_keys_asset.to_sql( "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_prod_table_asset(show_prod_dmax_tables): engine = engine_creation() show_prod_dmax_tables.to_sql( "crawler_prod_tables", engine, schema=l_db_schema, if_exists="replace", index=False, ) @job( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "sio": socket_resource, "db_query_obj": metadata_query_resource, "dw_obj": warehouse_resource_selector(), "dw_query_obj": warehouse_query_resource_selector(), 
"fs_obj": file_storage_repo_selector(), }) ) def all_assets_job(): dmax_schema_asset(show_schema_asset()) dmax_table_asset(show_table_asset()) dmax_col_asset(show_col_asset()) dmax_view_asset(show_view_asset()) dmax_key_asset(show_keys_asset()) dmax_pk_key_asset(show_pk_keys_asset()) dmax_prod_table_asset(show_prod_dmax_tables()) # Repository definition that includes the assets and the job @repository def my_repo(): return [ show_schema_asset, show_table_asset, show_col_asset, show_view_asset, show_keys_asset, show_pk_keys_asset, show_prod_dmax_tables, dmax_schema_asset, dmax_table_asset, dmax_col_asset, dmax_view_asset, dmax_key_asset, dmax_pk_key_asset, dmax_prod_table_asset, all_assets_job, ] # Execute the job in process def crawler_asset_job(p_in_db_obj, l_db_schema): try: db_obj = p_in_db_obj # schema = l_db_schema dir_path = os.path.dirname(os.path.realpath(file)) drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format( schema=l_db_schema ) db_obj.execute_query(drop_view_query) # Asset job triggers result = my_repo.get_job("all_assets_job").execute_in_process() if not result.success: raise Exception("Asset job execution failed") return result except Exception as e: print("Asset job execution failed") if name == "__main__": crawler_asset_job()
s

Scout

03/09/2024, 6:29 AM
The error message you're encountering indicates that the
show_schema_asset
asset requires a resource with the key
'sio'
, but this resource was not provided in the job's
resource_defs
. The error message also suggests that you should provide a
ResourceDefinition
for the key
'sio'
, or change the required key to one of the keys that is actually provided. In your
all_assets_job
, you are defining
resource_defs
using
patch_cloud_storage_selector
and passing a dictionary that includes
"sio": socket_resource
. This suggests that the
socket_resource
should be available to your assets with the key
'sio'
. However, if the error persists, it could be due to several reasons: 1. The
socket_resource
may not be correctly defined or may not be a
ResourceDefinition
. 2. There might be an issue with how the
patch_cloud_storage_selector
function is handling the resources. 3. The
socket_resource
may not be correctly configured or instantiated when the job is executed. To debug this issue, you can: - Ensure that
socket_resource
is a valid
ResourceDefinition
and is correctly configured. - Verify that the
patch_cloud_storage_selector
function is correctly applying the resource definitions to the job. - Check if there are any typos or mismatches in the resource keys between the asset definitions and the job's
resource_defs
. - Make sure that the
socket_resource
is accessible and properly initialized at runtime. If you've checked all of the above and the issue still occurs, you may need to look into the implementation details of the
patch_cloud_storage_selector
function and the
socket_resource
to ensure they are compatible with the job and asset definitions. Remember that the resources you define in
resource_defs
for a job must match the
required_resource_keys
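specified in your assets. If there's a mismatch, Dagster will not be able to resolve the required resources, leading to the error you're seeing. Note also that the error message lists `['io_manager']` as the only key that actually points to a `ResourceDefinition`. This suggests that the resource dictionary attached to `all_assets_job` contains none of the keys you passed (`sio`, `db_obj`, `db_query_obj`, `dw_obj`, `dw_query_obj`). A likely cause is that `patch_cloud_storage_selector` returns its own dictionary without merging in the one you gave it. To confirm, print its return value, or temporarily pass your dictionary to `@job(resource_defs=...)` directly.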
s

SUBHADEEP ROY

03/09/2024, 8:09 AM
@Scout i have an asset job and i am getting error import pandas as pd import psycopg2 from dagster import Definitions, asset, job, repository, resource import os # from dagster_snowflake import SnowflakeResource from dagster import asset from psycopg2.extensions import connection from sqlalchemy import create_engine from typing import Dict, Any from dagster import Definitions, asset from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dagster import job from dmax_app.orchestration.orchestration_manager.hooks.status_notif_hook import ( update_inprogress_status_hook, update_completed_status_hook, update_failed_status_hook, ) from dmax_app.orchestration.orchestration_manager.jobs.crawler.op_initiate_sync_crawler_job import ( initiate_sync_crawler_job, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_manager.resources.socketio_resource import socket_resource from dmax_app.orchestration.orchestration_manager.resources.metadata_query_resource import ( metadata_query_resource, ) from dmax_app.orchestration.orchestration_manager.resources.file_storage_repo_resource import ( file_storage_repo_selector, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import ( warehouse_resource_selector, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_query_provider_resource import ( warehouse_query_resource_selector, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import 
( SnowflakeCrawlConnection, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import ( setup_logger, ) from dmax_app.orchestration.orchestration_manager.resources.op_config_tag_selector import ( op_tag_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_CREDENTIAL_DICT, ENUM_DB_DATABRICKS, ENUM_DW_CREDENTIAL_DICT, ) from dmax_app.orchestration.orchestration_config import ENUM_DW_DB from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_DATABRICKS, ENUM_DB_SNOWFLAKE, ENUM_DB_REDSHIFT, ENUM_DW_CREDENTIAL_DICT, ENUM_DWH_MASTER_DB_PASSWORD, ENUM_DWH_MASTER_DB_USER, ENUM_WAREHOUSE_SF_ROLE_MASTER, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector wh_res = SnowflakeCrawlConnection() l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"] l_logger = setup_logger() l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"] dir_path = os.path.dirname(os.path.realpath(file)) # db_obj = metadata_resource def engine_creation(): database_name = ENUM_DB_CREDENTIAL_DICT["database"] user = ENUM_DB_CREDENTIAL_DICT["user"] password = ENUM_DB_CREDENTIAL_DICT["password"] host = ENUM_DB_CREDENTIAL_DICT["host"] port = ENUM_DB_CREDENTIAL_DICT["port"] # l_logger.info(database_name,user,password,host,port) engine = create_engine( f"postgresql//{user}{password}@{host}:{port}/{database_name}" ) return engine @asset( resource_defs={ "sio": socket_resource, "db_obj1": metadata_resource, "dw_obj": warehouse_resource_selector(), }) def show_schema_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj1 dw_obj = 
context.resources.dw_obj l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};" l_logger.info(l_query) # result = context.resources.db_obj.execute_query_as_dataframe_crawl(sql_query=l_query) df_schema = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_schema.to_sql( "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset( resource_defs={ "sio": socket_resource, "db_obj1": metadata_resource, "dw_obj": warehouse_resource_selector(), }) def show_table_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj1 dw_obj = context.resources.dw_obj l_query = f"SHOW TABLES IN DATABASE {l_dw_database};" l_logger.info(l_query) df_table = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_table.to_sql( "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_col_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_col = dw_obj.execute_query_as_dataframe(sql_query=l_query) # # l_logger.info(df_col) # df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True) # df_col["ordinal_position"] = df_col.index # engine = engine_creation() # df_col.to_sql( # "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_view_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_view = dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_view.to_sql( # "crawler_views", engine, schema=l_db_schema, 
if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_keys_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_keys =dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_keys.to_sql( # "crawler_constraints", # engine, # schema=l_db_schema, # if_exists="replace", # index=False, # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_pk_keys_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_pk_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_pk_keys.to_sql( # "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_prod_dmax_tables(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES" # l_logger.info(l_query) # df_dmax_tables = dw_obj.execute_query_as_dataframe(sql_query=l_query) # l_logger.info(df_dmax_tables) # engine = engine_creation() # df_dmax_tables.to_sql( # "crawler_prod_tables", # engine, # schema=l_db_schema, # if_exists="replace", # index=False, # ) @job( resource_defs={ "sio": socket_resource, "db_obj1": metadata_resource, "dw_obj": warehouse_resource_selector(), } ) def all_assets_job(): show_schema_asset() show_table_asset() # show_col_asset() # show_view_asset() # show_keys_asset() # show_pk_keys_asset() # show_prod_dmax_tables() # Repository definition that includes the assets and the job @repository def my_repo(): 
return [ show_schema_asset, show_table_asset, # show_col_asset, # show_view_asset, # show_keys_asset, # show_pk_keys_asset, # show_prod_dmax_tables, all_assets_job, ] # Execute the job in process def crawler_asset_job(p_in_db_obj, l_db_schema): try: db_obj = p_in_db_obj # schema = l_db_schema dir_path = os.path.dirname(os.path.realpath(file)) drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format( schema=l_db_schema ) db_obj.execute_query(drop_view_query) # Asset job triggers result = my_repo.get_job("all_assets_job").execute_in_process() if not result.success: raise Exception("Asset job execution failed") # Creating deltas insert , update , delete along with DWH metadata l_query_dwh_metadata = dir_path + "/delta_generation_scripts/dwh_metadata.sql" l_query_dwh = open(l_query_dwh_metadata).read().format(schema=l_db_schema) l_query_dwh_metadata = """CREATE TABLE DWH_METADATA AS {query}""".format( query=l_query_dwh ) db_obj.execute_query(l_query_dwh_metadata) # Insert Delta table l_query_insert_delta = ( dir_path + "/delta_generation_scripts/crawler_identify_inserts.sql" ) l_query_insert = open(l_query_insert_delta).read().format(schema=l_db_schema) l_query_insert_delta = """drop table if exists crawler_identify_inserts; CREATE TABLE crawler_identify_inserts AS {query}""".format( query=l_query_insert ) db_obj.execute_query(l_query_insert_delta) # Delete Delta table l_query_delete_delta = ( dir_path + "/delta_generation_scripts/crawler_identify_deletes.sql" ) l_query_delete = open(l_query_delete_delta).read().format(schema=l_db_schema) l_query_delete_delta = """drop table if exists crawler_identify_ideletes; CREATE TABLE crawler_identify_ideletes AS {query}""".format( query=l_query_delete ) db_obj.execute_query(l_query_delete_delta) # Update Delta table l_query_update_delta = ( dir_path + "/delta_generation_scripts/crawler_identify_updates.sql" ) l_query_update = open(l_query_update_delta).read().format(schema=l_db_schema) l_query_update_delta = 
"""drop table if exists crawler_identify_updates; CREATE TABLE crawler_identify_updates AS {query}""".format( query=l_query_update ) db_obj.execute_query(l_query_update_delta) return result except Exception as e: print("Asset job execution failed") if name == "__main__": crawler_asset_job() E dagster._core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'dw_obj' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
s

Scout

03/09/2024, 8:09 AM
It seems like there might be a misunderstanding or an issue with the configuration of your resources within the job definition. To resolve the
DagsterInvalidDefinitionError
you're encountering, you should ensure that the resources required by your assets are correctly provided in the job's
resource_defs
. Here are some steps you can take to debug and resolve the issue: 1. Check Resource Definitions: Ensure that
socket_resource
and other resources are correctly defined and that they inherit from
ResourceDefinition
. The resources should be available and correctly configured to be used by the assets. 2. Verify Resource Keys: Make sure that the keys used in
required_resource_keys
within your asset definitions exactly match the keys provided in the
resource_defs
of your job. In your case, the
show_schema_asset
asset requires a resource with the key
'sio'
, so you must provide a resource with that exact key in the job's
resource_defs
. 3. Review Resource Selector Function: If you are using a custom function like
patch_cloud_storage_selector
to define your resources, ensure that this function is correctly applying the resource definitions. It should return a dictionary where the keys match the
required_resource_keys
of your assets. 4. Check Resource Availability: When the job is executed, the resources must be correctly instantiated and available for the assets. If there's an issue with the instantiation or configuration of the resources at runtime, it could lead to the error you're seeing. 5. Simplify for Testing: As a debugging step, you might want to simplify your job definition to include only the
show_schema_asset
asset and the minimal set of resources required. This can help isolate the issue. 6. Consult Documentation and Examples: Review the Dagster documentation for examples of defining resources and providing them to jobs. Ensure that your implementation follows the documented patterns. 7. Error Messages and Logs: Pay close attention to the error messages and logs when you execute the job. They can provide clues about what might be going wrong. 8. Update and Retry: After making changes to your resource definitions or job configuration, make sure to update your repository and retry executing the job to see if the issue is resolved. If you've gone through these steps and the issue persists, you might want to provide more details about the
socket_resource
and the
patch_cloud_storage_selector
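function, as the problem could be specific to their implementations. (The steps above address the earlier missing-`'sio'` error. The `'dw_obj'` conflict you are seeing now is addressed below.)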
The error message you're encountering,
DagsterInvalidDefinitionError: Conflicting versions of resource with key 'dw_obj' were provided to different assets
, indicates that there are conflicting resource definitions for the resource key
'dw_obj'
being used by different assets within the same job. In Dagster, when you construct a job that includes multiple assets, each resource key must point to a single resource definition. If different assets within the same job provide different definitions for the same resource key, Dagster will raise an error because it cannot determine which definition to use. To resolve this issue, you need to ensure that the resource with the key
'dw_obj'
is defined once and that the same definition is used by all assets that require it. Here's how you can approach fixing this: 1. Consolidate Resource Definitions: Make sure that you define the resource with the key
'dw_obj'
only once and use the same definition for all assets that require it. 2. Check Resource Selector Functions: If you are using a function like
warehouse_resource_selector()
to get the resource definition, ensure that this function always returns the same resource instance for the
'dw_obj'
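key. If this function returns different instances (perhaps due to different configurations or parameters), it will lead to the error you're seeing. In your code, `warehouse_resource_selector()` is called three separate times: in the `resource_defs` of `show_schema_asset`, in the `resource_defs` of `show_table_asset`, and in the `@job` decorator. Unless the selector caches its result, each call yields a distinct object, and Dagster's reference-equality check fails. Call it once at module level (for example `dw_resource = warehouse_resource_selector()`) and reuse that variable everywhere. 3. Review Job Resource Definitions: In your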
all_assets_job
, you should provide a single, consistent set of resource definitions that will be used by all assets included in the job. This means that the
resource_defs
dictionary should have one entry for each unique resource key, and the corresponding value should be the same resource definition used by all assets that require that key. 4. Update Asset Decorators: Update the
@asset
decorators for all assets to ensure they are not providing conflicting resource definitions. They should all reference the same resource instances provided in the job's
resource_defs
. 5. Test with a Simplified Job: As a debugging step, you can create a simplified job that includes only one asset and the minimal set of resources required. Once that works, gradually add more assets to the job, ensuring that the resource definitions remain consistent. 6. Execute and Verify: After making the necessary changes, execute the job again and verify that the error is resolved. By following these steps, you should be able to resolve the
DagsterInvalidDefinitionError
and ensure that your job executes successfully with consistent resource definitions.
s

SUBHADEEP ROY

03/09/2024, 8:14 AM
@Scout can u write the code solution by refactoring my code given above
s

Scout

03/09/2024, 8:14 AM
As a developer relations assistant for Dagster, I can provide guidance on how to refactor your code based on the error message you've encountered. However, please note that I can only provide general advice and cannot write or test the actual code for you. To address the
DagsterInvalidDefinitionError
regarding conflicting resource definitions for the key
'dw_obj'
, you should ensure that the resource is defined once and used consistently across all assets. Here's a step-by-step guide to refactor your code: 1. Define Resources Once: Define your resources in one place. For example, you can create a function that returns all the resources you need:
Copy code
python def define_resources():     return {         "sio": socket_resource,         "db_obj1": metadata_resource,         "dw_obj": warehouse_resource_selector(),         # Add other resources as needed     }
2. Use Defined Resources in Assets: Use the resources from the function you defined in your assets. For example:
Copy code
python @asset(resource_defs=define_resources()) def show_schema_asset(context) -> pd.DataFrame:     # Your asset code here     pass  @asset(resource_defs=define_resources()) def show_table_asset(context) -> pd.DataFrame:     # Your asset code here     pass
3. Use Defined Resources in Job: Use the same resources in your job definition:
Copy code
python @job(resource_defs=define_resources()) def all_assets_job():     show_schema_asset()     show_table_asset()     # Include other assets as needed
4. Repository Definition: Update your repository definition to include the refactored job and assets:
Copy code
python @repository def my_repo():     return [         show_schema_asset,         show_table_asset,         # Include other assets as needed         all_assets_job,     ]
5. Execute and Verify: After refactoring, execute the job again to verify that the error is resolved:
Copy code
python if __name__ == "__main__":     result = my_repo.get_job("all_assets_job").execute_in_process()     if not result.success:         raise Exception("Asset job execution failed")
By following these steps, you should be able to refactor your code to resolve the conflicting resource definitions. Remember to replace the placeholders with your actual asset code and ensure that the
define_resources
function returns consistent resource instances for the
'dw_obj'
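key and any other resources your assets require. Be aware that, as written in the snippets above, `define_resources()` is called separately for each `@asset` and for the `@job`. Every call invokes `warehouse_resource_selector()` again. Unless that selector caches its result, each asset still receives a different `dw_obj` object, and the same "Conflicting versions of resource" error will occur. Instead, build the dictionary once at module level (for example `RESOURCES = define_resources()`) and pass that same `RESOURCES` object everywhere. A more idiomatic pattern is to declare `required_resource_keys` on each `@asset` instead of `resource_defs`, and bind the resources exactly once with `Definitions(assets=[...], resources=RESOURCES, jobs=[define_asset_job("all_assets_job", selection=[...])])`. Jobs over assets are normally built with `define_asset_job`; `@job` is for composing ops.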
s

SUBHADEEP ROY

03/09/2024, 8:27 AM
@Scout i refactored the script according to your suggestion still getting same error import pandas as pd import psycopg2 from dagster import Definitions, asset, job, repository, resource import os # from dagster_snowflake import SnowflakeResource from dagster import asset from psycopg2.extensions import connection from sqlalchemy import create_engine from typing import Dict, Any from dagster import Definitions, asset from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dagster import job from dmax_app.orchestration.orchestration_manager.hooks.status_notif_hook import ( update_inprogress_status_hook, update_completed_status_hook, update_failed_status_hook, ) from dmax_app.orchestration.orchestration_manager.jobs.crawler.op_initiate_sync_crawler_job import ( initiate_sync_crawler_job, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_manager.resources.socketio_resource import socket_resource from dmax_app.orchestration.orchestration_manager.resources.metadata_query_resource import ( metadata_query_resource, ) from dmax_app.orchestration.orchestration_manager.resources.file_storage_repo_resource import ( file_storage_repo_selector, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import ( warehouse_resource_selector, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_query_provider_resource import ( warehouse_query_resource_selector, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from 
dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import ( SnowflakeCrawlConnection, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import ( setup_logger, ) from dmax_app.orchestration.orchestration_manager.resources.op_config_tag_selector import ( op_tag_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_CREDENTIAL_DICT, ENUM_DB_DATABRICKS, ENUM_DW_CREDENTIAL_DICT, ) from dmax_app.orchestration.orchestration_config import ENUM_DW_DB from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_DATABRICKS, ENUM_DB_SNOWFLAKE, ENUM_DB_REDSHIFT, ENUM_DW_CREDENTIAL_DICT, ENUM_DWH_MASTER_DB_PASSWORD, ENUM_DWH_MASTER_DB_USER, ENUM_WAREHOUSE_SF_ROLE_MASTER, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector wh_res = SnowflakeCrawlConnection() l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"] l_logger = setup_logger() l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"] dir_path = os.path.dirname(os.path.realpath(file)) # db_obj = metadata_resource def define_resources(): return { "sio": socket_resource, "db_obj1": metadata_resource, "dw_obj": warehouse_resource_selector(), # Add other resources as needed } def engine_creation(): database_name = ENUM_DB_CREDENTIAL_DICT["database"] user = ENUM_DB_CREDENTIAL_DICT["user"] password = ENUM_DB_CREDENTIAL_DICT["password"] host = ENUM_DB_CREDENTIAL_DICT["host"] port = ENUM_DB_CREDENTIAL_DICT["port"] # l_logger.info(database_name,user,password,host,port) engine = create_engine( 
f"postgresql//{user}{password}@{host}:{port}/{database_name}" ) return engine # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) @asset(resource_defs=define_resources()) def show_schema_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj1 dw_obj = context.resources.dw_obj l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};" l_logger.info(l_query) # result = context.resources.db_obj.execute_query_as_dataframe_crawl(sql_query=l_query) df_schema = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_schema.to_sql( "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False ) # @asset( resource_defs={"sio", "db_obj1","dw_obj"}) @asset(resource_defs=define_resources()) def show_table_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj1 dw_obj = context.resources.dw_obj l_query = f"SHOW TABLES IN DATABASE {l_dw_database};" l_logger.info(l_query) df_table = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_table.to_sql( "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_col_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_col = dw_obj.execute_query_as_dataframe(sql_query=l_query) # # l_logger.info(df_col) # df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True) # df_col["ordinal_position"] = df_col.index # engine = engine_creation() # df_col.to_sql( # "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def 
show_view_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_view = dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_view.to_sql( # "crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_keys_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_keys =dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_keys.to_sql( # "crawler_constraints", # engine, # schema=l_db_schema, # if_exists="replace", # index=False, # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_pk_keys_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_pk_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_pk_keys.to_sql( # "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_prod_dmax_tables(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES" # l_logger.info(l_query) # df_dmax_tables = dw_obj.execute_query_as_dataframe(sql_query=l_query) # l_logger.info(df_dmax_tables) # engine = engine_creation() # df_dmax_tables.to_sql( # "crawler_prod_tables", # engine, # schema=l_db_schema, # if_exists="replace", # index=False, # ) @job(resource_defs=define_resources() ) def all_assets_job(): 
show_schema_asset() show_table_asset() # show_col_asset() # show_view_asset() # show_keys_asset() # show_pk_keys_asset() # show_prod_dmax_tables() # Repository definition that includes the assets and the job @repository def my_repo(): return [ show_schema_asset, show_table_asset, # show_col_asset, # show_view_asset, # show_keys_asset, # show_pk_keys_asset, # show_prod_dmax_tables, all_assets_job, ] # Execute the job in process def crawler_asset_job(p_in_db_obj, l_db_schema): try: db_obj = p_in_db_obj # schema = l_db_schema dir_path = os.path.dirname(os.path.realpath(file)) drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format( schema=l_db_schema ) db_obj.execute_query(drop_view_query) # Asset job triggers result = my_repo.get_job("all_assets_job").execute_in_process() if not result.success: raise Exception("Asset job execution failed") return result except Exception as e: print("Asset job execution failed") if name == "__main__": crawler_asset_job() E dagster._core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'dw_obj' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
s

Scout

03/09/2024, 8:27 AM
The error message you're encountering suggests that there are still conflicting versions of the resource with the key
'dw_obj'
provided to different assets. This can happen if the function
warehouse_resource_selector()
returns different instances of the resource each time it is called, rather than a single, consistent instance. To ensure that the same instance of the resource is used across all assets, you can instantiate the resource once and then pass that instance to the assets and job. Here's how you can modify your
define_resources
function to achieve this:
Copy code
python def define_resources():     # Instantiate the resources once     sio_instance = socket_resource()     db_obj1_instance = metadata_resource()     dw_obj_instance = warehouse_resource_selector()      # Return a dictionary of the instantiated resources     return {         "sio": sio_instance,         "db_obj1": db_obj1_instance,         "dw_obj": dw_obj_instance,         # Add other resources as needed     }
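This only helps if the dictionary is built a single time. Every call to `define_resources()` runs `warehouse_resource_selector()` (and the other constructors) again and returns new objects. Dagster, however, requires the resource definitions given to different assets to be the same object (reference equality) for a given key. So call it once, for example `RESOURCES = define_resources()` at module level, and pass that same `RESOURCES` object to every `@asset(resource_defs=...)` and to the job. Used that way, each key maps to one shared object across all assets and the job, which should resolve the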
DagsterInvalidDefinitionError
related to conflicting resource versions. Make sure to update your assets and job to use the
define_resources
function as shown in the previous example:
Copy code
python @asset(resource_defs=define_resources()) def show_schema_asset(context) -> pd.DataFrame:     # Your asset code here     pass  # Repeat for other assets...  @job(resource_defs=define_resources()) def all_assets_job():     show_schema_asset()     show_table_asset()     # Include other assets as needed
After making these changes, try executing the job again to see if the issue is resolved. If the error persists, double-check that the
warehouse_resource_selector()
function does not create a new instance each time it is called, and ensure that all assets and the job are using the
define_resources
function to obtain their resources.
s

SUBHADEEP ROY

03/09/2024, 8:31 AM
@Scout after applying your changes i am getting this error E dagster._core.errors.DagsterInvalidInvocationError: Resource initialization function has context argument, but no context was provided when invoking.
s

Scout

03/09/2024, 8:31 AM
The error message
DagsterInvalidInvocationError: Resource initialization function has context argument, but no context was provided when invoking
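indicates that a resource initialization function expects a context object, but none was provided when the resource was invoked. This error typically occurs when you directly instantiate a resource that needs a context object for its initialization, outside the Dagster-managed environment where the context would be provided automatically. Here it was introduced by the previous suggestion: `socket_resource()` and `metadata_resource()` call the resource definitions directly. For use in `resource_defs`, the simplest fix is not to call them at all. Pass `socket_resource` and `metadata_resource` uncalled, as your original `define_resources` did, and share a single `warehouse_resource_selector()` result across all assets and the job. Dagster then initializes the resources itself during the run. Only if you genuinely need to create resources outside a Dagster run should you use Dagster's resource initialization APIs, which manage the context for you. For testing or creating resources outside of a Dagster pipeline execution, you can use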
build_init_resource_context
to construct a context that can be passed to the resource initialization function. Here's an example of how you might use `build_init_resource_context`:
Copy code
python from dagster import build_init_resource_context  def define_resources():     # Create a context for resource initialization     with build_init_resource_context() as context:         # Instantiate the resources with the context         sio_instance = socket_resource(context)         db_obj1_instance = metadata_resource(context)         dw_obj_instance = warehouse_resource_selector(context)          # Return a dictionary of the instantiated resources         return {             "sio": sio_instance,             "db_obj1": db_obj1_instance,             "dw_obj": dw_obj_instance,             # Add other resources as needed         }
Please note that the above code is just an example, and the actual implementation may vary based on how your resources are defined and what arguments their initialization functions expect. If your resources do not require any specific context for initialization, you might be able to use
build_resources
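to build them directly. Note that `build_resources` is a context manager: use it as `with build_resources({...}) as resources:` and access the initialized objects as attributes (for example `resources.dw_obj`). What it yields is not a dictionary of resource definitions, so it cannot be passed to `resource_defs` as the sketch below suggests; the sketch only illustrates the call: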
Copy code
python from dagster import build_resources  def define_resources():     # Instantiate and return the resources using build_resources     return build_resources({         "sio": socket_resource,         "db_obj1": metadata_resource,         "dw_obj": warehouse_resource_selector,         # Add other resources as needed     })
Again, please adjust the code to fit your specific resource definitions and initialization requirements. After making these changes, try executing the job again to see if the issue is resolved. If you continue to encounter problems, please ensure that your resource definitions and their initialization functions are correctly implemented according to the documentation.
s

SUBHADEEP ROY

03/09/2024, 8:44 AM
@Scout while using result = my_repo.get_job("all_assets_job").execute_in_process() can i pass some parameters in it so that it can be used by assets
s

Scout

03/09/2024, 8:44 AM
Yes, you can pass parameters to
execute_in_process()
that can be used by assets during execution. Specifically, you can provide
run_config
,
resources
, and
tags
as parameters to configure the execution environment for the job. Here's how you can pass these parameters: - `run_config`: This is a dictionary that specifies the configuration for the run. It can include configurations for resources, loggers, and any other configurable components of the job. - `resources`: This is a dictionary that allows you to override the resources defined in the job with new instances or configurations. This can be useful for testing or when you want to provide different resource implementations. - `tags`: This is a dictionary of string key-value pairs that allows you to attach metadata to the run. Tags can be used for filtering and organizing runs in the Dagster UI. Here's an example of how you might use
execute_in_process()
with these parameters:
Copy code
python result = my_repo.get_job("all_assets_job").execute_in_process(     run_config={         "solids": {             "my_solid": {                 "config": {                     "param1": "value1",                     "param2": "value2",                 }             }         }     },     resources={         "my_resource": my_resource_instance,     },     tags={         "key1": "value1",         "key2": "value2",     } )
In this example,
run_config
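provides configuration for a hypothetical op called (note that "solid" is the legacy pre-1.0 name; in current Dagster versions the top-level run-config key is `ops`, not `solids`)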
my_solid
,
resources
provides a new instance of
my_resource
, and
tags
adds metadata to the run. Please adjust the
run_config
,
resources
, and
tags
according to your job's and assets' specific requirements. The keys and structure of the
run_config
dictionary will depend on the configuration schema defined by your job and its components. Keep in mind that when using
execute_in_process
, the
executor_def
on the job will be ignored, and replaced with the in-process executor. If using the default
io_manager
, it will switch from filesystem to in-memory. This is important to consider if your assets rely on specific IO behavior.
s

SUBHADEEP ROY

03/09/2024, 8:49 AM
@Scout i have passed resources like this result = my_repo.get_job("all_assets_job").execute_in_process( resources={ "db_obj": db_obj, "dw_obj": dw_obj, }) now how can i use them in @asset
s

Scout

03/09/2024, 8:49 AM
To use the resources you've passed to
execute_in_process
within an
@asset
, you need to ensure that your asset function is set up to require the resources by specifying
required_resource_keys
. Then, within the body of the asset function, you can access the resources through the
context
object. Here's an example of how you might define an asset that requires
db_obj
and
dw_obj
resources:
Copy code
python from dagster import asset, ResourceDefinition  # Define your resources as ResourceDefinitions if not already done db_obj_resource = ResourceDefinition.hardcoded_resource(db_obj) dw_obj_resource = ResourceDefinition.hardcoded_resource(dw_obj)  # Define your asset with required_resource_keys @asset(required_resource_keys={"db_obj", "dw_obj"}) def my_asset(context):     # Access the resources from the context     db = context.resources.db_obj     dw = context.resources.dw_obj      # Use the resources as needed     db_data = db.get_data()     dw_data = dw.get_data()      # Your asset logic here     combined_data = db_data + dw_data     return combined_data
When you execute the job in process and pass the resources, the
db_obj
and
dw_obj
inside the context will be the instances you provided:
Copy code
python result = my_repo.get_job("all_assets_job").execute_in_process(     resources={         "db_obj": db_obj_resource,         "dw_obj": dw_obj_resource,     } )
Please note that in the
execute_in_process
call, you should pass instances of
ResourceDefinition
for
db_obj
and
dw_obj
. If
db_obj
and
dw_obj
are already instances of
ResourceDefinition
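, you can pass them directly; otherwise, you can wrap them using
ResourceDefinition.hardcoded_resource
. (In recent Dagster versions, the `resources` argument of `execute_in_process` also accepts plain objects and wraps them automatically, so explicit wrapping is optional. The second example below passes them unwrapped.) Make sure that the keys you use in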
required_resource_keys
for your assets match the keys you use in the
resources
dictionary when executing the job. This ensures that the correct resources are injected into the asset's context during execution.
When you pass resources to
execute_in_process
, you are overriding the resources defined in the job's
resource_defs
. To use these resources within an
@asset
, you need to ensure that the asset has the appropriate
required_resource_keys
set, and then you can access the resources via the
context
object provided to the asset function. Here's an example of how you might define an asset that uses the
db_obj
and
dw_obj
resources:
Copy code
python from dagster import asset  @asset(required_resource_keys={"db_obj", "dw_obj"}) def my_asset(context):     # Access the resources from the context     db_resource = context.resources.db_obj     dw_resource = context.resources.dw_obj      # Use the resources as needed     result_from_db = db_resource.some_method()     result_from_dw = dw_resource.another_method()      # Return the result of your asset computation     return result_from_db, result_from_dw
In this example,
some_method
and
another_method
are placeholder methods that you would replace with the actual methods provided by your
db_obj
and
dw_obj
resources. When you execute the job with
execute_in_process
and provide the resources as shown in your example, the
db_obj
and
dw_obj
passed at execution time will be used by the asset during its computation:
Copy code
python result = my_repo.get_job("all_assets_job").execute_in_process(     resources={         "db_obj": db_obj,         "dw_obj": dw_obj,     } )
Make sure that the keys used in the
required_resource_keys
of your asset match the keys you are passing in the
resources
dictionary when calling
execute_in_process
. This ensures that the correct resources are injected into the asset's execution context.
s

SUBHADEEP ROY

03/09/2024, 8:53 AM
@Scout according to your above response i refactored my code and getting error import pandas as pd import psycopg2 from dagster import Definitions, asset, build_resources, job, repository, resource import os # from dagster_snowflake import SnowflakeResource from dagster import asset from psycopg2.extensions import connection from sqlalchemy import create_engine from typing import Dict, Any from dagster import Definitions, asset from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dagster import job from dmax_app.orchestration.orchestration_manager.hooks.status_notif_hook import ( update_inprogress_status_hook, update_completed_status_hook, update_failed_status_hook, ) from dmax_app.orchestration.orchestration_manager.jobs.crawler.op_initiate_sync_crawler_job import ( initiate_sync_crawler_job, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_manager.resources.socketio_resource import socket_resource from dmax_app.orchestration.orchestration_manager.resources.metadata_query_resource import ( metadata_query_resource, ) from dmax_app.orchestration.orchestration_manager.resources.file_storage_repo_resource import ( file_storage_repo_selector, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import ( warehouse_resource_selector, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_query_provider_resource import ( warehouse_query_resource_selector, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from 
dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import ( SnowflakeCrawlConnection, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import ( setup_logger, ) from dmax_app.orchestration.orchestration_manager.resources.op_config_tag_selector import ( op_tag_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_CREDENTIAL_DICT, ENUM_DB_DATABRICKS, ENUM_DW_CREDENTIAL_DICT, ) from dmax_app.orchestration.orchestration_config import ENUM_DW_DB from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_DATABRICKS, ENUM_DB_SNOWFLAKE, ENUM_DB_REDSHIFT, ENUM_DW_CREDENTIAL_DICT, ENUM_DWH_MASTER_DB_PASSWORD, ENUM_DWH_MASTER_DB_USER, ENUM_WAREHOUSE_SF_ROLE_MASTER, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector wh_res = SnowflakeCrawlConnection() l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"] l_logger = setup_logger() l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"] dir_path = os.path.dirname(os.path.realpath(file)) def engine_creation(): database_name = ENUM_DB_CREDENTIAL_DICT["database"] user = ENUM_DB_CREDENTIAL_DICT["user"] password = ENUM_DB_CREDENTIAL_DICT["password"] host = ENUM_DB_CREDENTIAL_DICT["host"] port = ENUM_DB_CREDENTIAL_DICT["port"] # l_logger.info(database_name,user,password,host,port) engine = create_engine( f"postgresql//{user}{password}@{host}:{port}/{database_name}" ) return engine # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) @asset(required_resource_keys={"db_obj", 
"dw_obj"}) def show_schema_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj1 dw_obj = context.resources.dw_obj l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};" l_logger.info(l_query) # result = context.resources.db_obj.execute_query_as_dataframe_crawl(sql_query=l_query) df_schema = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_schema.to_sql( "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False ) # @asset( resource_defs={"sio", "db_obj1","dw_obj"}) # @asset(resource_defs=define_resources()) # def show_table_asset(context) -> pd.DataFrame: # db_obj = context.resources.db_obj1 # dw_obj = context.resources.dw_obj # l_query = f"SHOW TABLES IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_table = dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_table.to_sql( # "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_col_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_col = dw_obj.execute_query_as_dataframe(sql_query=l_query) # # l_logger.info(df_col) # df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True) # df_col["ordinal_position"] = df_col.index # engine = engine_creation() # df_col.to_sql( # "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_view_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_view = dw_obj.execute_query_as_dataframe(sql_query=l_query) # 
engine = engine_creation() # df_view.to_sql( # "crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_keys_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_keys =dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_keys.to_sql( # "crawler_constraints", # engine, # schema=l_db_schema, # if_exists="replace", # index=False, # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_pk_keys_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_pk_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_pk_keys.to_sql( # "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_prod_dmax_tables(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES" # l_logger.info(l_query) # df_dmax_tables = dw_obj.execute_query_as_dataframe(sql_query=l_query) # l_logger.info(df_dmax_tables) # engine = engine_creation() # df_dmax_tables.to_sql( # "crawler_prod_tables", # engine, # schema=l_db_schema, # if_exists="replace", # index=False, # ) @job( ) def all_assets_job(): show_schema_asset() # show_table_asset() # show_col_asset() # show_view_asset() # show_keys_asset() # show_pk_keys_asset() # show_prod_dmax_tables() # Repository definition that includes the assets and the job @repository def my_repo(): return [ 
show_schema_asset, # show_table_asset, # show_col_asset, # show_view_asset, # show_keys_asset, # show_pk_keys_asset, # show_prod_dmax_tables, all_assets_job, ] # Execute the job in process def crawler_asset_job(p_in_db_obj, l_db_schema,p_in_dw_obj): try: db_obj = p_in_db_obj dw_obj = p_in_dw_obj # schema = l_db_schema dir_path = os.path.dirname(os.path.realpath(file)) drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format( schema=l_db_schema ) db_obj.execute_query(drop_view_query) # Asset job triggers result = my_repo.get_job("all_assets_job").execute_in_process( resources={ "db_obj": db_obj, "dw_obj": dw_obj, }) if not result.success: raise Exception("Asset job execution failed") return result except Exception as e: print("Asset job execution failed") if name == "__main__": crawler_asset_job() E dagster._core.errors.DagsterInvalidDefinitionError: resource with key 'db_obj' required by op 'show_schema_asset' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'db_obj', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager']
s

Scout

03/09/2024, 8:53 AM
The error message you're encountering indicates that the
show_schema_asset
asset requires a resource with the key
'db_obj'
, but this resource was not provided when invoking the job. The error message also suggests that you should provide a
ResourceDefinition
for the key
'db_obj'
. Looking at your code, there are a few issues that need to be addressed: 1. The
required_resource_keys
in the
show_schema_asset
asset function is set to
{"db_obj", "dw_obj"}
, but within the function, you are trying to access
db_obj1
instead of
db_obj
. You need to correct the key to match the required resource keys. 2. You are passing instances of
db_obj
and
dw_obj
directly to
execute_in_process
. The
resources
argument of
execute_in_process
accepts plain objects as well as
ResourceDefinition
instances (plain objects are wrapped automatically), so this is allowed and is unlikely to be what triggers the error. You can still wrap them explicitly, e.g. with `ResourceDefinition.hardcoded_resource`, if you prefer. 3. The
@job
decorator is empty. If you have specific resources that need to be associated with the job, you should define them in the
resource_defs
argument of the
@job
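decorator. This is the most likely source of the error. Its list of available keys contains only `['io_manager']`, which means the job itself declares no `db_obj` or `dw_obj` resource. The error was raised even though those values were passed to `execute_in_process`. Declare both keys in the job's `resource_defs`; the values passed to `execute_in_process(resources=...)` can then override them. Here's how you can refactor the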
show_schema_asset
function to use the correct resource keys:
Copy code
python @asset(required_resource_keys={"db_obj", "dw_obj"}) def show_schema_asset(context) -> pd.DataFrame:     db_obj = context.resources.db_obj  # Use 'db_obj' instead of 'db_obj1'     dw_obj = context.resources.dw_obj     # ... rest of your code ...
To pass the resources to
execute_in_process
, you need to wrap them in
ResourceDefinition
if they are not already. If
db_obj
and
dw_obj
are functions that return the resource instance, you can use them directly. Otherwise, you'll need to create a
ResourceDefinition
that returns the instance. Here's an example of how you might do this:
Copy code
python from dagster import resource  # Assuming db_obj and dw_obj are functions that return the resource instance db_obj_resource = resource(db_obj) dw_obj_resource = resource(dw_obj)  result = my_repo.get_job("all_assets_job").execute_in_process(     resources={         "db_obj": db_obj_resource,         "dw_obj": dw_obj_resource,     } )
Make sure to adjust the
resource
definitions according to how your
db_obj
and
dw_obj
are implemented. After making these changes, try executing the job again to see if the issue is resolved. If you continue to encounter problems, please ensure that your resource definitions and their initialization functions are correctly implemented according to Dagster's documentation.
s

SUBHADEEP ROY

03/09/2024, 8:56 AM
@Scout in this can i pass simple parameters like some string value which can be used by assets , can u pls provide an example
s

Scout

03/09/2024, 8:56 AM
To pass simple parameters like string values to assets, you can use the
config_schema
parameter in the asset definition. This allows you to specify configurable parameters that can be accessed during asset execution. Here's an example of how to define an asset that accepts a string parameter through its config:
Copy code
python from dagster import asset, Field, String  @asset(     config_schema={         "param": Field(String, is_required=True, description="A string parameter")     } ) def my_asset(context) -> str:     # Access the string parameter from the asset config     param = context.op_config["param"]     # Use the parameter in your asset logic     result = f"Received parameter: {param}"     return result
In this example,
my_asset
is an asset that requires a config with a key
param
. The
param
is a string that you can provide when you execute the job. To provide the config when executing the job, you can use the
run_config
parameter in `execute_in_process`:
Copy code
python result = my_repo.get_job("all_assets_job").execute_in_process(     run_config={         "ops": {             "my_asset": {                 "config": {                     "param": "my_string_value"                 }             }         }     } )
In the
run_config
, under the
ops
key, you specify the name of the asset (which is the same as the function name by default) and provide the config for it. The
param
key inside the
config
dictionary is where you pass the string value you want to use in the asset. Please adjust the asset name in the
run_config
to match the actual name of your asset in the job. After making these changes, the string value you pass in the
run_config
will be accessible in the asset during execution.
s

SUBHADEEP ROY

03/09/2024, 9:13 AM
@Scout in this job import pandas as pd import psycopg2 from dagster import Definitions, asset, build_resources, job, repository, resource import os # from dagster_snowflake import SnowflakeResource from dagster import asset from psycopg2.extensions import connection from sqlalchemy import create_engine from typing import Dict, Any from dagster import Definitions, asset from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dagster import job from dmax_app.orchestration.orchestration_manager.hooks.status_notif_hook import ( update_inprogress_status_hook, update_completed_status_hook, update_failed_status_hook, ) from dmax_app.orchestration.orchestration_manager.jobs.crawler.op_initiate_sync_crawler_job import ( initiate_sync_crawler_job, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_manager.resources.socketio_resource import socket_resource from dmax_app.orchestration.orchestration_manager.resources.metadata_query_resource import ( metadata_query_resource, ) from dmax_app.orchestration.orchestration_manager.resources.file_storage_repo_resource import ( file_storage_repo_selector, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import ( warehouse_resource_selector, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_query_provider_resource import ( warehouse_query_resource_selector, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import ( patch_cloud_storage_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import ( 
SnowflakeCrawlConnection, ) from dmax_app.orchestration.orchestration_manager.executors.executor_selector import ( executor_selector, ) from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import ( setup_logger, ) from dmax_app.orchestration.orchestration_manager.resources.op_config_tag_selector import ( op_tag_selector, ) from dmax_app.orchestration.orchestration_manager.resources.get_default_config import ( get_default_job_config, get_op_default_retry, ) from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_CREDENTIAL_DICT, ENUM_DB_DATABRICKS, ENUM_DW_CREDENTIAL_DICT, ) from dmax_app.orchestration.orchestration_config import ENUM_DW_DB from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_DATABRICKS, ENUM_DB_SNOWFLAKE, ENUM_DB_REDSHIFT, ENUM_DW_CREDENTIAL_DICT, ENUM_DWH_MASTER_DB_PASSWORD, ENUM_DWH_MASTER_DB_USER, ENUM_WAREHOUSE_SF_ROLE_MASTER, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector wh_res = SnowflakeCrawlConnection() l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"] l_logger = setup_logger() l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"] dir_path = os.path.dirname(os.path.realpath(file)) def engine_creation(): database_name = ENUM_DB_CREDENTIAL_DICT["database"] user = ENUM_DB_CREDENTIAL_DICT["user"] password = ENUM_DB_CREDENTIAL_DICT["password"] host = ENUM_DB_CREDENTIAL_DICT["host"] port = ENUM_DB_CREDENTIAL_DICT["port"] # l_logger.info(database_name,user,password,host,port) engine = create_engine( f"postgresql//{user}{password}@{host}:{port}/{database_name}" ) return engine # @asset @asset(required_resource_keys={"db_obj", "dw_obj"}) def show_schema_asset(context) -> pd.DataFrame: db_obj = context.resources.db_obj1 dw_obj = context.resources.dw_obj l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};" l_logger.info(l_query) # result 
= context.resources.db_obj.execute_query_as_dataframe_crawl(sql_query=l_query) df_schema = dw_obj.execute_query_as_dataframe(sql_query=l_query) engine = engine_creation() df_schema.to_sql( "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False ) # @asset # @asset(resource_defs=define_resources()) # def show_table_asset(context) -> pd.DataFrame: # db_obj = context.resources.db_obj1 # dw_obj = context.resources.dw_obj # l_query = f"SHOW TABLES IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_table = dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_table.to_sql( # "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_col_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_col = dw_obj.execute_query_as_dataframe(sql_query=l_query) # # l_logger.info(df_col) # df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True) # df_col["ordinal_position"] = df_col.index # engine = engine_creation() # df_col.to_sql( # "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_view_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_view = dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_view.to_sql( # "crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) 
# def show_keys_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_keys =dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_keys.to_sql( # "crawler_constraints", # engine, # schema=l_db_schema, # if_exists="replace", # index=False, # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_pk_keys_asset(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};" # l_logger.info(l_query) # df_pk_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query) # engine = engine_creation() # df_pk_keys.to_sql( # "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False # ) # @asset( resource_defs={ # "sio": socket_resource, # "db_obj1": metadata_resource, # "dw_obj": warehouse_resource_selector(), # }) # def show_prod_dmax_tables(context) -> pd.DataFrame: # dw_obj = context.resources.dw_obj # l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES" # l_logger.info(l_query) # df_dmax_tables = dw_obj.execute_query_as_dataframe(sql_query=l_query) # l_logger.info(df_dmax_tables) # engine = engine_creation() # df_dmax_tables.to_sql( # "crawler_prod_tables", # engine, # schema=l_db_schema, # if_exists="replace", # index=False, # ) @job( ) def all_assets_job(): show_schema_asset() # show_table_asset() # show_col_asset() # show_view_asset() # show_keys_asset() # show_pk_keys_asset() # show_prod_dmax_tables() # Repository definition that includes the assets and the job @repository def my_repo(): return [ show_schema_asset, # show_table_asset, # show_col_asset, # show_view_asset, # show_keys_asset, # show_pk_keys_asset, # show_prod_dmax_tables, all_assets_job, ] # Execute the job in process def crawler_asset_job(p_in_db_obj, l_db_schema,p_in_dw_obj): try: db_obj = 
p_in_db_obj dw_obj = p_in_dw_obj # schema = l_db_schema dir_path = os.path.dirname(os.path.realpath(__file__)) drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format( schema=l_db_schema ) db_obj.execute_query(drop_view_query) # Asset job triggers result = my_repo.get_job("all_assets_job").execute_in_process() if not result.success: raise Exception("Asset job execution failed") return result except Exception as e: print("Asset job execution failed") if __name__ == "__main__": crawler_asset_job() how can i pass db_obj and dw_obj in my_repo.get_job("all_assets_job").execute_in_process() so that they can be used by asset
s

Scout

03/09/2024, 9:13 AM
To pass
db_obj
and
dw_obj
so that they can be used by an asset within a job, provide them as resources when calling
execute_in_process()
. In your code they are already-constructed objects: they are the
p_in_db_obj
and
p_in_dw_obj
arguments of
crawler_asset_job
. You can therefore pass them as-is, because the
resources
argument accepts plain objects as well as
ResourceDefinition
objects:
result = my_repo.get_job("all_assets_job").execute_in_process(resources={"db_obj": db_obj, "dw_obj": dw_obj})
. Wrapping is only needed if
db_obj
and
dw_obj
are functions that build the objects. In that case, use the
resource
decorator so Dagster calls them when the run starts. Here's an example of that variant, where calling
db_obj()
and
dw_obj()
returns the instance:
Copy code
python from dagster import resource  # Assuming db_obj and dw_obj are functions that return the resource instance @resource def db_obj_resource(_):     return db_obj()  @resource def dw_obj_resource(_):     return dw_obj()  # Now you can pass the resources when executing the job result = my_repo.get_job("all_assets_job").execute_in_process(     resources={         "db_obj": db_obj_resource,         "dw_obj": dw_obj_resource,     } )
In the asset definition, you need to ensure that the
required_resource_keys
match the keys you are passing in the
resources
dictionary. Your current asset reads
context.resources.db_obj1
, which is not one of its required keys, so change it to
context.resources.db_obj
:
Copy code
python @asset(required_resource_keys={"db_obj", "dw_obj"}) def show_schema_asset(context) -> pd.DataFrame:     # Access the resources from the context     db_obj = context.resources.db_obj     dw_obj = context.resources.dw_obj     # ... rest of your code ...
Make sure that
db_obj
and
dw_obj
are correctly constructed. In the wrapped variant, also make sure the functions return the expected resource instances. After making these changes, the
db_obj
and
dw_obj
resources will be accessible in the asset during execution.
5 Views