# ask-community
p
Hi, I've been trying to set up a simple op that reads a SQL query, runs it against Redshift, and then loads a dataframe. The only way I've been able to get it working is with this intermediary
sql_query() -> str:
. Any time I've tried to pull the sql variable directly into
load_dataframe()
I get an error. Am I missing something, or is this the general design pattern? Ideally I'd like to avoid hardcoding SQL as part of my ops script and instead import it from an actual .sql file in the job. This was just testing to get started.
Copy code
# The SQL query you'd like to use
sql = """
    select *
    from schema.table
    limit 100
"""


@op(out=Out(metadata={"sql": sql}))
def sql_query() -> str:
    return sql


@op(ins={"sql": In(str)}, out=Out(pd.DataFrame))
def load_dataframe(sql: str) -> pd.DataFrame:
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
z
I think the concept you're looking for is run configuration
👍 1
p
I've been reading the config docs and this seems so overly complicated just to have a simple op that will read in SQL to query a database. Do you have any actual examples of this implemented somewhere to provide context? I've tried this approach
Copy code
if __name__ == "__main__":
    sql = """
        SELECT *
        FROM schema.table
    """

    result = execute_job(
        job=generate_file,
        run_config={
            "ops": {
                "load_dataframe": {
                    "config": {
                        "sql": sql
                    }
                }
            }
        },
    )
In Prefect this was pretty straightforward.
when using this approach I get an error in the web UI when trying to launch the job
Copy code
Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'load_dataframe': {'config': {'sql': '...'}}}}
this is the op I've constructed to load the dataframe
Copy code
@op(config_schema={"sql": Field(String, description="The SQL query to be executed")}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
z
what did you enter into the launchpad UI for your configuration?
if you really want to hardcode your SQL query, you can just do
Copy code
@op(out=Out(str))
def sql_query() -> str:
    # The SQL query you'd like to use
    sql = """
        select *
        from schema.table
        limit 100
    """
    return sql
your issue now is that you're creating a config schema for both ops with the SQL field, when it seems like you probably only want to configure it in one place
I'd drop the sql_query op entirely, it doesn't really seem to be doing anything
💯 1
Copy code
@op(config_schema={"sql": Field(String, description="The SQL query to be executed")}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df

@job
def run_load_dataframe():
    load_dataframe()
then you run it in the launchpad like
Copy code
ops:
  load_dataframe:
    config:
      sql: "drop table users"
p
Yep I got that approach to work, but then the SQL is stored with the ops and has to be imported. I'm just looking to set up a general op that can be imported into any job, query Redshift, and load a dataframe with the results of the query. In Prefect I could do this through a hardcoded SQL variable, or I could import more complex SQL through a file
yep I've dropped the sql_query op
Copy code
@op(config_schema={"sql": Field(String, description="The SQL query to be executed")}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
z
if you want a default SQL query you can do something like
Copy code
default_query = """
    select *
    from schema.table
    limit 100
"""

@op(config_schema={"sql": Field(String, description="The SQL query to be executed", default_value=default_query)}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
p
it's the config at the time of running the job that's confusing me
z
what in particular about it is confusing for you? maybe I can shed some light on it
p
I'm trying to avoid having the SQL defined within my job though. Ideally I'd like to pass it as a variable to load_dataframe() while running the job. Prefect is very straightforward.
Copy code
@task
def load_dataframe(sql):
    connection = sa.create_engine("postgresql+psycopg2://")
    df = connection.execute(sql)
    df = pd.DataFrame(df)
    return df
and then I'd be able to pass it any variable with sql loaded as text in the Flow
Copy code
sql = """
    select *
    from datascience.supplemental_file
"""


with Flow("file-flow", state_handlers=[flow_custom_state_handler]) as flow:
    df = load_dataframe(sql)
I could use this approach or import and read a .sql file; either way, all I needed to do was load it into a variable. I guess I'm not fully understanding why jobs can't work as simply in Dagster
z
config schemas are the way to do that. now you can configure the SQL to run at runtime. the value you provide via the launchpad or execute_in_process call will be passed into your
load_dataframe
op.
you could use the approach I showed with the default value if you feel strongly about hardcoding your input values
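e.g. a rough sketch (not tested against your setup), using the run_load_dataframe job from above:
Copy code
# runs the job in-process, passing the SQL in via run_config;
# the launchpad YAML is the same config, just written out as YAML
result = run_load_dataframe.execute_in_process(
    run_config={
        "ops": {"load_dataframe": {"config": {"sql": "select * from schema.table limit 100"}}}
    }
)
df = result.output_for_node("load_dataframe")  # the DataFrame the op returned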
p
I think I'm not following the flow. So I have my job defined as such
Copy code
@job
def supplemental_file_job():
    df = load_dataframe()
Then a main script where I'm defining the sql variable.
Copy code
if __name__ == "__main__":
    sql = """
        SELECT *
        FROM datascience.supplemental_file
    """

    result = execute_job(
        job=bcbsm_supplemental_file_job,
        run_config={
            "ops": {
                "load_dataframe": {
                    "config": {
                        "sql": sql
                    }
                }
            }
        },
    )
there are other steps in the job but this is the part I'm struggling to generalize
z
is that not working? I'm maybe having a little bit of trouble understanding what you're trying to do. do you want to be able to re-use the SQL query in a bunch of different places? or do you just want a general
load_dataframe
op that can take an arbitrary SQL query (which is what we just made)?
p
The second option. When I open my web UI and try to launch the job I encounter this error
haha I do feel less crazy though, I thought this should work
z
yeah you still need to provide the configuration, which is what the error is trying to hint at. in the big text box in the middle of the launchpad you would enter
Copy code
ops:
  load_dataframe:
    config:
      sql: "drop table users"
it seems like you may also be thinking that Dagster is running your pipeline from your main script, which it is not
when you run a job through the web UI Dagster will load objects and execute them on its own
p
aah so I can do that there? I saw one other approach that talked about setting up a config YAML file that loaded the SQL
z
you'd have to run your main file directly if you wanted to use the configuration with that hardcoded SQL value. but
execute_in_process
generally isn't how you want to execute things as it will execute everything in a single process and won't be able to parallelize things
p
I also saw the "Scaffold missing config" button and wasn't sure what it would do
z
if you want to load sql from a file, I'd add another op that can do so, and then pass the query as an input instead of through config schema
Copy code
@op(config_schema={"path_to_query": Field(str)})
def load_query(context):
    with open(context.op_config["path_to_query"], "r") as fin:
        return fin.read()

@op
def load_dataframe_from_input_query(query):
    ...  # execute the query here, e.g. pd.read_sql_query(query, engine)

@job
def runit():
    load_dataframe_from_input_query(load_query())
p
There are many cases where we have all of the transformations performed by dbt, loading to a view in Redshift, so we'll actually
select * from table
quite a bit to pass Dagster the data it needs to create some custom report, etc.
which is why I was initially trying to hardcode it into this specific job file
z
yeah sounds like you want a config schema with a default value
p
ahh is that what I'm missing, was just re-reading
aaaahhh it works! or at least that error is no longer there. Thank you!
Sorry for being dense
Also I really appreciate your help on the file approach, I could see that with some more complex queries, but hopefully most of those can just be handled in dbt 🙂
z
haha you're good, it's a complex tool and it's always hard adopting a new one. glad I could be helpful!
p
I love the features so far, it was just rough stumbling on something that's generally been the starting point for Airflow and Prefect 😓
The support in this community is already way better than Prefect's Slack though
z
certainly, this community is wonderful, glad you're a part of it now!
❤️ 1
p
I'm really excited about converting dbt to assets so we can trigger specific models to run the assets they need upstream rather than just setting it all on a daily cron
ah hmm so it ran, but it used the default query from the op rather than the query fed into the job
z
yeah I can't say I've really used dbt but the stuff people are doing with it and Dagster seems really really cool, seems like the Dagster team really nailed that integration
p
Copy code
default_query = """
    select *
    from schema.table
    limit 100
"""


@op(config_schema={"sql": Field(String, description="The SQL query to be executed", default_value=default_query)}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
I have the default_query set, and my main script passes the sql variable to the job, but it used the
default_query
instead
Copy code
if __name__ == "__main__":
    sql = """
        SELECT *
        FROM schema.table
    """

    result = execute_job(
        job=file_job,
        run_config={
            "ops": {
                "load_dataframe": {
                    "config": {
                        "sql": sql
                    }
                }
            }
        },
    )
do I need to override the default within the
@job
?
Copy code
@job
def bcbsm_supplemental_file_job():
    df = load_dataframe()
z
no, config schemas are applied at the
op
level. did you run the job through the UI or through calling your main script directly with
python
?
p
I ran it through the UI
aahh okay there's my confusion. I want the op to be agnostic to the sql it runs
z
yeah I need to stress that your main file has no bearing on how your code gets executed through the UI
it is not used or recognized by Dagster in any way, unless you run
python main.py
p
oh for clarity I've set this within my job file
z
I guess I just want to make sure that you know this
Copy code
if __name__ == "__main__":
    sql = """
        SELECT *
        FROM schema.table
    """

    result = execute_job(
        job=file_job,
        run_config={
            "ops": {
                "load_dataframe": {
                    "config": {
                        "sql": sql
                    }
                }
            }
        },
    )
is not being used at all by the Dagster UI
p
okay that's good to know. I guess I'm just confused why
sql
here can't be set to be an input variable for load_dataframe()
Copy code
@op(config_schema={"sql": Field(String, description="The SQL query to be executed", default_value=default_query)}, out=Out(pd.DataFrame))
def load_dataframe(context) -> pd.DataFrame:
    sql = context.op_config["sql"]
    connection = sa.create_engine("postgresql+psycopg2://")
    df = pd.read_sql_query(sql, connection)
    return df
So I could set my job up like
Copy code
sql = """select * from table"""

@job
def bcbsm_supplemental_file_job():
    df = load_dataframe(sql)
can you define a variable within your job script that can be picked up by an op like that?
z
not exactly like that. you may be looking for the
@configured
API. there's a whole step when Dagster loads your code that confuses people quite a bit when they first get going with Dagster. when a
@graph
or
@job
is loaded by Dagster, the code contained inside them is purely to indicate relationships between the
ops
in the graph. when your code is loaded, Dagster compiles the graph using the dependencies created by your ops passing inputs and outputs between each other. if you want a generalized op that can be configured in different ways, you use a config_schema. if you want that generalized op to have multiple different hardcoded "preset" configurations, you would use the configured API
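e.g. a rough sketch (untested) of presetting the SQL on your load_dataframe op with configured:
Copy code
from dagster import configured

# makes a copy of load_dataframe with the "sql" config value baked in
supplemental_file_query = configured(load_dataframe, name="supplemental_file_query")(
    {"sql": "select * from datascience.supplemental_file"}
)
then you can import supplemental_file_query into any job and call it there without providing run config for it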
p
This seems like it makes it challenging to follow DRY
z
how so? there's a way to make a generalized op that takes some arbitrary config, and then a way to create instances of that op with specific config tied to those instances
p
lol it just seems like a lot of extra steps to set up a simple function that takes a variable
I showed you how I did it in prefect. It made it pretty easy to just import that task anywhere and have it work
I'll read up on this more though and maybe it'll click. Thanks for your help!
🎉 1
z
good luck! I think what you were doing in Prefect (I haven't used that framework at all) will map pretty closely to the configured API - preset a configuration for an op, then import it and use it in a graph / job wherever you want with the config preset
p
@Zach hey, just wanted to follow up: I found a solution using an op factory to do what I needed. It can take any query from a class defining the queries and expose it as a string output from an op that can be passed along when kicking off the job
Copy code
def op_factory_from_query(query):
    """Create an op dynamically that returns the given query.

    Args:
        query (str): The query string, referenced from a queries class as class_name.QUERY_NAME

    Returns:
        OpDefinition: The new op.
    """

    @op(out=Out(str))
    def my_inner_op(context):
        return query

    return my_inner_op
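for context, usage ends up looking roughly like this (the Queries class and names here are just placeholders for how we actually store the strings), feeding the downstream query-as-input op from your earlier sketch:
Copy code
# "Queries" is a hypothetical class holding our query strings as attributes
supplemental_query_op = op_factory_from_query(Queries.SUPPLEMENTAL_FILE)

@job
def supplemental_file_job():
    df = load_dataframe_from_input_query(supplemental_query_op())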
🎉 1
z
looks like a good pattern!