@Scout i have an asset job import pandas as pd
from dagster import asset, job, repository
import os
# from dagster_snowflake import SnowflakeResource
from dagster import asset
from sqlalchemy import create_engine
from dagster import asset
from dmax_app.orchestration.orchestration_manager.resources.metadata_resource import metadata_resource
from dmax_app.orchestration.orchestration_manager.resources.patch_resource_def import patch_cloud_storage_selector
from dmax_app.orchestration.orchestration_manager.resources.snowflake_crawler_resource import (
SnowflakeCrawlConnection,
)
from dmax_app.orchestration.orchestration_manager.loggers.setup_datamax_logger import (
setup_logger,
)
from dmax_app.orchestration.orchestration_enums import (
ENUM_DB_CREDENTIAL_DICT,
ENUM_DW_CREDENTIAL_DICT,
)
from dmax_app.orchestration.orchestration_enums import (
ENUM_DW_CREDENTIAL_DICT,
)
from dmax_app.orchestration.orchestration_manager.resources.warehouse_resource import warehouse_resource_selector
# wh_res = SnowflakeCrawlConnection()
l_dw_database = ENUM_DW_CREDENTIAL_DICT["database"]
l_logger = setup_logger()
l_db_schema = ENUM_DB_CREDENTIAL_DICT["schema"]
dir_path = os.path.dirname(os.path.realpath(
file))
def engine_creation():
database_name = ENUM_DB_CREDENTIAL_DICT["database"]
user = ENUM_DB_CREDENTIAL_DICT["user"]
password = ENUM_DB_CREDENTIAL_DICT["password"]
host = ENUM_DB_CREDENTIAL_DICT["host"]
port = ENUM_DB_CREDENTIAL_DICT["port"]
#
l_logger.info(database_name,user,password,host,port)
engine = create_engine(
f"postgresql
//{user}{password}@{host}:{port}/{database_name}"
)
return engine
dw_obj =warehouse_resource_selector()
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def show_schema_asset(context) -> pd.DataFrame:
# wh_ress = warehouse_resource_selector()
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query = f"SHOW SCHEMAS IN DATABASE {l_dw_database};"
l_logger.info(l_query)
# result = context.resources.db_obj.execute_query_as_dataframe(sql_query=l_query)
df_schema = dw_obj.execute_query_as_dataframe(sql_query=l_query)
engine = engine_creation()
df_schema.to_sql(
"crawler_schemas", engine, schema=l_db_schema, if_exists="replace", index=False
)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def show_table_asset(context) -> pd.DataFrame:
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query = f"SHOW TABLES IN DATABASE {l_dw_database};"
l_logger.info(l_query)
df_table = dw_obj.execute_query_as_dataframe(sql_query=l_query)
engine = engine_creation()
df_table.to_sql(
"crawler_tables", engine, schema=l_db_schema, if_exists="replace", index=False
)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def show_col_asset(context) -> pd.DataFrame:
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query = f"SHOW COLUMNS IN DATABASE {l_dw_database};"
l_logger.info(l_query)
df_col = dw_obj.execute_query_as_dataframe(sql_query=l_query)
#
l_logger.info(df_col)
df_col = df_col.sort_values(by=["schema_name", "table_name"]).reset_index(drop=True)
df_col["ordinal_position"] = df_col.index
engine = engine_creation()
df_col.to_sql(
"crawler_columns", engine, schema=l_db_schema, if_exists="replace", index=False
)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def show_view_asset(context) -> pd.DataFrame:
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query = f"SHOW VIEWS IN DATABASE {l_dw_database};"
l_logger.info(l_query)
df_view = dw_obj.execute_query_as_dataframe(sql_query=l_query)
engine = engine_creation()
df_view.to_sql(
"crawler_views", engine, schema=l_db_schema, if_exists="replace", index=False
)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def show_keys_asset(context) -> pd.DataFrame:
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query = f"SHOW IMPORTED KEYS IN DATABASE {l_dw_database};"
l_logger.info(l_query)
df_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query)
engine = engine_creation()
df_keys.to_sql(
"crawler_constraints",
engine,
schema=l_db_schema,
if_exists="replace",
index=False,
)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def show_pk_keys_asset(context) -> pd.DataFrame:
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query = f"SHOW PRIMARY KEYS IN DATABASE {l_dw_database};"
l_logger.info(l_query)
df_pk_keys = dw_obj.execute_query_as_dataframe(sql_query=l_query)
engine = engine_creation()
df_pk_keys.to_sql(
"crawler_pk", engine, schema=l_db_schema, if_exists="replace", index=False
)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def show_prod_dmax_tables(context) -> pd.DataFrame:
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query = f"SELECT * FROM DMAX_CHANGE_VERSION.DMAX_TABLES"
l_logger.info(l_query)
df_dmax_tables = dw_obj.execute_query_as_dataframe(sql_query=l_query)
l_logger.info(df_dmax_tables)
engine = engine_creation()
df_dmax_tables.to_sql(
"crawler_prod_tables",
engine,
schema=l_db_schema,
if_exists="replace",
index=False,
)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def dwh_metadata_creation(context,show_schema_asset,show_table_asset,show_col_asset,show_view_asset,show_keys_asset,show_pk_keys_asset,show_prod_dmax_tables) -> pd.DataFrame:
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query_dwh_metadata = dir_path + "/delta_generation_scripts/dwh_metadata.sql"
l_query_dwh = open(l_query_dwh_metadata).read().format(schema=l_db_schema)
l_query_dwh_metadata = """CREATE TABLE DWH_METADATA AS {query}""".format(
query=l_query_dwh
)
db_obj.execute_query(l_query_dwh_metadata)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def insert_delta_asset(context,dwh_metadata_creation):
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query_insert_delta = (
dir_path + "/delta_generation_scripts/crawler_identify_inserts.sql"
)
l_query_insert = open(l_query_insert_delta).read().format(schema=l_db_schema)
l_query_insert_delta = """drop table if exists crawler_identify_inserts;
CREATE TABLE crawler_identify_inserts AS {query}""".format(
query=l_query_insert
)
db_obj.execute_query(l_query_insert_delta)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def delete_delta_asset(context,dwh_metadata_creation):
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query_delete_delta = (
dir_path + "/delta_generation_scripts/crawler_identify_deletes.sql"
)
l_query_delete = open(l_query_delete_delta).read().format(schema=l_db_schema)
l_query_delete_delta = """drop table if exists crawler_identify_ideletes;
CREATE TABLE crawler_identify_ideletes AS {query}""".format(
query=l_query_delete
)
db_obj.execute_query(l_query_delete_delta)
@asset( resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": dw_obj,
}))
def update_delta_asset(context,dwh_metadata_creation):
db_obj = context.resources.db_obj
dw_obj = context.resources.dw_obj
l_query_update_delta = (
dir_path + "/delta_generation_scripts/crawler_identify_updates.sql"
)
l_query_update = open(l_query_update_delta).read().format(schema=l_db_schema)
l_query_update_delta = """drop table if exists crawler_identify_updates;
CREATE TABLE crawler_identify_updates AS {query}""".format(
query=l_query_update
)
db_obj.execute_query(l_query_update_delta)
@job(
resource_defs=patch_cloud_storage_selector({
"db_obj": metadata_resource,
"dw_obj": warehouse_resource_selector(),
})
)
def all_assets_job():
show_schema_asset()
show_table_asset()
show_schema_asset()
show_table_asset()
show_col_asset()
show_view_asset()
show_keys_asset()
show_pk_keys_asset()
show_prod_dmax_tables()
dwh_metadata_creation(show_schema_asset(),show_table_asset(),show_col_asset(),show_view_asset(),show_keys_asset(),show_pk_keys_asset(),show_prod_dmax_tables())
insert_delta_asset(dwh_metadata_creation())
delete_delta_asset(dwh_metadata_creation())
update_delta_asset(dwh_metadata_creation())
# Repository definition that includes the assets and the job
@repository
def my_repo():
return [
show_schema_asset,
show_table_asset,
show_col_asset,
show_view_asset,
show_keys_asset,
show_pk_keys_asset,
show_prod_dmax_tables,
dwh_metadata_creation,
insert_delta_asset,
delete_delta_asset,
update_delta_asset,
all_assets_job
]
# Execute the job in process
def crawler_asset_job(p_in_db_obj, l_db_schema):
try:
db_obj = p_in_db_obj
# schema = l_db_schema
dir_path = os.path.dirname(os.path.realpath(
file))
drop_view_query = """ drop table if exists {schema}.DWH_METADATA;""".format(
schema=l_db_schema
)
db_obj.execute_query(drop_view_query)
# Asset job triggers
result = my_repo.get_job("all_assets_job").execute_in_process()
if not result.success:
raise Exception("Asset job execution failed")
return result
except Exception as e:
print("Asset job execution failed")
if
name == "__main__":
crawler_asset_job() i am getting this error 'Missing required config entry "ops" at the root. Sample config for missing entry: {\'ops\': {\'dwh_metadata_creation_2\': {\'inputs\': {\'show_col_asset\': \'<selector>\', \'show_keys_asset\': \'<selector>\', \'show_pk_keys_asset\': \'<selector>\', \'show_prod_dmax_tables\': \'<selector>\', \'show_schema_asset\': \'<selector>\', \'show_table_asset\': \'<selector>\', \'show_view_asset\': \'<selector>\'}}, \'dwh_metadata_creation_3\': {\'inputs\': {\'show_col_asset\': \'<selector>\', \'show_keys_asset\': \'<selector>\', \'show_pk_keys_asset\': \'<selector>\', \'show_prod_dmax_tables\': \'<selector>\', \'show_schema_asset\': \'<selector>\', \'show_table_asset\': \'<selector>\', \'show_view_asset\': \'<selector>\'}}, \'dwh_metadata_creation_4\': {\'inputs\': {\'show_col_asset\': \'<selector>\', \'show_keys_asset\': \'<selector>\', \'show_pk_keys_asset\': \'<selector>\', \'show_prod_dmax_tables\': \'<selector>\', \'show_schema_asset\': \'<selector>\', \'show_table_asset\': \'<selector>\', \'show_view_asset\': \'<selector>\'}}}}' solve this issue