#ask-community

Diego Fabiano

08/02/2023, 1:58 AM
Can you chain dynamic outputs in Dagster? Say I have an `op` that fetches the batch for my runs: it returns 5 objects, and for each of those objects I have to run N tasks. I created an op that yields each object individually as a `DynamicOutput`, but I cannot find any examples online of how to pass those as inputs to another dynamic output generator. Any pointers to documentation? Here is what I am trying to do, but it gives me an invalid definition error:
```python
import json
from pathlib import Path

import requests
from dagster import DynamicOut, DynamicOutput, job, op
from sqlalchemy.orm import Session

# MLJob, MLTask, Status and engine are defined in the project's own models module (not shown).


@op(out=DynamicOut())
def get_ml_job_batch():  # noqa: D103
    batch_size = 5
    session = Session(engine)
    ml_jobs = (
        session.query(MLJob)
        .filter(MLJob.status == Status.STAGED)
        .order_by(MLJob.id.asc())
        .limit(batch_size)
        .all()  # load the rows before the session is closed
    )
    session.close()
    for idx, ml_job in enumerate(ml_jobs):
        # Dagster mapping keys must be strings
        yield DynamicOutput(ml_job, mapping_key=str(idx))


@op(out=DynamicOut())
def get_ml_job_tasks(ml_job):  # noqa: D103
    session = Session(engine)
    tasks = (
        session.query(MLTask)
        .filter(MLTask.parent_id == ml_job.id)
        .order_by(MLTask.id.asc())
        .all()
    )
    session.close()
    for idx, task in enumerate(tasks):
        yield DynamicOutput(task, mapping_key=str(idx))


@op
def compute_extraction(task):
    collection_json = json.loads(Path(task.path).read_text())
    api_path = "http:///models/brazil_sanctions/predict"  # noqa: E501
    extraction = requests.get(api_path, json=collection_json)

    session = Session(engine)
    task = session.merge(task)  # reattach the row so the status change is persisted
    task.status = Status.DONE
    session.commit()
    session.close()


@job
def execute_ml_jobs():
    session = Session(engine)
    jobs = get_ml_job_batch().collect()
    for _job in jobs:
        get_ml_job_tasks(_job).map(compute_extraction)
        _job.status = Status.DONE
    session.commit()
```

chris

08/02/2023, 5:09 AM
Dynamic outputs cannot be chained: https://github.com/dagster-io/dagster/issues/4364

Diego Fabiano

08/02/2023, 11:42 AM
Well, I am collecting first and then chaining, as described in the discussion. Any clues as to why I am getting this error?
```
DagsterInvalidDefinitionError: In @job execute_ml_jobs, received invalid type <class 'str'> for input "ml_job"
```

Zach

08/02/2023, 4:08 PM
A couple of things I'm noticing:
1. It looks like you're not mapping over `get_ml_job_batch`, although it's supposed to be yielding DynamicOutputs. I don't think you can `collect` directly on an op that yields DynamicOutputs. If the purpose is to get a list of jobs for `get_ml_job_tasks` to use, I would probably just have it return the list of jobs and have `get_ml_job_tasks` take a list of jobs as an input.
2. Iterating over the jobs within the `@job` definition isn't going to do what you think it will: the code within `@job` is executed when your code is loaded, not at runtime, and should really only be used to define the relationships between ops. For the same reason, the `Session` usage within the `@job` definition also won't do what you think it will, since it is also executed when your code is loaded. Session management should be handled within resources or within the ops themselves.

chris

08/02/2023, 4:52 PM
Ah, apologies for the confusion. We get the direct dynamic output chaining question a lot, so I kind of assumed it was another one of those 😅. I responded in the discussion that you posted.