<@U0667DNC02Y> i have an asset job given below in ...
# ask-ai
s
@Scout i have an asset job given below in which i am using resources in asset ,now how can i use these resources in below assets too?import pandas as pd from dagster import asset, job, repository import os # from dagster_snowflake import SnowflakeResource from dagster import asset from sqlalchemy import create_engine from dagster import asset from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import patch_cloud_storage_selector from dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import ( SnowflakeCrawlConnection, ) from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import ( setup_logger, ) from dmax_app.orchestration.orchestration_enums import ( ENUM_DB_CREDENTIAL_DICT, ENUM_DW_CREDENTIAL_DICT, ) from dmax_app.orchestration.orchestration_enums import ( ENUM_DW_CREDENTIAL_DICT, ) from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector wh_res = SnowflakeCrawlConnection() l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"] l_logger = setup_logger() l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"] dir_path = os.path.dirname(os.path.realpath(file)) def engine_creation(): database_name = ENUM_DB_CREDENTIAL_DICT["database"] user = ENUM_DB_CREDENTIAL_DICT["user"] password = ENUM_DB_CREDENTIAL_DICT["password"] host = ENUM_DB_CREDENTIAL_DICT["host"] port = ENUM_DB_CREDENTIAL_DICT["port"] # l_logger.info(database_name,user,password,host,port) engine = create_engine( f"postgresql//{user}{password}@{host}:{port}/{database_name}" ) return engine @asset( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": warehouse_resource_selector(), })) def show_schema_asset(context) -> pd.DataFrame: # wh_ress = warehouse_resource_selector() db_obj = context.resources.db_obj dw_obj = context.resources.dw_obj l_query 
= f"SHOW SCHEMAS IN DATABASE {l_dw_database};" l_logger.info(l_query) # result = context.resources.db_obj.execute_query_as_dataframe_crawl(sql_query=l_query) df_schema = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) return df_schema @asset def show_table_asset(context) -> pd.DataFrame: l_query = f"SHOW TABLES IN DATABASE {l_dw_database};" l_logger.info(l_query) df_table = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) return df_table @asset def show_col_asset(context) -> pd.DataFrame: l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_col = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) # l_logger.info(df_col) df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True) df_col["ordinal_position"] = df_col.index return df_col @asset def show_view_asset(context) -> pd.DataFrame: l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_view = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) return df_view @asset def show_keys_asset(context) -> pd.DataFrame: l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_keys = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) return df_keys @asset def show_pk_keys_asset(context) -> pd.DataFrame: l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};" l_logger.info(l_query) df_pk_keys = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) return df_pk_keys @asset def show_prod_dmax_tables(context) -> pd.DataFrame: l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES" l_logger.info(l_query) df_dmax_tables = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query) l_logger.info(df_dmax_tables) return df_dmax_tables @asset() def dmax_schema_asset(show_schema_asset): engine = engine_creation() show_schema_asset.to_sql( "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_table_asset(show_table_asset): engine = 
engine_creation() show_table_asset.to_sql( "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_col_asset(show_col_asset): engine = engine_creation() show_col_asset.to_sql( "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_view_asset(show_view_asset): engine = engine_creation() show_view_asset.to_sql( "crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_key_asset(show_keys_asset): engine = engine_creation() show_keys_asset.to_sql( "crawler_constraints", engine, schema=l_db_schema, if_exists="replace", index=False, ) @asset def dmax_pk_key_asset(show_pk_keys_asset): engine = engine_creation() show_pk_keys_asset.to_sql( "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False ) @asset def dmax_prod_table_asset(show_prod_dmax_tables): engine = engine_creation() show_prod_dmax_tables.to_sql( "crawler_prod_tables", engine, schema=l_db_schema, if_exists="replace", index=False, ) @job( resource_defs=patch_cloud_storage_selector({ "db_obj": metadata_resource, "dw_obj": warehouse_resource_selector(), }) ) def all_assets_job(): dmax_schema_asset(show_schema_asset()) dmax_table_asset(show_table_asset()) dmax_col_asset(show_col_asset()) dmax_view_asset(show_view_asset()) dmax_key_asset(show_keys_asset()) dmax_pk_key_asset(show_pk_keys_asset()) dmax_prod_table_asset(show_prod_dmax_tables()) # Repository definition that includes the assets and the job @repository def my_repo(): return [ show_schema_asset, show_table_asset, show_col_asset, show_view_asset, show_keys_asset, show_pk_keys_asset, show_prod_dmax_tables, dmax_schema_asset, dmax_table_asset, dmax_col_asset, dmax_view_asset, dmax_key_asset, dmax_pk_key_asset, dmax_prod_table_asset, all_assets_job, ] # Execute the job in process def crawler_asset_job(p_in_db_obj, l_db_schema): try: db_obj = p_in_db_obj # schema = l_db_schema dir_path = 
os.path.dirname(os.path.realpath(file)) drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format( schema=l_db_schema ) db_obj.execute_query(drop_view_query) # Asset job triggers result = my_repo.get_job("all_assets_job").execute_in_process() if not result.success: raise Exception("Asset job execution failed") # # Creating deltas insert , update , delete along with DWH metadata # l_query_dwh_metadata = dir_path + "/delta_generation_scripts/dwh_metadata.sql" # l_query_dwh = open(l_query_dwh_metadata).read().format(schema=l_db_schema) # l_query_dwh_metadata = """CREATE TABLE DWH_METADATA AS {query}""".format( # query=l_query_dwh # ) # db_obj.execute_query(l_query_dwh_metadata) # # Insert Delta table # l_query_insert_delta = ( # dir_path + "/delta_generation_scripts/crawler_identify_inserts.sql" # ) # l_query_insert = open(l_query_insert_delta).read().format(schema=l_db_schema) # l_query_insert_delta = """drop table if exists crawler_identify_inserts; # CREATE TABLE crawler_identify_inserts AS {query}""".format( # query=l_query_insert # ) # db_obj.execute_query(l_query_insert_delta) # # Delete Delta table # l_query_delete_delta = ( # dir_path + "/delta_generation_scripts/crawler_identify_deletes.sql" # ) # l_query_delete = open(l_query_delete_delta).read().format(schema=l_db_schema) # l_query_delete_delta = """drop table if exists crawler_identify_ideletes; # CREATE TABLE crawler_identify_ideletes AS {query}""".format( # query=l_query_delete # ) # db_obj.execute_query(l_query_delete_delta) # # Update Delta table # l_query_update_delta = ( # dir_path + "/delta_generation_scripts/crawler_identify_updates.sql" # ) # l_query_update = open(l_query_update_delta).read().format(schema=l_db_schema) # l_query_update_delta = """drop table if exists crawler_identify_updates; # CREATE TABLE crawler_identify_updates AS {query}""".format( # query=l_query_update # ) # db_obj.execute_query(l_query_update_delta) return result except Exception as e: print("Asset job 
execution failed") if name == "__main__": crawler_asset_job()
s
To use resources across multiple assets in Dagster, you can define the resources at the job level and then reference them within each asset. In the code you've provided, you've already defined a job called `all_assets_job` with a `resource_defs` argument that specifies the resources to be used by the assets within that job.

Here's a step-by-step guide on how to use these resources in your assets:

1. **Define your resources:** You have already defined your resources, such as `metadata_resource`, `warehouse_resource_selector`, and others.
2. **Configure your job with resources:** You have a job, `all_assets_job`, that is configured with the resources using `resource_defs`.
3. **Use the resources in your assets:** Within each asset function, you can access the resources through the
`context` object. For example, in your `show_schema_asset` asset, you are accessing the `db_obj` and `dw_obj` resources with `context.resources.db_obj` and `context.resources.dw_obj`.

To ensure that all assets have access to the same resources, you should define the `resource_defs` at the job level, as you have done, and then all assets within that job will have access to those resources. If you want to use the same resources in assets that are not part of `all_assets_job`, you would need to define a new job that includes those assets and is configured with the same `resource_defs`. Here's an example of how you might define another job that uses the same resources:
```python
@job(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": warehouse_resource_selector(),
    })
)
def another_assets_job():
    # Define the execution logic for your assets here
    pass
```
Then, include this job in your repository:
```python
@repository
def my_repo():
    return [
        # ... other assets and jobs ...
        another_assets_job,
    ]
```
Remember that when you execute a job, all the assets within that job will have access to the resources defined in the `resource_defs` of that job. If you need to use the same resources across different jobs, you should configure each job with the same `resource_defs`.

Lastly, ensure that the resources are correctly instantiated and passed to the job when you execute it, either through environment variables, configuration files, or direct instantiation, as shown in your `crawler_asset_job` function.