# ask-community
k
Hey all, is there a simple way to allow for anything of the Callable type as inputs to ops? I want to use simple lambdas to customize an op run, but I'm getting `Error loading repository location repo:TypeError: Object of type function is not JSON serializable`. Is there any alternative here or am I forced to couple the logic and make one-off ops?
Separate but somewhat related, I attempted to make an optional callable type since I was getting an error that `typing.Callable` was not a valid dagster type. But when using it, it determined that there HAD to be a supplied value for it, thoughts?
OptionalDagsterCallable = DagsterType(
    lambda _, val: callable(val) if val is not None else True,
    key="DagsterCallableType",
    name="DagsterCallable",
    description="An override class for Dagster to have a callable type",
)
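The check inside that lambda can be exercised on its own, without Dagster. The sketch below is only an illustration: `is_optional_callable` is a hypothetical name, and it mirrors the `(context, value)` signature of the lambda above. It shows that the predicate itself accepts `None`, so the "value must be supplied" behavior presumably comes from how the input is declared rather than from the check.

```python
def is_optional_callable(_context, val):
    """Same logic as the lambda above: None passes, otherwise val must be callable."""
    return val is None or callable(val)


# The context argument is unused by the check, so None stands in for it here.
print(is_optional_callable(None, None))
print(is_optional_callable(None, lambda val, index: val.split(".")[0]))
print(is_optional_callable(None, "not a function"))
```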
s
> I want to use simple lambdas to customize an op run, but I'm getting `Error loading repository location repo:TypeError: Object of type function is not JSON serializable`
Would you be able to share code that's producing this error?
> But when using it, it determined that there HAD to be a supplied value for it, thoughts?
That's surprising to me - what error were you seeing?
k
I believe it's throwing the error here, as I have it specified in an input part of my job config:
executor_def=multiprocess_executor,
    config={
        "ops": {
            "io_dynamicize_list": {"inputs": {"mapping_key_fn": {"value": lambda val, index: val.split(".")[0]}}},
        },
        "execution": {"config": {"max_concurrent": 1}},
    },
so maybe a better question is: is there any way to do something like this?
I suppose I have to set some sort of default for that `OptionalDagsterCallable` type, but setting it to None returns this:
Error loading repository location repo:dagster.core.errors.DagsterInvalidConfigError: Invalid default_value for Field.
s
ahh, I see - you're supplying the input via config. alas, this won't work, because all config needs to be JSON-serializable. not necessarily easy, but my recommendation would be to create some sort of "job factory" function. e.g.
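from typing import Callable

from dagster import job, op


def make_job(fn: Callable):
    @op
    def my_op():
        fn(...)

    @job
    def my_job():
        my_op()

    # Return the job; otherwise make_job(...) evaluates to None.
    return my_job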

my_job = make_job(lambda val, index: val.split(".")[0])
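For reference, the JSON limitation can be reproduced with the standard library alone, and one possible workaround (not something suggested in this thread) is to keep the callables in a registry and put only their names in config. This is a minimal sketch under that assumption; `MAPPING_KEY_FNS`, `is_json_serializable`, and `resolve_mapping_key_fn` are hypothetical names, not Dagster APIs.

```python
import json

# Hypothetical registry: JSON-safe names mapped to the callables they stand for.
MAPPING_KEY_FNS = {
    "stem": lambda val, index: val.split(".")[0],
    "index": lambda val, index: str(index),
}


def is_json_serializable(value):
    """Return True if json.dumps accepts the value, False if it raises TypeError."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


def resolve_mapping_key_fn(name):
    """Look up a registered callable by the name that was passed through config."""
    try:
        return MAPPING_KEY_FNS[name]
    except KeyError:
        raise ValueError(f"unknown mapping_key_fn {name!r}") from None


# A lambda in config fails serialization; a plain string name does not.
print(is_json_serializable({"mapping_key_fn": lambda val, index: val}))
print(is_json_serializable({"mapping_key_fn": "stem"}))
```

An op body could then call `resolve_mapping_key_fn` on the configured string, which keeps the config serializable at the cost of having to register every function up front.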
k
Would this still work in the case of a pre-existing op? Here's the use case I have:
@op
def io_dynamicize_list(context, list_to_split, mapping_key_fn):
    for index, entry in enumerate(list_to_split):
        yield DynamicOutput(
            entry, mapping_key=mapping_key_fn(entry, index), output_name="list_item"
        )
Is my only option then to do something like:
def make_job(fn):
    @op
    def my_op():
        io_dynamicize_list(fn)

    @job
    def my_job():
        my_op()

my_job = make_job(lambda x ...)
Or am I forced to do something more hacky and set the callable as a default on the op, making it an op factory?
s
is my understanding correct that you're saying you don't want `def io_dynamicize_list...` to live inside the factory function? if so, what you wrote looks right. except, if `io_dynamicize_list` accepts a `context` argument and other arguments, you'll need to pass them through:
def make_job(fn):
    @op
    def my_op(context, list_to_split):
        io_dynamicize_list(context, list_to_split, fn)

    @job
    def my_job():
        my_op()

    # Return the job; otherwise make_job(...) evaluates to None.
    return my_job

my_job = make_job(lambda x ...)
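The closure idea behind the factory can be shown in plain Python, with the shared splitting logic kept in an ordinary generator that lives outside the factory. This is only a sketch of the pattern, not Dagster code: `split_with_keys` and `make_splitter` are hypothetical names, and inside a real op each pair would presumably be wrapped in a `DynamicOutput` as in the op above.

```python
def split_with_keys(items, key_fn):
    """Yield (mapping_key, entry) pairs; key_fn receives the entry and its index."""
    for index, entry in enumerate(items):
        yield key_fn(entry, index), entry


def make_splitter(key_fn):
    """Factory: close over key_fn so callers only need to pass the list."""

    def splitter(items):
        return split_with_keys(items, key_fn)

    return splitter


by_stem = make_splitter(lambda val, index: val.split(".")[0])
print(list(by_stem(["a.csv", "b.txt"])))
```

Keeping `split_with_keys` free of framework types also makes it straightforward to unit test separately from any job definition.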
k
Yep that's exactly it, thanks for helping get this all together. Is there any future plan to make callables a viable entry for input/config? Or does the factory flow better fit Dagster's design? My only thinking is that it's a little awkward to have what I would consider configuration as a part of the graph/job definition
s
ok wait actually what I told you was partly wrong - the `input_values` arg of `GraphDefinition.to_job` should make this significantly easier. I was able to get this working:
from dagster import op, graph, repository, get_dagster_logger


@op
def op1(callable):
    get_dagster_logger().info(callable())


@graph
def graph1(callable):
    op1(callable)


job1 = graph1.to_job(input_values={"callable": lambda: 5})


@repository
def repo():
    return [job1]


job1.execute_in_process()
I noticed an error when I tried annotating the callable argument of op1 with the `Callable` type. that's a bug that I'll file an issue for
k
Great, thanks for making that!