Phillip Kelly
04/21/2023, 7:26 PM
Any time I've tried to pull the sql variable from sql_query() directly into load_dataframe() I get an error. Am I missing something, or is this the general design pattern? Ideally I would like to avoid hardcoding sql as part of my ops script and would instead import it from an actual sql file in the job. This was just testing to get started.
# The SQL query you'd like to use
sql = """
select *
from schema.table
limit 100
"""

@op(out=Out(metadata={"sql": sql}))
def sql_query() -> str:
    return sql

@op(ins={"sql": In(str)}, out=Out(pd.DataFrame))
def load_dataframe(sql: str) -> pd.DataFrame:
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
Zach
04/21/2023, 8:09 PM
Phillip Kelly
04/24/2023, 9:00 PM
if __name__ == "__main__":
    sql = """
    SELECT *
    FROM schema.table
    """
    result = execute_job(
        job=generate_file,
        run_config={
            "ops": {
                "load_dataframe": {
                    "config": {
                        "sql": sql
                    }
                }
            }
        },
    )
Phillip Kelly
04/24/2023, 9:01 PM
Phillip Kelly
04/24/2023, 9:01 PM
Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'load_dataframe': {'config': {'sql': '...'}}}}
Phillip Kelly
04/24/2023, 9:02 PM
@op(config_schema={"sql": Field(String, description="The SQL query to be executed")}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
Zach
04/24/2023, 9:03 PM
Zach
04/24/2023, 9:04 PM
@op(out=Out(str))
def sql_query() -> str:
    # The SQL query you'd like to use
    sql = """
    select *
    from schema.table
    limit 100
    """
    return sql
Zach
04/24/2023, 9:05 PM
Zach
04/24/2023, 9:05 PM
Zach
04/24/2023, 9:06 PM
@op(config_schema={"sql": Field(String, description="The SQL query to be executed")}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df

@job
def run_load_dataframe():
    load_dataframe()

then you run it in the launchpad like

ops:
  load_dataframe:
    config:
      sql: "drop table users"
Phillip Kelly
04/24/2023, 9:06 PM
Phillip Kelly
04/24/2023, 9:07 PM
@op(config_schema={"sql": Field(String, description="The SQL query to be executed")}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
Zach
04/24/2023, 9:08 PM
default_query = """
select *
from schema.table
limit 100
"""

@op(config_schema={"sql": Field(String, description="The SQL query to be executed", default_value=default_query)}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
Phillip Kelly
04/24/2023, 9:08 PM
Zach
04/24/2023, 9:10 PM
Phillip Kelly
04/24/2023, 9:11 PM
@task
def load_dataframe(sql):
    connection = sa.create_engine("postgresql+psycopg2://")
    df = connection.execute(sql)
    df = pd.DataFrame(df)
    return df
and then I'd be able to pass it any variable with the SQL loaded as text in the Flow:

sql = """
select *
from datascience.supplemental_file
"""

with Flow("file-flow", state_handlers=[flow_custom_state_handler]) as flow:
    df = load_dataframe(sql)
Phillip Kelly
04/24/2023, 9:13 PM
Zach
04/24/2023, 9:13 PM
load_dataframes op.
Zach
04/24/2023, 9:15 PM
Phillip Kelly
04/24/2023, 9:15 PM
@job
def supplemental_file_job():
    df = load_dataframe()

Then a main script where I'm defining the sql variable:

if __name__ == "__main__":
    sql = """
    SELECT *
    FROM datascience.supplemental_file
    """
    result = execute_job(
        job=supplemental_file_job,
        run_config={
            "ops": {
                "load_dataframe": {
                    "config": {
                        "sql": sql
                    }
                }
            }
        },
    )
Phillip Kelly
04/24/2023, 9:15 PM
Zach
04/24/2023, 9:17 PM
a load_dataframe op that can take an arbitrary SQL query (which is what we just made)?
Phillip Kelly
04/24/2023, 9:18 PM
Phillip Kelly
04/24/2023, 9:19 PM
Zach
04/24/2023, 9:19 PM
ops:
  load_dataframe:
    config:
      sql: "drop table users"

it seems like you may also be thinking that Dagster is running your pipeline from your main script, which it is not.
Zach
04/24/2023, 9:20 PM
Phillip Kelly
04/24/2023, 9:20 PM
Zach
04/24/2023, 9:21 PM
execute_in_process generally isn't how you want to execute things, as it will execute everything in a single process and won't be able to parallelize anything.
Phillip Kelly
04/24/2023, 9:21 PM
Zach
04/24/2023, 9:22 PM
@op(config_schema={"path_to_query": Field(str)})
def load_query(context):
    with open(context.op_config["path_to_query"], "r") as fin:
        return fin.read()

@op
def load_dataframe_from_input_query(query):
    # execute query
    ...

@job
def runit():
    load_dataframe_from_input_query(load_query())
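[Editor's note: for this file-based version, the launchpad run config would point the op at the file. A sketch of the shape; the path and filename are made-up examples:]

```yaml
ops:
  load_query:
    config:
      path_to_query: "queries/my_query.sql"
```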
Phillip Kelly
04/24/2023, 9:23 PMselect * from table
quite a bit to pass dagster the data it needs to create some custom report, etc.
Phillip Kelly
04/24/2023, 9:23 PMZach
04/24/2023, 9:24 PM
Phillip Kelly
04/24/2023, 9:24 PM
Phillip Kelly
04/24/2023, 9:26 PM
Phillip Kelly
04/24/2023, 9:27 PM
Phillip Kelly
04/24/2023, 9:27 PM
Zach
04/24/2023, 9:28 PM
Phillip Kelly
04/24/2023, 9:28 PM
Phillip Kelly
04/24/2023, 9:29 PM
Zach
04/24/2023, 9:29 PM
Phillip Kelly
04/24/2023, 9:30 PM
Phillip Kelly
04/24/2023, 9:30 PM
Zach
04/24/2023, 9:31 PM
Phillip Kelly
04/24/2023, 9:33 PM
default_query = """
select *
from schema.table
limit 100
"""

@op(config_schema={"sql": Field(String, description="The SQL query to be executed", default_value=default_query)}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df

I have the default_query set, and then my job has the sql variable called within the main script, but it used the default_query instead:

if __name__ == "__main__":
    sql = """
    SELECT *
    FROM schema.table
    """
    result = execute_job(
        job=file_job,
        run_config={
            "ops": {
                "load_dataframe": {
                    "config": {
                        "sql": sql
                    }
                }
            }
        },
    )
Phillip Kelly
04/24/2023, 9:33 PM
@job?

@job
def bcbsm_supplemental_file_job():
    df = load_dataframe()
Zach
04/24/2023, 9:34 PM
op level. did you run the job through the UI, or by calling your main script directly with python?
Phillip Kelly
04/24/2023, 9:34 PM
Phillip Kelly
04/24/2023, 9:34 PM
Zach
04/24/2023, 9:34 PM
Zach
04/24/2023, 9:35 PM
python main.py
Phillip Kelly
04/24/2023, 9:35 PM
Zach
04/24/2023, 9:36 PM
if __name__ == "__main__":
    sql = """
    SELECT *
    FROM schema.table
    """
    result = execute_job(
        job=file_job,
        run_config={
            "ops": {
                "load_dataframe": {
                    "config": {
                        "sql": sql
                    }
                }
            }
        },
    )

is not being used at all by the Dagster UI.
Phillip Kelly
04/24/2023, 9:38 PM
sql here can't be set to be an input variable for load_dataframe()?

@op(config_schema={"sql": Field(String, description="The SQL query to be executed", default_value=default_query)}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df

So I could set my job up like:

sql = """select * from table"""

@job
def bcbsm_supplemental_file_job():
    df = load_dataframe(sql)
Phillip Kelly
04/24/2023, 9:38 PM
Zach
04/24/2023, 9:42 PM
the @configured API. there's a whole step when Dagster loads your code that confuses people quite a bit when they first get going with Dagster. when a @graph or @job is loaded by Dagster, the code contained inside them is purely there to indicate relationships between the ops in the graph. when your code is loaded, Dagster will compile the graph using the dependencies created by your ops passing inputs and outputs between each other.
if you want a generalized op that you can configure in different ways, you use a config_schema. if you want this generalized op to have multiple different hardcoded "preset" configurations, you would use the configured API.
Phillip Kelly
04/24/2023, 9:44 PM
Zach
04/24/2023, 9:45 PM
Phillip Kelly
04/24/2023, 9:47 PM
Phillip Kelly
04/24/2023, 9:47 PM
Phillip Kelly
04/24/2023, 9:48 PM
Zach
04/24/2023, 10:06 PM
Phillip Kelly
05/02/2023, 2:14 PM
def op_factory_from_query(query):
    """Create an op dynamically that returns the given query.

    Args:
        query (str): The query text, e.g. from class_name.QUERY_NAME.

    Returns:
        OpDefinition: The new op.
    """
    @op(out=Out(str))
    def my_inner_op(context):
        return query

    return my_inner_op
Zach
05/02/2023, 10:45 PM