@Scout I have an asset job, given below, in which I am using resources in one asset (show_schema_asset). How can I use these resources in the other assets too? (I have put a rough sketch of what I am trying at the end of the code.)

import pandas as pd
from dagster import asset, job, repository
import os
# from dagster_snowflake import SnowflakeResource
from sqlalchemy import create_engine
from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource
from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import patch_cloud_storage_selector
from dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import (
    SnowflakeCrawlConnection,
)
from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import (
    setup_logger,
)
from dmax_app.orchestration.orchestration_enums import (
    ENUM_DB_CREDENTIAL_DICT,
    ENUM_DW_CREDENTIAL_DICT,
)
from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector
wh_res = SnowflakeCrawlConnection()
l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"]
l_logger = setup_logger()
l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"]
dir_path = os.path.dirname(os.path.realpath(__file__))
def engine_creation():
    # Build a SQLAlchemy engine for the metadata (Postgres) database.
    database_name = ENUM_DB_CREDENTIAL_DICT["database"]
    user = ENUM_DB_CREDENTIAL_DICT["user"]
    password = ENUM_DB_CREDENTIAL_DICT["password"]
    host = ENUM_DB_CREDENTIAL_DICT["host"]
    port = ENUM_DB_CREDENTIAL_DICT["port"]
    # Log the connection target (but not the password).
    l_logger.info(f"Connecting to {database_name} at {host}:{port} as {user}")
    engine = create_engine(
        f"postgresql://{user}:{password}@{host}:{port}/{database_name}"
    )
    return engine
@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": warehouse_resource_selector(),
    })
)
def show_schema_asset(context) -> pd.DataFrame:
    # Resources are attached to this asset only; the query itself still
    # goes through the module-level wh_res connection.
    # wh_ress = warehouse_resource_selector()
    db_obj = context.resources.db_obj
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    # result = context.resources.db_obj.execute_query_as_dataframe_crawl(sql_query=l_query)
    df_schema = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query)
    return df_schema
@asset
def show_table_asset(context) -> pd.DataFrame:
    l_query = f"SHOW TABLES IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_table = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query)
    return df_table
@asset
def show_col_asset(context) -> pd.DataFrame:
    l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_col = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query)
    l_logger.info(df_col)
    # Order columns by schema/table and derive a stable ordinal position.
    df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True)
    df_col["ordinal_position"] = df_col.index
    return df_col
@asset
def show_view_asset(context) -> pd.DataFrame:
    l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_view = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query)
    return df_view

@asset
def show_keys_asset(context) -> pd.DataFrame:
    l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_keys = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query)
    return df_keys

@asset
def show_pk_keys_asset(context) -> pd.DataFrame:
    l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_pk_keys = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query)
    return df_pk_keys
@asset
def show_prod_dmax_tables(context) -> pd.DataFrame:
    l_query = "SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES"
    l_logger.info(l_query)
    df_dmax_tables = wh_res.execute_query_as_dataframe_crawl(sql_query=l_query)
    l_logger.info(df_dmax_tables)
    return df_dmax_tables
@asset
def dmax_schema_asset(show_schema_asset):
    engine = engine_creation()
    show_schema_asset.to_sql(
        "crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False
    )

@asset
def dmax_table_asset(show_table_asset):
    engine = engine_creation()
    show_table_asset.to_sql(
        "crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False
    )

@asset
def dmax_col_asset(show_col_asset):
    engine = engine_creation()
    show_col_asset.to_sql(
        "crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False
    )

@asset
def dmax_view_asset(show_view_asset):
    engine = engine_creation()
    show_view_asset.to_sql(
        "crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False
    )

@asset
def dmax_key_asset(show_keys_asset):
    engine = engine_creation()
    show_keys_asset.to_sql(
        "crawler_constraints",
        engine,
        schema=l_db_schema,
        if_exists="replace",
        index=False,
    )

@asset
def dmax_pk_key_asset(show_pk_keys_asset):
    engine = engine_creation()
    show_pk_keys_asset.to_sql(
        "crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False
    )

@asset
def dmax_prod_table_asset(show_prod_dmax_tables):
    engine = engine_creation()
    show_prod_dmax_tables.to_sql(
        "crawler_prod_tables",
        engine,
        schema=l_db_schema,
        if_exists="replace",
        index=False,
    )
@job(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": warehouse_resource_selector(),
    })
)
def all_assets_job():
    dmax_schema_asset(show_schema_asset())
    dmax_table_asset(show_table_asset())
    dmax_col_asset(show_col_asset())
    dmax_view_asset(show_view_asset())
    dmax_key_asset(show_keys_asset())
    dmax_pk_key_asset(show_pk_keys_asset())
    dmax_prod_table_asset(show_prod_dmax_tables())
# Repository definition that includes the assets and the job
@repository
def my_repo():
    return [
        show_schema_asset,
        show_table_asset,
        show_col_asset,
        show_view_asset,
        show_keys_asset,
        show_pk_keys_asset,
        show_prod_dmax_tables,
        dmax_schema_asset,
        dmax_table_asset,
        dmax_col_asset,
        dmax_view_asset,
        dmax_key_asset,
        dmax_pk_key_asset,
        dmax_prod_table_asset,
        all_assets_job,
    ]
# Execute the job in process
def crawler_asset_job(p_in_db_obj, l_db_schema):
    try:
        db_obj = p_in_db_obj
        # schema = l_db_schema
        dir_path = os.path.dirname(os.path.realpath(__file__))
        drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format(
            schema=l_db_schema
        )
        db_obj.execute_query(drop_view_query)
        # Asset job triggers
        result = my_repo.get_job("all_assets_job").execute_in_process()
        if not result.success:
            raise Exception("Asset job execution failed")
# # Creating deltas insert , update , delete along with DWH metadata
# l_query_dwh_metadata = dir_path + "/delta_generation_scripts/dwh_metadata.sql"
# l_query_dwh = open(l_query_dwh_metadata).read().format(schema=l_db_schema)
# l_query_dwh_metadata = """CREATE TABLE DWH_METADATA AS {query}""".format(
# query=l_query_dwh
# )
# db_obj.execute_query(l_query_dwh_metadata)
# # Insert Delta table
# l_query_insert_delta = (
# dir_path + "/delta_generation_scripts/crawler_identify_inserts.sql"
# )
# l_query_insert = open(l_query_insert_delta).read().format(schema=l_db_schema)
# l_query_insert_delta = """drop table if exists crawler_identify_inserts;
# CREATE TABLE crawler_identify_inserts AS {query}""".format(
# query=l_query_insert
# )
# db_obj.execute_query(l_query_insert_delta)
# # Delete Delta table
# l_query_delete_delta = (
# dir_path + "/delta_generation_scripts/crawler_identify_deletes.sql"
# )
# l_query_delete = open(l_query_delete_delta).read().format(schema=l_db_schema)
# l_query_delete_delta = """drop table if exists crawler_identify_ideletes;
# CREATE TABLE crawler_identify_ideletes AS {query}""".format(
# query=l_query_delete
# )
# db_obj.execute_query(l_query_delete_delta)
# # Update Delta table
# l_query_update_delta = (
# dir_path + "/delta_generation_scripts/crawler_identify_updates.sql"
# )
# l_query_update = open(l_query_update_delta).read().format(schema=l_db_schema)
# l_query_update_delta = """drop table if exists crawler_identify_updates;
# CREATE TABLE crawler_identify_updates AS {query}""".format(
# query=l_query_update
# )
# db_obj.execute_query(l_query_update_delta)
        return result
    except Exception as e:
        print(f"Asset job execution failed: {e}")
if __name__ == "__main__":
    # NOTE: crawler_asset_job expects p_in_db_obj and l_db_schema from the caller.
    crawler_asset_job()
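Here is the rough sketch I mentioned, using show_table_asset as the example. I just copied the same resource_defs onto the asset and read the resources from context instead of the module-level wh_res. This is only my guess at the pattern, and it assumes the dw_obj resource exposes the same execute_query_as_dataframe_crawl method that SnowflakeCrawlConnection does. Is this the right way, or is there a way to declare the resources once and share them across all the assets and the job?

@asset(
    resource_defs=patch_cloud_storage_selector({
        "db_obj": metadata_resource,
        "dw_obj": warehouse_resource_selector(),
    })
)
def show_table_asset(context) -> pd.DataFrame:
    # Same resource wiring as show_schema_asset; assumes dw_obj supports
    # execute_query_as_dataframe_crawl like SnowflakeCrawlConnection does.
    dw_obj = context.resources.dw_obj
    l_query = f"SHOW TABLES IN DATABASE {l_dw_database};"
    l_logger.info(l_query)
    df_table = dw_obj.execute_query_as_dataframe_crawl(sql_query=l_query)
    return df_table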