
Dylan Hunt

01/21/2022, 1:37 PM
Hi all, I'm executing a pipeline with the Python execute_pipeline API, and I'm building that pipeline with PipelineDefinition. When I try to execute it in multiprocess mode, the PipelineDefinition object cannot be reconstructed. How can I make a PipelineDefinition reconstructable so that I can run it with execute_pipeline from Python?
dagster.core.errors.DagsterInvariantViolationError: Reconstructable target should be a function or definition produced by a decorated function, got <class 'dagster.core.definitions.pipeline.PipelineDefinition'>.

daniel

01/21/2022, 3:28 PM
Hi Dylan - you can use the reconstructable function for this: https://docs.dagster.io/_apidocs/execution#dagster.reconstructable
Or, I guess from the error message, you're already using it. The docblock there has some examples of how to use it (it needs to be passed a function).

Dylan Hunt

01/21/2022, 3:58 PM
That's the issue: I'm dynamically generating the pipeline from a YAML file at runtime. That is why I'm using the PipelineDefinition class, which cannot be passed to the reconstructable function. I tried a decorated pipeline function with reconstructable and it works as expected. How can I reconstruct a PipelineDefinition object?

prha

01/21/2022, 4:18 PM
Hi Dylan. The reconstructable function can also accept functions that produce a PipelineDefinition. So you can try passing in a lambda or creating a wrapping function around your PipelineDefinition.
def build_my_pipeline_def():
    return PipelineDefinition(...)

my_reconstructable_pipeline = reconstructable(build_my_pipeline_def)
👍 1

Dylan Hunt

01/21/2022, 4:22 PM
Hi @prha, I'm already passing a function to reconstructable. The error above comes from the call below.
execute_pipeline(
    pipeline=reconstructable(construct_pipeline(pipeline, solids)),
    run_config=configurations,
    instance=DagsterInstance.local_temp(),
)

daniel

01/21/2022, 4:23 PM
I think you're passing in the result of the construct_pipeline function - you want to pass in the function itself for it to work
👍 1

Dylan Hunt

01/21/2022, 4:24 PM
@daniel let me try that quickly and confirm

daniel

01/21/2022, 4:25 PM
but that might require reorganizing things a bit - since the function that's passed into reconstructable shouldn't require inputs
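To illustrate the distinction daniel describes, here is a minimal sketch that uses only plain Python. The construct_pipeline below is a hypothetical stand-in that returns a dict rather than a Dagster PipelineDefinition, and PIPELINE, SOLIDS, and build_my_pipeline are made-up names for illustration:

```python
def construct_pipeline(pipeline, solids):
    """Hypothetical stand-in for the thread's factory; returns a plain dict."""
    return {"name": pipeline["name"], "solids": list(solids)}


# Assumed module-level inputs, e.g. parsed from YAML before the run starts.
PIPELINE = {"name": "etl"}
SOLIDS = ["extract", "load"]


def build_my_pipeline():
    """Zero-argument wrapper: everything it needs is read from module scope."""
    return construct_pipeline(PIPELINE, SOLIDS)


# Calling the factory hands over an already-built object.
result = construct_pipeline(PIPELINE, SOLIDS)

# Naming the wrapper without calling it hands over the function itself,
# which another process can call again to rebuild the same object.
factory = build_my_pipeline

print(callable(result), callable(factory))
```

The wrapper takes no inputs because whoever calls it later only knows its name, not the arguments that were in scope when it was first used.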

Dylan Hunt

01/21/2022, 4:25 PM
Yes, that is my question: how can I pass the pipeline dict and solid list?

prha

01/21/2022, 4:35 PM
I think in order for a pipeline to be reconstructable, there has to be a named value that can repeatedly rebuild the pipeline. So, in this case, you could have this:
my_pipeline_fn = lambda: construct_pipeline(pipeline, solids)
execute_pipeline(pipeline=reconstructable(my_pipeline_fn), ...)
👍 1

Dylan Hunt

01/21/2022, 4:41 PM
It's not working, @prha; please see the error below. I'm going to try the function approach.
Reconstructable target can not be a lambda. Use a function or decorated function defined at module scope instead, or use build_reconstructable_target.

prha

01/21/2022, 4:41 PM
Ah, sorry about that. Maybe:
def my_pipeline_fn():
    return construct_pipeline(pipeline, solids)

Dylan Hunt

01/21/2022, 5:06 PM
Thanks @prha, it works! One more question: it worked when I tested it in a script, but when I use it inside a class I get the error below, saying the function is not defined at module scope. Is there a way to use build_reconstructable_pipeline, as mentioned in the error?
Reconstructable target "build_pipeline" has a different __qualname__ "SomeClass.some_func.<locals>.build_pipeline" indicating it is not defined at module scope. Use a function or decorated function defined at module scope instead, or use build_reconstructable_pipeline.
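The __qualname__ mentioned in the error can be inspected directly. The sketch below uses plain Python and assumed names (module_level_factory, anonymous_factory, is_module_scoped) to show how a nested function and a lambda differ from a function defined at module scope. It only approximates the kind of check the error message describes and is not Dagster's actual code:

```python
def module_level_factory():
    """Defined at the top level of the module."""
    return "pipeline"


class SomeClass:
    def some_func(self):
        # Defined inside a method, so its qualified name records the nesting.
        def build_pipeline():
            return "pipeline"

        return build_pipeline


anonymous_factory = lambda: "pipeline"


def is_module_scoped(fn):
    """Rough check: not a lambda, and the qualified name has no nesting."""
    return fn.__name__ != "<lambda>" and fn.__qualname__ == fn.__name__


nested_factory = SomeClass().some_func()

print(nested_factory.__qualname__)
print(is_module_scoped(nested_factory))
print(is_module_scoped(anonymous_factory))
print(is_module_scoped(module_level_factory))
```

A function whose qualified name contains <locals> cannot be found again by importing its module and looking it up by name, which is why a separate process has no way to rebuild it.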

prha

01/21/2022, 5:14 PM
Hi Dylan. If the pipeline is not at module scope (e.g. within a class), you’ll have to pierce through somehow to have a module-scoped value (so that we can reconstruct the pipeline in another process). The documentation for build_reconstructable_pipeline is pretty good. It allows for JSON-serializable arguments in addition to the pipeline factory name: https://docs.dagster.io/_apidocs/pipeline#dagster.core.definitions.reconstructable.build_reconstructable_pipeline
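The idea of a factory name plus JSON-serializable arguments can be sketched without Dagster. In the example below, construct_pipeline is a hypothetical stand-in that returns a dict, and FACTORIES, make_pointer, and rebuild are made-up helpers; the registry dict stands in for the module namespace that a child process would import. For the real call, see the linked documentation for the exact signature of build_reconstructable_pipeline:

```python
import json


def construct_pipeline(pipeline, solids):
    """Hypothetical module-scope factory; returns a dict, not a PipelineDefinition."""
    return {"name": pipeline["name"], "solids": list(solids)}


# Stand-in for "names that can be looked up at module scope".
FACTORIES = {"construct_pipeline": construct_pipeline}


def make_pointer(factory_name, args):
    """Serialize the factory name and its arguments into a JSON string."""
    return json.dumps({"factory": factory_name, "args": args})


def rebuild(pointer):
    """Resolve the factory by name and call it again with the stored arguments."""
    spec = json.loads(pointer)
    factory = FACTORIES[spec["factory"]]
    return factory(*spec["args"])


# The pointer is plain text, so it can cross a process boundary.
pointer = make_pointer("construct_pipeline", [{"name": "etl"}, ["extract", "load"]])
rebuilt = rebuild(pointer)
print(rebuilt)
```

Because only a name and JSON data travel between processes, the arguments must be things like dicts, lists, strings, and numbers rather than live Python objects.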
❤️ 1