Isaac Harris-Holt
05/23/2022, 2:25 PM
`DynamicOut`, but I'm struggling to then get the new ops created and executed. What's the best way to achieve this? I also want to collect the results from all the syncs, as some downstream ops will depend on them all having completed (whether they failed or otherwise).

Scott Hood
05/23/2022, 2:46 PM
dynamic_outputs = dynamic_output_op()
# Run op_handling_individual_output once for each dynamic output produced
# above, then gather all of the per-output results into a single list.
collected_out = dynamic_outputs.map(op_handling_individual_output).collect()
# The downstream op receives the collected list of every mapped result.
op_parsing_all_outputs(collected_out)
Isaac Harris-Holt
05/23/2022, 2:48 PM
`airbyte_sync_op` in the `dagster_airbyte` package. I think so, at least. I could be getting this completely backwards.

owen
05/23/2022, 5:13 PM

owen
05/23/2022, 5:14 PM

owen
05/23/2022, 5:14 PM

Isaac Harris-Holt
05/24/2022, 8:06 AM

sar
09/09/2022, 2:05 PM

sar
09/09/2022, 2:07 PM
`sync_and_poll` method on the `AirbyteResource` class and passing in the `connection_ids` that I want to sync, but haven’t been able to get it to work.

Isaac Harris-Holt
09/09/2022, 2:17 PM
@op(required_resource_keys={'airbyte'})
def get_workspaces(context) -> List[str]:
    """Return the ID of every workspace listed by the Airbyte resource.

    Sends one ``/workspaces/list`` request (with ``data=None``) through the
    ``airbyte`` resource and extracts ``workspaceId`` from each entry under
    the response's ``workspaces`` key.
    """
    client = context.resources.airbyte
    listing = client.make_request('/workspaces/list', data=None)
    entries = listing['workspaces']
    return [entry['workspaceId'] for entry in entries]
@op(
    required_resource_keys={'airbyte'},
    out=DynamicOut(),
)
def get_connections_for_workspaces(context, workspace_ids):
    """Yield one dynamic output per active, unscheduled Airbyte connection.

    For every workspace ID, lists that workspace's connections via a
    ``/connections/list`` request and yields a ``DynamicOutput`` for each
    connection whose ``status`` is ``'active'`` and that has no ``schedule``
    (presumably meaning it is only ever triggered manually). The output
    value is the connection ID.
    """
    client = context.resources.airbyte
    for ws_id in workspace_ids:
        listing = client.make_request(
            '/connections/list',
            data={'workspaceId': ws_id},
        )
        for conn in listing['connections']:
            if conn['status'] != 'active':
                continue
            if conn.get('schedule') is not None:
                continue
            conn_id = conn['connectionId']
            # Hyphens are swapped for underscores, presumably because
            # Dagster mapping keys only accept letters, digits and underscores.
            yield DynamicOutput(
                value=conn_id,
                mapping_key=conn_id.replace('-', '_'),
            )
@op(required_resource_keys={'airbyte'})
def sync_connection(context, connection_id) -> AirbyteOutput:
    """Trigger a sync of one Airbyte connection and wait for it to finish.

    Delegates to the ``airbyte`` resource's ``sync_and_poll`` with a
    5-second poll interval and returns the ``AirbyteOutput`` it produces.

    NOTE(review): dagster_airbyte's ``sync_and_poll`` appears to raise a
    ``Failure`` for errored or cancelled jobs instead of returning them. If
    so, a failed sync fails this op and its result never reaches
    ``collate_results``. Confirm against the installed version if failed
    syncs must be counted.
    """
    return context.resources.airbyte.sync_and_poll(
        connection_id=connection_id,
        poll_interval=5,
    )
@op
def collate_results(
    context,
    airbyte_outputs: List[AirbyteOutput],
) -> Counter:
    """Tally the final job status of every collected Airbyte sync.

    Args:
        context: Op execution context; only ``context.log`` is used here.
        airbyte_outputs: Results gathered from the mapped sync steps. Each
            status is read from ``job_details['job']['status']``. Outputs
            missing either key are counted under ``None``.

    Returns:
        A ``Counter`` mapping each job status to the number of syncs that
        ended in it. The same tally is logged at info level.
    """
    counter = Counter(
        output.job_details.get('job', {}).get('status')
        for output in airbyte_outputs
    )
    context.log.info(f'Job statuses: {counter}')
    return counter
@graph
def run_enabled_syncs() -> Counter:
    """Sync every selected Airbyte connection and tally the outcomes.

    Lists the workspaces and fans out one ``sync_connection`` step per
    connection emitted by ``get_connections_for_workspaces``. It then
    collects the per-connection results and passes them to
    ``collate_results``, whose Counter is the graph's output.
    """
    workspace_ids = get_workspaces()
    connection_ids = get_connections_for_workspaces(workspace_ids)
    sync_outputs = connection_ids.map(sync_connection)
    return collate_results(sync_outputs.collect())
sar
09/09/2022, 3:30 PM
`GraphDefinition.to_job`