# ask-community
i
Hi all, I'm having some trouble getting Dagster's dynamic graphs working with Airbyte. I've got an op that gets all connection IDs for a given Airbyte deployment, and I'm looking to dynamically create and execute `airbyte_sync_op`s using this info. The op that gets connection IDs works fine with a `DynamicOut`, but I'm struggling to then get the new ops created and executed. What's the best way to achieve this? I also want to collect the results from all the syncs, as some downstream ops will depend on them all having completed (whether failed or otherwise).
s
So you're doing something like this?
```python
dynamic_outputs = dynamic_output_op()
collected_out = dynamic_outputs.map(op_handling_individual_output).collect()
op_parsing_all_outputs(collected_out)
```
i
Sort of, but I need to actually generate the ops dynamically, as I'm using the `airbyte_sync_op` from the `dagster_airbyte` package. I think so, at least. I could be getting this completely backwards.
o
Hi @Isaac Harris-Holt! The `airbyte_sync_op` isn't really designed to change which sync it references at runtime. For this use case, you'd want to create your own op that takes a connection ID as an input. With the dynamic orchestration feature that Scott mentions above, Dagster will automatically handle creating multiple copies of `op_handling_individual_output` for you.
The implementation of the `airbyte_sync_op` is pretty simple (it mostly just uses the Airbyte resource to kick off a sync).
Here, you'd just want to create an op that gets the `connection_id` from an input rather than from the op config.
i
@owen you're a saint, thank you! That makes a lot of sense
s
@Isaac Harris-Holt Any chance you can share your code snippet? I'm trying to do the same thing, but I still can't wrap my head around how to get it done.
I'm trying to call the `sync_and_poll` method on the `AirbyteResource` class and pass in the connection IDs that I want to sync, but haven't been able to get it to work.
i
Sure, here you go:
```python
from collections import Counter
from typing import List

from dagster import DynamicOut, DynamicOutput, graph, op
from dagster_airbyte.types import AirbyteOutput


@op(required_resource_keys={'airbyte'})
def get_workspaces(context) -> List[str]:
    airbyte = context.resources.airbyte

    # List every workspace on the Airbyte deployment
    response = airbyte.make_request('/workspaces/list', data=None)
    return [
        workspace['workspaceId']
        for workspace in response['workspaces']
    ]


@op(
    required_resource_keys={'airbyte'},
    out=DynamicOut(),
)
def get_connections_for_workspaces(context, workspace_ids):
    airbyte = context.resources.airbyte

    for workspace_id in workspace_ids:
        response = airbyte.make_request(
            '/connections/list',
            data={'workspaceId': workspace_id},
        )

        for connection in response['connections']:
            # Only active connections that have no Airbyte-side schedule
            if (
                connection['status'] == 'active' and
                connection.get('schedule') is None
            ):
                # One dynamic output per connection; mapping keys may only
                # contain letters, digits and underscores, hence the replace
                yield DynamicOutput(
                    value=connection['connectionId'],
                    mapping_key=connection['connectionId'].replace('-', '_'),
                )


@op(required_resource_keys={'airbyte'})
def sync_connection(context, connection_id) -> AirbyteOutput:
    airbyte = context.resources.airbyte
    # Start a sync for this connection and block until it finishes
    airbyte_output = airbyte.sync_and_poll(
        connection_id=connection_id,
        poll_interval=5,
    )
    return airbyte_output


@op
def collate_results(
    context,
    airbyte_outputs: List[AirbyteOutput],
) -> Counter:
    # Count how many syncs ended in each job status
    counter = Counter([
        output.job_details.get('job', {}).get('status')
        for output in airbyte_outputs
    ])
    context.log.info(f'Job statuses: {counter}')
    return counter


@graph
def run_enabled_syncs() -> Counter:
    airbyte_workspaces = get_workspaces()

    # Fan out one sync_connection per connection ID, then fan back in
    airbyte_results = (
        get_connections_for_workspaces(airbyte_workspaces)
        .map(sync_connection)
        .collect()
    )

    return collate_results(airbyte_results)
```
s
Great, thank you! How do you pass in the Airbyte host and port configs, though, as it seems to require a `GraphDefinition.to_job`