
Jordan W

06/01/2021, 6:26 PM
Hello, I have been moving my data pipeline into the dagster workflow. Some of our solids are slow, so on top of refactoring the code, we want to use async where possible. I am having a little trouble getting a basic example to execute asynchronously: as of now, the solids in the example below execute one after another. I saw in the docs that execute_pipeline_iterator might have to be used. I was wondering whether there is any async execution for dagit / the dagster CLI (apart from execute_pipeline_iterator) and if I could get some help with my example. Appreciate any and all help.
Example
import asyncio

from dagster import Output, pipeline, solid


@solid
async def do_stuff(context):
    await asyncio.sleep(5)
    context.log.info("hello")
    yield Output(42)


@pipeline
def sample_async_pipeline():
    do_stuff()
    do_stuff()

johann

06/01/2021, 6:29 PM
cc @alex

alex

06/01/2021, 7:19 PM
we do not yet support simultaneous execution of solids in the same process - you can set execution to multiprocess for parallelism. In the run_config you can set:
"execution": {"multiprocess": {}},
"storage": {"filesystem": {}},

Jordan W

06/01/2021, 9:11 PM
thank you, I was able to get it working after using these params.
After applying this to my solids, when a solid finishes executing it throws the error below (the solid returns a NamedTuple):
dagster.core.errors.DagsterObjectStoreError: Error occurred during storing output "result" for step "{solid_name}" in "filesystem" object store using "pickle".
AttributeError: Can't pickle local object '{solid_name}.<locals>.<lambda>'
I was wondering what might be causing this error.
Edit: I think I can't pickle a defaultdict. Not a dagster issue.
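For context, a plain-Python sketch of the likely cause (not dagster-specific): pickle refuses to serialize a lambda defined inside a function, e.g. a defaultdict whose default_factory is a lambda, while a named, importable callable such as int pickles fine.

import pickle
from collections import defaultdict


def build_counts():
    # default_factory is a local lambda, which pickle cannot serialize
    return defaultdict(lambda: 0)


try:
    pickle.dumps(build_counts())
except (AttributeError, pickle.PicklingError) as err:
    # e.g. "Can't pickle local object 'build_counts.<locals>.<lambda>'"
    print("cannot pickle:", err)

pickle.dumps(defaultdict(int))  # an importable factory like int pickles fine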