# ask-community
j
hoping someone can point me in the right direction of docs as well as just conceptually explain:
• i have an op made with the purpose of reusing it (it takes a query string as the argument, then hits a redshift database for a result)
• the op uses a resource
• i want to create a job where the op above is used as the first step
how can i pass that query and make this work:
def qry_redshift():
   result = my_reusable_op(my_query_string)
   get_dagster_logger().info(f"*******{result}")
   return "done"


@op
def qry_bigquery(qry_redshift):
   result = op_bigquery_query()
   get_dagster_logger().info(f"*******{result}")
   return "done"


@job(resource_defs={"rsrc_bigquery": rsrc_bigquery,
                    "rsrc_redshift": rsrc_redshift})
def one_two_punch():
   qry_bigquery(qry_redshift())
thanks!
z
these docs around op configuration might help
j
thanks for taking the time, so i tried to have the reusable op as part of the new op config like so:
@op(config_schema={"reusable_op": Any})
def qry_redshift():
   result = reusable_op(qry_str=one_off_qry)
   get_dagster_logger().info(f"*******{result}")
   return "done"
the reusable_op op actually has a context parameter:
@op(required_resource_keys={"rsrc_redshift"},
    config_schema={"execute_many": Field(bool, default_value=False),
                   "qry_str": str,
                   "qry_vals": Field(list, default_value=[]),
                   "qry_type": Field(str, default_value='DQL')})
def reusable_op(context):
    with context.resources.rsrc_redshift(placeholder=None).client as redshift_rsrc:
and when i try to pass a context like so:
result = reusable_op(context, qry_str=one_off_qry)
that doesn't seem to work - so really a bit confused about passing that `qry_str` into the passed op `reusable_op`
z
the next section below the one I linked has a good example
your code would be something like
@op(config_schema={"query_string": str})
def qry_redshift(context):
    result = my_reusable_op(context.op_config["query_string"])
    get_dagster_logger().info(f"*******{result}")
    return "done"


@op
def qry_bigquery(qry_redshift):
   result = op_bigquery_query()
   get_dagster_logger().info(f"*******{result}")
   return "done"


@job(resource_defs={"rsrc_bigquery": rsrc_bigquery,
                    "rsrc_redshift": rsrc_redshift})
def one_two_punch():
   qry_bigquery(qry_redshift())

one_two_punch.execute_in_process(run_config={"ops": {"qry_redshift": {"config": {"query_string": "SELECT * FROM blah"}}}})
in a unit testing context you can build the context object using `build_op_context`
https://docs.dagster.io/concepts/testing#testing-ops
j
thank you for taking the time!! my only issue is the reusable op takes a context argument because there is a `config_schema` there, and if i do that i get
UserWarning: Error loading repository location etl_dev:dagster._core.errors.DagsterInvalidInvocationError: Compute function of op 'op_redshift_query' has context argument, but no context was provided when invoking.
so i tried to pass the context but it doesn't register...
z
yeah you shouldn't need to pass the context argument explicitly. your code looks like the example I posted?
j
yep this is the reusable op:
@op(required_resource_keys={"rsrc_redshift"},
    config_schema={"execute_many": Field(bool, default_value=False),
                   "qry_str": str,
                   "qry_vals": Field(list, default_value=[]),
                   "qry_type": Field(str, default_value='DQL')})
def my_reusable_op(context):
    with context.resources.rsrc_redshift(placeholder=None).client as redshift_rsrc:
so maybe this one needs to have the context removed?
z
ohh I missed the part where you're calling one op from inside another op. that's kind of a no-no in dagster. you either need to use a vanilla python function for your reusable function, or define the reusable op as part of your graph, with it taking the output value of the `qry_redshift` function as an argument
j
ahhhh got it
i wonder if you could use a vanilla function that returns the reusable op
z
if you try to call anything that's decorated with @op within another op you're going to get the missing context error, because Dagster implicitly provides the context for you when calling ops on the graph level and isn't able to do that from within the op
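a minimal sketch of the vanilla-function route z describes; the client object, argument names, and cursor API below are placeholders, not the actual rsrc_redshift resource:

```python
# Plain helper -- no @op decorator, so it can be called from inside any op
# without Dagster needing to inject a context.
# `client` stands in for whatever connection your redshift resource exposes.
def run_redshift_query(client, qry_str, qry_vals=None, execute_many=False):
    with client as cursor:
        if execute_many:
            cursor.executemany(qry_str, qry_vals or [])
        else:
            cursor.execute(qry_str, qry_vals or [])
        return cursor.fetchall()
```

an op would then call something like `run_redshift_query(context.resources.rsrc_redshift(placeholder=None).client, context.op_config["qry_str"])`, keeping the resource and config lookup at the op boundary.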
j
ok ok i think im getting it, thank you!
z
no problem!
j
if you don't mind i have a more execution-related question... is there a workaround to running dagit while also having the custom run configuration set in the python script? meaning, i'd like to keep
one_two_punch.execute_in_process(run_config={"ops": {"qry_redshift": {"config": {"query_string": "SELECT * FROM blah"}}}})
in the python file, while also having the job be able to launch through dagit
from the docs it seems mutually exclusive
i end up having to scaffold the config in the ui and enter the query again
z
you can pre-configure jobs / assets using the configured api. the `execute_in_process` method will actually execute the job when dagster loads your code (if it's defined at the top-level anyway), which I don't think is what you want
j
just tested this and it seems like the configured api is actually a great use case for piggybacking on those shared ops
thank you so much this was a major breakthrough
q
Another design choice is to not use a reusable op but a resource that accepts a query string and hits the database for a result. You could let the query execute in the `load_input` section of a custom io manager, or you could let any op that uses the resource execute the query. If the query string is not going to change, you can use the asset api instead of an op and just load the value of the asset anytime you call it. This is where the magic of the custom io manager comes in. Because every time the asset is loaded, the results are updated and not stale.