Charlie Bini
02/15/2022, 11:36 PM@graph
def configurable_graph():
client = get_client()
for query in graph_config["queries"]:
op1, op2, op3 = op_factory(query)
out1 = op1(client)
out2 = op2(out1)
out3 = op3(out2)
# First job: the shared graph bound to the bar1/eggs1 resources, configured
# from the common resource file plus queries-1.yaml.
job1 = configurable_graph.to_job(
    resource_defs={
        "foo": bar1,
        "spam": eggs1,
    },
    config=config_from_files(
        [
            "./config/resource.yaml",
            "./config/queries-1.yaml",
        ]
    ),
)
# Second job: the shared graph bound to the bar2/eggs2 resources, configured
# from the common resource file plus queries-2.yaml.
job2 = configurable_graph.to_job(
    resource_defs={
        "foo": bar2,
        "spam": eggs2,
    },
    config=config_from_files(
        [
            "./config/resource.yaml",
            "./config/queries-2.yaml",
        ]
    ),
)
… `queries-#.yaml` and be accessible through what I called `graph_config["queries"]`, but I don't know what Dagster component `graph_config` actually is, if it even exists.
David Farnan-Williams
02/16/2022, 12:03 AMdef get_query_dataframe_op(
name: str,
connection_resource: str,
query: Optional[Dict[str, Any]] = None,
parameters: Optional[Dict[str, Any]] = None,
) -> SolidDefinition:
if "sql" not in query:
sql_query_file_path = file_relative_path(__file__, f"queries/{name}.sql")
with open(sql_query_file_path, "r") as query_file:
query["sql"] = query_file.read()
output_dataframe_name = f"{PREFIX_DATAFRAME}{name}"
out = {
output_dataframe_name: Out(
Any,
metadata={"label": name},
# asset_key=AssetKey(name),
io_manager_key="dataset_io_manager",
)
}
@op(
name=f"query_{name}",
description=f"\n```SQL\n{query['sql']}```",
out=out,
required_resource_keys={connection_resource},
)
def query_dataframe(context: OpExecutionContext,) -> Iterator[pd.DataFrame]:
if parameters is not None:
query["sql"] = query["sql"].format(**parameters)
dataframe = pd.read_sql_query(
con=getattr(context.resources, connection_resource).engine, **query
)
return Output(value=dataframe, output_name=output_dataframe_name)
return query_dataframe
Charlie Bini
02/16/2022, 12:31 AM
David Farnan-Williams
02/16/2022, 2:12 AM
from typing import Tuple, List
from dagster import DynamicOutput, DynamicOutputDefinition, In, graph, job, op, resource
from dagster.utils.yaml_utils import load_yaml_from_path
import sqlalchemy as sa
@op(
    config_schema={"yaml_path": str},
    output_defs=[DynamicOutputDefinition(Tuple[str, str])],
)
def query_iterable_from_yaml(context):
    """Yield one ``(name, sql)`` dynamic output per entry under ``queries`` in a YAML file.

    The file path comes from the op config key ``yaml_path``. Each entry's key
    is used verbatim as the dynamic output's ``mapping_key``, and its ``sql``
    field supplies the query text.
    """
    yaml_path = context.op_config["yaml_path"]
    context.log.info("Loading query yaml: " + yaml_path)
    yaml_data = load_yaml_from_path(yaml_path)
    for name, query_config in yaml_data["queries"].items():
        yield DynamicOutput((name, query_config["sql"]), mapping_key=name)
@op(required_resource_keys={"sqlalchemy_connection"})
def execute_query(context, query_tuple):
    """Execute one query's SQL through the ``sqlalchemy_connection`` resource.

    Args:
        context: Op execution context providing the resource.
        query_tuple: ``(name, sql)`` pair; only the SQL text is used here.

    Returns:
        Whatever the resource's ``execute`` call returns.
    """
    _name, sql = query_tuple
    # NOTE(review): calling execute() with a raw string directly on an Engine
    # is legacy SQLAlchemy API (removed in 2.0) — confirm the pinned version.
    return context.resources.sqlalchemy_connection.execute(sql)
@op
def result_fan_in(results: List):
    """Fan-in point for the collected per-query results.

    The body is a placeholder: it walks the results without acting on them and
    returns ``None``.
    """
    for _result in results:
        pass
@graph
def query_graph():
    """Load queries from YAML, run each one as a mapped step, then fan the results in."""
    query_iterable = query_iterable_from_yaml()
    # map() runs execute_query once per dynamic output; collect() gathers the
    # per-query results into a single list input for the fan-in op.
    results = query_iterable.map(execute_query).collect()
    return result_fan_in(results)
@resource(config_schema={"connection_string": str})
def sqlalchemy_resource(context) -> sa.engine.Engine:
    """Create a SQLAlchemy engine from the ``connection_string`` resource config."""
    return sa.create_engine(context.resource_config["connection_string"])
@job(
    resource_defs={
        "sqlalchemy_connection": sqlalchemy_resource.configured(
            {
                # Plain URL: the surrounding "<...>" was chat link markup, not
                # part of the connection string.
                "connection_string": "mssql+pyodbc://server/database?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes"
            }
        )
    }
)
def query_job():
    """Job that runs ``query_graph`` with a pre-configured SQLAlchemy resource."""
    query_graph()
# Run the job in-process; the nested run config addresses the YAML-loading op
# inside query_graph and points it at query.yaml.
query_job.execute_in_process(
    run_config={
        "ops": {
            "query_graph": {
                "ops": {
                    "query_iterable_from_yaml": {
                        "config": {"yaml_path": "query.yaml"},
                    },
                },
            },
        },
    },
)