# ask-community
d
Hi everyone, brand new to Dagster - I am trying to implement it in my workplace, but it would have to co-exist with other event-driven systems. Does Dagster support interoperability via a message queue (preferably Kafka, otherwise Rabbit/AMQP/MQTT), or does the whole pipeline need to be a Dagster app? We don't have the option of letting Dagster take over everything at once just yet.
basically: can Dagster listen to Kafka, and can it write to Kafka? Would be really grateful to hear from anybody doing this successfully
d
Hi David, welcome! Dagster pipelines can absolutely interact with other message queues - since Kafka has a Python client, it's no problem to write to Kafka within a Dagster op. For listening to Kafka, a Dagster sensor could be a good fit: you could poll for new messages on the Kafka topic and kick off a job run when a new message comes in, for example.
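(For illustration, a minimal sketch of both directions, assuming the confluent-kafka client - the broker address, topic names, and job/op names below are made up, not anything from this thread:)

```python
# Hypothetical sketch: writing to Kafka from inside an op, and a sensor that
# polls a topic to kick off runs. Broker, topics, and job names are assumptions.
from confluent_kafka import Consumer, Producer
from dagster import RunRequest, op, sensor


@op
def publish_result(result: str):
    # Produce a message to Kafka from within a Dagster op.
    producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed broker
    producer.produce("results-topic", value=result.encode("utf-8"))
    producer.flush()


@sensor(job_name="process_messages_job", minimum_interval_seconds=30)
def kafka_topic_sensor(context):
    # Poll briefly for a new message; launch a run if one arrived.
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",  # assumed broker
        "group.id": "dagster-kafka-sensor",
        "auto.offset.reset": "earliest",
    })
    try:
        consumer.subscribe(["incoming-topic"])  # assumed topic
        msg = consumer.poll(timeout=5.0)
        if msg is not None and not msg.error():
            # run_key dedupes: the same partition/offset never triggers two runs.
            yield RunRequest(run_key=f"{msg.partition()}-{msg.offset()}")
    finally:
        consumer.close()
```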
d
in this case we would have a pretty steady rate of messages, so the kind of 'online' model of sensors makes more sense. I was actually looking at that page, so it's good to confirm I'm reading the right thing. Can't seem to get it working, though; I'll probably fight with it for another week and come back with more specific questions and reproducible/sample code.
also, hi Daniel - thanks, it's great to be here!
d
Got it - just let us know if any questions come up
m
Replying here since I was searching Slack for folks doing similar Kafka<->Dagster type work. I've written a Kafka consumer as a Dagster `@resource` and then two `@op`s: Consume Topic -> messages, and Commit Messages. The consumer op has a max message count before returning the messages as an `Out`, as well as a time limit on polling for messages. I've made committing the messages a separate op so that we can do other mid-pipeline operations with the messages before ack'ing them as processed. The consumer op is running on a regular `@schedule` (every 60s, with a run for 45s max). So far this is working as a proof of concept for our throughput. Perhaps we can share it as an open-source pattern another time.
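(A rough sketch of what that resource shape might look like - the config schema and the `connection()` context manager here are my guesses, not Mike's actual code:)

```python
# Guessed sketch of the Kafka consumer resource described above; the config
# schema and connection() context manager are assumptions.
from contextlib import contextmanager

from confluent_kafka import Consumer
from dagster import resource


class KafkaClient:
    def __init__(self, conf):
        self._conf = conf

    @contextmanager
    def connection(self):
        # Hand out a consumer for the duration of an op, closing it afterwards.
        consumer = Consumer(self._conf)
        try:
            yield consumer
        finally:
            consumer.close()


@resource(config_schema={"bootstrap_servers": str, "group_id": str})
def kafka_resource(init_context):
    return KafkaClient({
        "bootstrap.servers": init_context.resource_config["bootstrap_servers"],
        "group.id": init_context.resource_config["group_id"],
        "enable.auto.commit": False,  # commits happen in a separate op downstream
    })
```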
d
sorry I never followed up here, but since Kafka does not support polling without a persistent connection, we went with Rabbit and did something similar. A different team member implemented the Dagster side of the project; I can ask him for the details on the Dagster specifics.
m
yeah, it really depends on your volume of messages whether a true long-lived long-poll process stays connected and consuming, versus my proof of concept using a pseudo-long-lived connection (a regularly, frequently running scheduled job that stays connected and polling for a fixed amount of time while instantiated)
d
it also depends on the load of the Kafka cluster. Based on my knowledge (which mostly comes from the JVM world - maybe I am missing something Python-specific), the Kafka ecosystem is heavily tailored to its ideal use case of long-lived consumers, so the client doesn't actually get to wait for the broker to send it messages; instead you have to set an arbitrary fixed/static timeout and hope the broker sends messages in time. The `poll` function in Kafka does not perform IO. The `poll` function in the Rabbit driver actually goes to the broker and asks it for messages, which is a faster operation because the messages are already arranged per consumer in queues (in Kafka there is no such decoupling, so it can potentially be time-consuming, e.g. if it triggers a rebalance)
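(To illustrate the contrast: RabbitMQ's `basic_get` is a real request/response round-trip to the broker per call - the queue name and connection details below are placeholders:)

```python
# Illustration of the RabbitMQ side of the comparison: basic_get does a real
# round-trip to the broker per call. Queue name and host are placeholders.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# Ask the broker directly for one message; returns (None, None, None) if empty.
method, properties, body = channel.basic_get(queue="my-queue", auto_ack=False)
if method is not None:
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection.close()
```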
but a pseudo-long-running approach sounds interesting enough to revisit
m
Yeah. Near-real-time processing is what our system is achieving. It's not true "stream processing", and my implementation might be kinda bursty (I've already seen it act this way)
d
as long as it doesn't stall - if your timeout is 1s and the broker takes 5s to fully set up the connection, poll will always behave as if the topic is empty.
m
oh no, this is an every-60s schedule, max run time 45s, or max messages = 1000 - and of course in the Dagster resource these are tunable configs
```python
from datetime import datetime, timedelta
from typing import List

from confluent_kafka import KafkaException
from confluent_kafka import Message as KMessage


def _consumer_loop(kafka_client_conn, topic_name=TOPIC_NAME, max_msgs=3, max_runtime_seconds=10) -> List[KMessage]:
    # TOPIC_NAME is a module-level default defined elsewhere.
    kafka_client_conn.subscribe([topic_name])
    messages = []
    consumed_count = 0
    start_time = datetime.utcnow()
    max_runtime_duration = timedelta(seconds=max_runtime_seconds)
    while True:
        # Stop at the message cap or the polling time budget, whichever comes first.
        if consumed_count == max_msgs:
            break
        if datetime.utcnow() > start_time + max_runtime_duration:
            print("max polling time limit reached")
            break
        # Block for up to 1s waiting for a message; None means the timeout expired.
        msg = kafka_client_conn.poll(1)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:
            print_msg(msg)  # user-defined logging helper
            messages.append(msg)
            consumed_count += 1
    return messages
```
the op looks like this:
```python
from typing import List

from dagster import In, op


@op(
    required_resource_keys={"kafka"},
    ins={
        "topic_name": In(),
        "max_msgs": In(default_value=100),
        "max_runtime_seconds": In(default_value=45),
    },
)
def consume_topic(context, topic_name, max_msgs, max_runtime_seconds) -> List[KMessage]:
    # Borrow a consumer connection from the Kafka resource for this run.
    with context.resources.kafka.connection() as conn:
        return _consumer_loop(conn, topic_name, max_msgs, max_runtime_seconds)
```
(`KMessage` is `confluent_kafka.Message`)
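(The every-60s schedule tying this together might look roughly like the sketch below - the cron string matches what Mike described, but the job name and run config shape are assumptions:)

```python
# Rough sketch of the every-60s schedule described earlier; the job name and
# run config shape are assumptions.
from dagster import schedule


@schedule(cron_schedule="* * * * *", job_name="consume_kafka_job")
def kafka_consumer_schedule(context):
    # Surface the tunable knobs (topic, message cap, time budget) as run config.
    return {
        "ops": {
            "consume_topic": {
                "inputs": {
                    "topic_name": {"value": "my-topic"},
                    "max_msgs": {"value": 1000},
                    "max_runtime_seconds": {"value": 45},
                }
            }
        }
    }
```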
d
neat
I guess I would just ask whether it's okay for Dagster to busy-loop like that? (or for Python?) But otherwise, I guess we'll just try it out to see if it fits.
maybe in the data-science world code reviews are not a thing that can be failed based on, like, C coding standards lol
m
we are running our ops as ECS tasks, so the launch time for each is "kinda slow" and it's no big deal to be in a wait loop (it's not a busy-loop, actually). `kafka_client_conn.poll(1)` makes the `while True` loop mostly sleep rather than spin the CPU
d
it would be, if poll did IO - but it does not do IO in the Java client (idk about Python)
m
no idea about the Java client vs the Python client, but the latter uses the standard Kafka C library (librdkafka) underneath. The only param for `poll` is: *timeout (float) - Maximum time to block waiting for message*. Emphasis on *block*
d
Mike did you consider implementing this as a sensor?
there are some tradeoffs there but those can be very good for long-polling situations like that
m
Yeah - hey Daniel, saw you on the Dagster Day live stream. Generally our Kafka topic is going to have messages in it at a regular rate, so running a sensor task that "detects" whether there are messages would make more sense if the stream were very bursty and usually sparse
However since our stream has a pretty constant rate of messages most of the time, "sensing" for new messages to process is almost wasted work
Does that square up with what you're suggesting though?
d
Makes sense! And there's some overhead per run too, so periodically launching a job that does a bunch of work with multiple messages is totally valid (vs. a run per message)
m
Yeah, it would not scale at all with one-run-per-message
d
I've also seen people do an op per message using dynamic orchestration
(which also can have some overhead)
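(A sketch of that dynamic-orchestration pattern, using Dagster's `DynamicOut`/`DynamicOutput` APIs - the op names and the stub consumer here are made up:)

```python
# Sketch of the op-per-message pattern via dynamic outputs; op names are made up.
from dagster import DynamicOut, DynamicOutput, job, op


@op
def consume_messages() -> list:
    # Stand-in for the Kafka-consuming op from earlier in the thread.
    return ["msg-a", "msg-b"]


@op(out=DynamicOut())
def fan_out_messages(messages: list):
    # Emit one dynamic output per consumed message.
    for i, message in enumerate(messages):
        yield DynamicOutput(message, mapping_key=str(i))


@op
def handle_message(message):
    # Per-message processing happens in its own op execution.
    return message


@job
def per_message_job():
    fan_out_messages(consume_messages()).map(handle_message)
```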
m
@daniel question: is there a way to explicitly signal "no-op" or "pass" in an `op`? In my case, if there are no messages then the rest of the graph is moot, rather than passing an empty list along to the next op etc etc
d
I think you could do a modified version of this example and not return the output that the next op has as an input: https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching
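(Along the lines of that linked example: an op with an optional output that it only yields when there is work to do. The names and the `poll_kafka` stub below are mine, not from the docs:)

```python
# Sketch of the conditional-branching idea from the linked docs: make the
# output optional and only yield it when there are messages. Names are mine.
from typing import List

from dagster import Out, Output, job, op


def poll_kafka() -> list:
    # Stand-in for real consumption logic (hypothetical helper).
    return []


@op(out={"messages": Out(is_required=False)})
def consume_if_any(context):
    messages = poll_kafka()
    if messages:
        yield Output(messages, output_name="messages")
    else:
        # No Output yielded -> downstream ops that depend on it are skipped.
        context.log.info("no messages; skipping the rest of the graph")


@op
def process(messages: List):
    ...


@job
def maybe_process_job():
    process(consume_if_any())
```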
m
got it, that makes sense
thank you
for anyone following along: the Kafka message type cannot be pickled to be passed on and re-hydrated for a commit op, so I'll be using some other mechanism to construct _TopicPartition_ in the commit op
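(One way to do that reconstruction - pass plain picklable (topic, partition, offset) tuples between ops and rebuild `TopicPartition`s in the commit op; the op/resource names here are assumptions, and the `+ 1` is because a committed offset is the *next* offset to read:)

```python
# One way around the unpicklable Message: pass plain tuples between ops and
# rebuild TopicPartitions in the commit op. Op/resource names are assumed.
from typing import List, Tuple

from confluent_kafka import TopicPartition
from dagster import op


@op
def to_offsets(messages) -> List[Tuple[str, int, int]]:
    # Reduce each Message to a picklable (topic, partition, offset) tuple.
    return [(m.topic(), m.partition(), m.offset()) for m in messages]


@op(required_resource_keys={"kafka"})
def commit_messages(context, offsets: List[Tuple[str, int, int]]):
    with context.resources.kafka.connection() as conn:
        # The committed offset is the next offset to consume, hence offset + 1.
        conn.commit(
            offsets=[TopicPartition(t, p, o + 1) for t, p, o in offsets],
            asynchronous=False,
        )
```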
oh hey, and for anyone still following along: another interesting alternative to directly consuming messages from Kafka in Dagster would be to use the Kafka S3 Sink Connector, then use the Dagster S3 sensor for objects written to the bucket by the connector
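(A sketch of that alternative, using the `get_s3_keys` helper from the dagster-aws library - the bucket, prefix, and job/op names below are placeholders:)

```python
# Sketch of an S3 sensor over the bucket the Kafka S3 Sink Connector writes to.
# Bucket, prefix, and job/op names are placeholders.
from dagster import RunRequest, sensor
from dagster_aws.s3.sensor import get_s3_keys


@sensor(job_name="process_sunk_messages_job")
def kafka_s3_sink_sensor(context):
    # Resume from the last key we saw, tracked via the sensor cursor.
    since_key = context.cursor or None
    new_keys = get_s3_keys("my-bucket", prefix="topics/my-topic/", since_key=since_key)
    for key in new_keys:
        yield RunRequest(
            run_key=key,
            run_config={"ops": {"load_object": {"config": {"key": key}}}},
        )
    if new_keys:
        context.update_cursor(new_keys[-1])
```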