Jake Kagan
01/06/2023, 10:24 PM
def qry_redshift():
    result = my_reusable_op(my_query_string)
    get_dagster_logger().info(f"*******{result}")
    return "done"
@op
def qry_bigquery(qry_redshift):
    result = op_bigquery_query()
    get_dagster_logger().info(f"*******{result}")
    return "done"
@job(resource_defs={"rsrc_bigquery": rsrc_bigquery,
                    "rsrc_redshift": rsrc_redshift})
def one_two_punch():
    qry_bigquery(qry_redshift())
thanks!
Zach
01/06/2023, 10:45 PM
Jake Kagan
01/06/2023, 11:04 PM
@op(config_schema={"reusable_op": Any})
def qry_redshift():
    result = reusable_op(qry_str=one_off_qry)
    get_dagster_logger().info(f"*******{result}")
    return "done"
the reusable_op op actually has a context parameter:
@op(required_resource_keys={"rsrc_redshift"},
    config_schema={"execute_many": Field(bool, default_value=False),
                   "qry_str": str,
                   "qry_vals": Field(list, default_value=[]),
                   "qry_type": Field(str, default_value='DQL')
                   })
def reusable_op(context):
    with context.resources.rsrc_redshift(placeholder=None).client as redshift_rsrc:
and when i try to pass a context like so:
result = reusable_op(context, qry_str=one_off_qry)
that doesn't seem to work - so really a bit confused about passing that qry_str into the passed op reusable_op
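One possible way around calling an op from inside another op, sketched under assumptions: keep the shared query logic in a plain Python function and let each op be a thin wrapper that hands in its own resource and config values. `run_query` and `FakeRedshift` below are hypothetical stand-ins invented for illustration, not Dagster or Redshift APIs; in a real op the client would presumably come from `context.resources.rsrc_redshift`.

```python
from typing import Any, List, Sequence, Tuple


class FakeRedshift:
    """Hypothetical stand-in for a Redshift client; records each query it receives."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, tuple]] = []

    def execute(self, qry_str: str, qry_vals: Sequence[Any] = ()) -> List[str]:
        self.calls.append((qry_str, tuple(qry_vals)))
        return [f"row for: {qry_str}"]


def run_query(client: Any, qry_str: str, qry_vals: Sequence[Any] = ()) -> List[str]:
    """Plain function holding the shared logic; it takes no Dagster context."""
    return client.execute(qry_str, qry_vals)


# Inside an op body the call would look roughly like (assumption):
#     run_query(context.resources.rsrc_redshift, context.op_config["qry_str"])
client = FakeRedshift()
result = run_query(client, "SELECT 1")
```

Because `run_query` takes the client and query string as ordinary arguments, any number of ops can reuse it without one op having to supply a context to another.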
Zach
01/06/2023, 11:05 PM
@op(config_schema={"query_string": str})
def qry_redshift(context):
    result = my_reusable_op(context.op_config["query_string"])
    get_dagster_logger().info(f"*******{result}")
    return "done"
@op
def qry_bigquery(qry_redshift):
    result = op_bigquery_query()
    get_dagster_logger().info(f"*******{result}")
    return "done"
@job(resource_defs={"rsrc_bigquery": rsrc_bigquery,
                    "rsrc_redshift": rsrc_redshift})
def one_two_punch():
    qry_bigquery(qry_redshift())
one_two_punch.execute_in_process(run_config={"ops": {"qry_redshift": {"config": {"query_string": "SELECT * FROM blah"}}}})
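For reference, Dagster run config nests per-op config as `ops` -> op name -> `config`, and the keys under `config` have to match the op's `config_schema`. A minimal sketch of that shape as a plain dictionary; `redshift_run_config` is a hypothetical helper, and the op name and config key are taken from the snippet above.

```python
def redshift_run_config(query_string: str) -> dict:
    """Build run config for the qry_redshift op (hypothetical helper)."""
    return {
        "ops": {
            # Keyed by op name, not by the word "config".
            "qry_redshift": {
                # Keys here must match the op's config_schema.
                "config": {"query_string": query_string},
            }
        }
    }


run_config = redshift_run_config("SELECT * FROM blah")
# Would be passed along as (assumption):
#     one_two_punch.execute_in_process(run_config=run_config)
```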
build_op_context
https://docs.dagster.io/concepts/testing#testing-ops
Jake Kagan
01/06/2023, 11:26 PM
config_schema there, and if i do that i get UserWarning: Error loading repository location etl_dev:dagster._core.errors.DagsterInvalidInvocationError: Compute function of op 'op_redshift_query' has context argument, but no context was provided when invoking.
so i tried to pass the context but it doesn't register...
Zach
01/06/2023, 11:29 PM
Jake Kagan
01/06/2023, 11:31 PM
@op(required_resource_keys={"rsrc_redshift"},
    config_schema={"execute_many": Field(bool, default_value=False),
                   "qry_str": str,
                   "qry_vals": Field(list, default_value=[]),
                   "qry_type": Field(str, default_value='DQL')
                   })
def my_reusable_op(context):
    with context.resources.rsrc_redshift(placeholder=None).client as redshift_rsrc:
so maybe this one needs to have the context removed?
Zach
01/06/2023, 11:33 PM
qry_redshift function as an argument
Jake Kagan
01/06/2023, 11:34 PM
Zach
01/06/2023, 11:37 PM
Jake Kagan
01/06/2023, 11:38 PM
Zach
01/06/2023, 11:38 PM
Jake Kagan
01/06/2023, 11:41 PM
one_two_punch.execute_in_process(run_config={"ops": {"qry_redshift": {"config": {"query_string": "SELECT * FROM blah"}}}})
in the python file, while also having the job be able to launch through dagit
Zach
01/06/2023, 11:44 PM
execute_in_process method will actually execute the job when dagster loads your code (if it's defined at the top-level anyway), which I don't think is what you want
Jake Kagan
01/07/2023, 12:46 AM
Qwame
01/07/2023, 3:51 PM
load_input section of a custom io manager, or you could let any op that uses the resource execute the query. If the query string is not going to change, you can use the asset api instead of op and just load the value of the asset anytime you call it. This is where the magic of the custom io manager comes in: every time the asset is loaded, the results are updated and not stale.