Rohan Ahire
09/21/2021, 3:07 PM
The query in `read_table.sql` is `select count(*) from {full_table_name}`. Here is what I have tried so far. I want to make sure that this is a correct approach; please suggest any better alternatives.
import os
from pathlib import Path
from dagster import ModeDefinition, execute_pipeline, pipeline, repository, resource, solid
from datalake_operations_with_dagster.resources import teradata_resource
# Two levels above this file. Resolve first: if __file__ is a relative bare
# filename, Path(__file__).parent.parent would collapse to "." rather than
# the real parent directory.
project_root_dir = Path(__file__).resolve().parent.parent
@solid(
    # Declaring the schema lets Dagster validate the config up front, so a
    # missing or mistyped "full_table_name" is reported as a config error
    # instead of failing inside the solid body.
    config_schema={"full_table_name": str},
    required_resource_keys={"teradata"},
)
def sql_parameterization_execution(context):
    """Render ``queries/read_table.sql`` for one table and execute it.

    Reads the SQL template from ``<project_root_dir>/queries/read_table.sql``,
    substitutes the ``{full_table_name}`` placeholder with the value from the
    solid config, logs the rendered statement and executes it on the
    ``teradata`` resource.

    Config:
        full_table_name (str): name substituted into the template.

    Note:
        The value returned by ``conn.execute`` is not captured or returned,
        so this solid produces no output.
    """
    full_table_name = context.solid_config["full_table_name"]

    template_path = project_root_dir / "queries" / "read_table.sql"
    with open(template_path) as f:
        template = f.read()

    # Render once and reuse the same string for logging and execution.
    # NOTE(review): the table name is interpolated directly into the SQL text
    # (identifiers cannot be bound as query parameters); make sure this config
    # value only ever comes from a trusted source.
    query = template.format(full_table_name=full_table_name)

    with context.resources.teradata as conn:
        context.log.info("Executing query")
        context.log.info(query)
        conn.execute(query)
@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"teradata": teradata_resource})],
)
def read_table_row_count_from_teradata():
    """Pipeline with a single solid, ``sql_parameterization_execution``.

    Its only mode binds the ``teradata`` resource key to ``teradata_resource``.
    """
    sql_parameterization_execution()
if __name__ == "__main__":
    # Run the pipeline in-process when this file is executed as a script.
    # NOTE(review): no run_config is passed here, while the solid reads
    # context.solid_config["full_table_name"] -- confirm how it is supplied.
    execute_pipeline(pipeline=read_table_row_count_from_teradata)
Josh Lloyd
09/21/2021, 3:18 PM
For `full_table_name = context.solid_config["full_table_name"]` to work, you are going to need to add a `config_schema` to the decorator parameters. Something like this would do the trick:
# Decorator arguments only; the decorated function is not part of this snippet.
# Declares a required string config field, "full_table_name", alongside the
# "teradata" resource requirement.
# NOTE(review): `Field` is not in the import line shown earlier in the thread;
# presumably it needs `from dagster import Field` -- confirm.
@solid(
config_schema={
'full_table_name': Field(
str,
is_required=True,
description="Fully qualified name for the table to be queried",
),
},
required_resource_keys={"teradata"}
)
Rohan Ahire
09/21/2021, 3:42 PMJosh Lloyd
09/21/2021, 3:43 PMRohan Ahire
09/21/2021, 3:44 PM