Rene Silva
01/26/2023, 5:10 PM
airbyte_sync_cdc op, which is just an airbyte sync op configured for a connection provided by a resource
@resource
def airbyte_connection_ids(context):
    return {
        'cdc_connection_id': 'xpto',
        'full_refresh_connection_id': 'frxpxo',
    }

@op(required_resource_keys={"airbyte_connection_ids"})
def airbyte_sync_cdc(context):
    return airbyte_sync_op.configured({"connection_id": context.resources.airbyte_connection_ids['cdc_connection_id']}, name="sync_pg_cdc")(context)

@job(resource_defs={
    'airbyte_connection_ids': airbyte_connection_ids.configured({}),
})
def job2():
    res = airbyte_sync_cdc()
However, whenever I try to run this, I get the error: Compute function of op 'sync_pg_cdc' has context argument, but no context was provided when invoking.
Can someone point out my mistakes? Thanks a lot in advance!
Gabe Schine
01/27/2023, 3:19 AM
airbyte_sync_cdc = configured(airbyte_sync_op, {"connection_id": context.resources.airbyte_connection_ids['cdc_connection_id']}, name="sync_pg_cdc")
?