Kirk Stennett
08/26/2022, 2:52 PMError loading repository location repo:TypeError: Object of type function is not JSON serializable
. Is there any alternative here or am I forced to couple the logic and make one-off ops?typing.Callable
was not a valid dagster type. But when using it, it determined that there HAD to be a supplied value for it, thoughts?
OptionalDagsterCallable = DagsterType(
lambda _, val: callable(val) if val is not None else True,
key="DagsterCallableType",
name="DagsterCallable",
description="An override class for Dagster to have a callable type",
)
sandy
08/26/2022, 3:02 PMI want to use simple lambdas to customize an op run, but I'm gettingWould you be able to share code that's producing this error?Error loading repository location repo:TypeError: Object of type function is not JSON serializable
But when using it, it determined that there HAD to be a supplied value for it, thoughts?That's surprising to me - what error were you seeing?
Kirk Stennett
08/26/2022, 3:12 PMexecutor_def=multiprocess_executor,
config={
"ops": {
"io_dynamicize_list": {"inputs": {"mapping_key_fn": {"value": lambda val, index: val.split(".")[0]}}},
"execution": {"config": {"max_concurrent": 1}},
so maybe a better question is: is there any way to do something like this?OptionalDagsterCallable
type, but setting it to None returns this:
Error loading repository location repo:dagster.core.errors.DagsterInvalidConfigError: Invalid default_value for Field.
sandy
08/26/2022, 3:24 PMdef make_job(fn: Callable):
@op
def my_op():
fn(...)
@job
def my_job():
my_op()
my_job = make_job(lambda val, index: val.split(".")[0])
Kirk Stennett
08/26/2022, 3:31 PM@op
def io_dynamicize_list(context, list_to_split, mapping_key_fn):
for index, entry in enumerate(list_to_split):
yield DynamicOutput(
entry, mapping_key=mapping_key_fn(entry, index), output_name="list_item"
)
Is my only option then to do something like:
def make_job(fn):
@op
def my_op():
io_dynamicize_list(fn)
@job
def my_job():
my_op()
my_job = make_job(lambda x ...)
Or am I forced to do something more hacky then and set the callable as a default? on the op and make it an op factory?sandy
08/26/2022, 3:36 PMio_dynamicize_list
accepts an context
argument and arguments, you'll need to pass them through:
def make_job(fn):
@op
def my_op(context, list_to_split):
io_dynamicize_list(context, list_to_split, fn)
@job
def my_job():
my_op()
my_job = make_job(lambda x ...)
Kirk Stennett
08/26/2022, 5:02 PMsandy
08/26/2022, 6:30 PMinput_values
arg of <http://GraphDefinition.to|GraphDefinition.to>_job
should make this significantly easier. I was able to get this working:
from dagster import op, graph, repository, get_dagster_logger
@op
def op1(callable):
get_dagster_logger().info(callable())
@graph
def graph1(callable):
op1(callable)
job1 = graph1.to_job(input_values={"callable": lambda: 5})
@repository
def repo():
return [job1]
job1.execute_in_process()
Callable
type. that's a bug that I'll file an issue forKirk Stennett
08/29/2022, 12:10 PM