David Ankin
06/15/2022, 12:05 AM
David Ankin
06/15/2022, 12:12 AM
daniel
06/15/2022, 4:49 AM
daniel
06/15/2022, 4:51 AM
David Ankin
06/15/2022, 12:32 PM
David Ankin
06/15/2022, 12:35 PM
daniel
06/15/2022, 1:02 PM
Mike Atlas
09/08/2022, 4:21 PM
`@resource` and then two `@op`s: Consume Topic -> messages, and Commit Messages. The consumer op has a max message count before returning the messages as an `Out`, as well as a time limit on polling for messages. I've made committing the messages a separate op so that we can do other mid-pipeline operations with the messages before ack'ing them as processed. The consumer op runs on a regular `@schedule` (every 60s, with each run capped at 45s).
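The consume, process, then commit ordering described above can be sketched in plain Python. This is only an illustration under stated assumptions: `StubMessage`, `StubConsumer`, and the three step functions are hypothetical stand-ins, not Dagster ops or the real Kafka client. The stub's `commit(message=...)` loosely mirrors the shape of the `confluent_kafka` consumer's commit call.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class StubMessage:
    """Hypothetical stand-in for confluent_kafka.Message."""
    offset: int
    value: bytes


@dataclass
class StubConsumer:
    """Hypothetical stand-in for a Kafka consumer; records committed offsets."""
    pending: List[StubMessage]
    committed: List[int] = field(default_factory=list)

    def consume_batch(self, max_msgs: int) -> List[StubMessage]:
        # Hand out at most max_msgs messages, like the bounded consumer op.
        batch, self.pending = self.pending[:max_msgs], self.pending[max_msgs:]
        return batch

    def commit(self, message: StubMessage) -> None:
        self.committed.append(message.offset)


def consume_step(consumer: StubConsumer, max_msgs: int) -> List[StubMessage]:
    return consumer.consume_batch(max_msgs)


def process_step(messages: List[StubMessage]) -> List[str]:
    # Placeholder for the "mid-pipeline operations"; may raise on bad input.
    return [m.value.decode("utf-8").upper() for m in messages]


def commit_step(consumer: StubConsumer, messages: List[StubMessage]) -> None:
    for m in messages:
        consumer.commit(message=m)


def run_pipeline(consumer: StubConsumer, max_msgs: int = 100) -> List[str]:
    messages = consume_step(consumer, max_msgs)
    results = process_step(messages)  # if this raises, nothing is committed
    commit_step(consumer, messages)   # ack only after processing succeeded
    return results
```

Keeping the commit as its own final step means a failure in the middle leaves the batch un-acked, which is the at-least-once behaviour the separate commit op is aiming for.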
So far this is working as a proof of concept for our throughput. Perhaps we can share it as an open-source pattern another time.
David Ankin
09/08/2022, 4:24 PM
Mike Atlas
09/08/2022, 4:36 PM
David Ankin
09/08/2022, 4:39 PM
David Ankin
09/08/2022, 4:41 PM
Mike Atlas
09/08/2022, 4:57 PM
David Ankin
09/08/2022, 4:58 PM
Mike Atlas
09/08/2022, 4:59 PM
Mike Atlas
09/08/2022, 4:59 PM
from datetime import datetime, timedelta
from typing import List

from confluent_kafka import KafkaException, Message as KMessage


def _consumer_loop(kafka_client_conn, topic_name=TOPIC_NAME, max_msgs=3, max_runtime_seconds=10) -> List[KMessage]:
    kafka_client_conn.subscribe([topic_name])
    messages = []
    consumed_count = 0
    start_time = datetime.utcnow()
    max_runtime_duration = timedelta(seconds=max_runtime_seconds)
    while True:
        # Stop once the batch is full.
        if consumed_count == max_msgs:
            break
        # Stop once the polling time budget is spent.
        if datetime.utcnow() > start_time + max_runtime_duration:
            print("max polling time limit reached")
            break
        # Block for up to 1 second waiting for the next message.
        msg = kafka_client_conn.poll(1)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:
            print_msg(msg)
            messages.append(msg)
            consumed_count += 1
    return messages
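A side note on the time limit: the loop above measures elapsed time with `datetime.utcnow()`, which follows the wall clock and is deprecated as of Python 3.12. Below is a minimal sketch of the same bounded loop using `time.monotonic()` instead. `StubConsumer` and `bounded_poll` are hypothetical names for illustration, and the stub only imitates a `poll(timeout)` that returns a message or `None`; it is not the Kafka client.

```python
import time
from collections import deque
from typing import Deque, Iterable, List, Optional


class StubConsumer:
    """Hypothetical stand-in: poll() returns the next queued item, or None after waiting."""

    def __init__(self, items: Iterable[str]) -> None:
        self._items: Deque[str] = deque(items)

    def poll(self, timeout: float) -> Optional[str]:
        if self._items:
            return self._items.popleft()
        time.sleep(timeout)  # imitate blocking until the timeout expires
        return None


def bounded_poll(consumer: StubConsumer, max_msgs: int = 3,
                 max_runtime_seconds: float = 10.0,
                 poll_timeout: float = 1.0) -> List[str]:
    """Collect up to max_msgs items, giving up once the time budget is spent."""
    deadline = time.monotonic() + max_runtime_seconds
    messages: List[str] = []
    while len(messages) < max_msgs:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never block past the deadline, even if poll_timeout is larger.
        msg = consumer.poll(min(poll_timeout, remaining))
        if msg is not None:
            messages.append(msg)
    return messages
```

Capping each poll at the remaining budget keeps the total runtime close to `max_runtime_seconds`, which matters when a scheduler starts the next run on a fixed interval.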
Mike Atlas
09/08/2022, 5:01 PM
Mike Atlas
09/08/2022, 5:03 PM
@op(
    required_resource_keys={"kafka"},
    ins={
        "topic_name": In(),
        "max_msgs": In(default_value=100),
        "max_runtime_seconds": In(default_value=45),
    },
)
def consume_topic(context, topic_name, max_msgs, max_runtime_seconds) -> List[KMessage]:
    with context.resources.kafka.connection() as conn:
        return _consumer_loop(conn, topic_name, max_msgs, max_runtime_seconds)
Mike Atlas
09/08/2022, 5:03 PM
`KMessage` is `confluent_kafka.Message`
David Ankin
09/08/2022, 5:07 PM
David Ankin
09/08/2022, 5:09 PM
David Ankin
09/08/2022, 5:10 PM
Mike Atlas
09/08/2022, 5:10 PM
wait loop (it's not a busy-loop actually)
Mike Atlas
09/08/2022, 5:12 PM
`kafka_client_conn.poll(1)` makes the `while True` loop mostly sleeping, not spinning the CPU
David Ankin
09/08/2022, 5:12 PM
Mike Atlas
09/08/2022, 5:13 PM
Mike Atlas
09/08/2022, 5:14 PM
`poll` is:
timeout (float) – Maximum time to block waiting for message
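To illustrate why a blocking call with a timeout keeps the loop from spinning, here is a small sketch that uses the standard library's `queue.Queue.get(timeout=...)` as an analogy for `poll(timeout)`. It is not the Kafka client, and `count_wakeups` is a hypothetical helper. It counts how often the loop body runs while nothing arrives.

```python
import queue
import time


def count_wakeups(timeout: float, window: float) -> int:
    """Count loop iterations over `window` seconds when each wait blocks up to `timeout`."""
    q: "queue.Queue[str]" = queue.Queue()  # stays empty for the whole window
    deadline = time.monotonic() + window
    wakeups = 0
    while time.monotonic() < deadline:
        try:
            q.get(timeout=timeout)  # sleeps until an item arrives or the timeout expires
        except queue.Empty:
            pass
        wakeups += 1
    return wakeups
```

With a 1-second timeout the loop body runs roughly once per second while idle, rather than as fast as the CPU allows, which is the difference between a wait loop and a busy loop.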
Mike Atlas
09/08/2022, 5:15 PM
daniel
09/08/2022, 5:15 PM
daniel
09/08/2022, 5:16 PM
Mike Atlas
09/08/2022, 5:18 PM
Mike Atlas
09/08/2022, 5:19 PM
Mike Atlas
09/08/2022, 5:20 PM
daniel
09/08/2022, 5:20 PM
Mike Atlas
09/08/2022, 5:21 PM
daniel
09/08/2022, 5:21 PM
daniel
09/08/2022, 5:21 PM
Mike Atlas
09/08/2022, 5:29 PM
`op`? In my case, if there are no messages then the rest of the graph is moot, rather than passing an empty list along to the next op, etc.
daniel
09/08/2022, 5:31 PM
Mike Atlas
09/08/2022, 5:33 PM
Mike Atlas
09/08/2022, 5:33 PM
Mike Atlas
09/08/2022, 6:37 PM
Mike Atlas
09/13/2022, 4:59 PM