KIP-932: Implement Share consumer interface with poll API #2217
Kaushik Raina (k-raina) wants to merge 7 commits into dev_kip-932_queues-for-kafka
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge.
Pull request overview
This PR introduces a new ShareConsumer Python API backed by a C-extension wrapper around librdkafka’s KIP-932 Share Consumer functionality, and wires it into the package distribution and type hints.
Changes:
- Add a new C-extension type `cimpl.ShareConsumer` with `subscribe()`, `unsubscribe()`, `subscription()`, `consume_batch()`, and `close()`.
- Export `ShareConsumer` through `confluent_kafka.cimpl` and `confluent_kafka.__init__`, and add corresponding type stubs.
- Add unit tests for the new `ShareConsumer` API (skipped when ShareConsumer/librdkafka support is unavailable).
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tests/test_ShareConsumer.py | Adds test coverage for ShareConsumer construction, subscription handling, consume_batch behavior, and close semantics (with skip guard). |
| src/confluent_kafka/src/ShareConsumer.c | Implements the new ShareConsumer C-extension type and its methods. |
| src/confluent_kafka/src/confluent_kafka.h | Declares ShareConsumerType for module initialization. |
| src/confluent_kafka/src/confluent_kafka.c | Registers the ShareConsumer Python type in the cimpl module init. |
| src/confluent_kafka/cimpl.pyi | Adds ShareConsumer type stubs. |
| src/confluent_kafka/__init__.py | Exposes ShareConsumer at the package top-level and in __all__. |
| setup.py | Adds ShareConsumer.c to the extension sources for building. |
```c
self->base.type = RD_KAFKA_CONSUMER;

if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, (Handle *)self, args,
                               kwargs)))
        return -1; /* Exception raised by common_conf_setup() */
```
ShareConsumer uses common_conf_setup(), which installs shared callbacks (error_cb/throttle_cb/stats_cb/logger/oauth). Those callbacks assume Handle.rk is a valid rd_kafka_t* and call rd_kafka_yield(h->rk) on callback crash/fatal paths. Since ShareConsumer never sets base.rk, a fatal error or a Python callback exception can end up calling rd_kafka_yield(NULL) and crash the process. Fix by either (a) updating the shared callbacks to yield using the rk argument they already receive (or rd_kafka_opaque(rk)), not h->rk, and using rk in oauth_cb as well, or (b) setting base.rk to an actual rd_kafka_t* for share consumers (if/when librdkafka exposes it).
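Option (a) can be sketched as follows. This is a minimal, self-contained illustration with stubbed stand-ins for `rd_kafka_t`, `Handle`, and `rd_kafka_yield()` (the real types come from librdkafka and the confluent-kafka-python sources): the callback yields via the `rk` argument librdkafka passes in, so a NULL `h->rk` is never dereferenced.

```c
#include <assert.h>
#include <stddef.h>

/* Stubbed stand-ins for librdkafka/client types, for illustration only. */
typedef struct rd_kafka_s { int yielded; } rd_kafka_t;
typedef struct { rd_kafka_t *rk; /* NULL for ShareConsumer today */ } Handle;

static void rd_kafka_yield_stub(rd_kafka_t *rk) { rk->yielded = 1; }

/* Option (a): yield using the rk argument the callback already receives,
 * not h->rk, so the handle's (possibly NULL) rk field is never touched. */
static void error_cb_fixed(rd_kafka_t *rk, int err, const char *reason,
                           void *opaque) {
    Handle *h = (Handle *)opaque;
    (void)h; (void)err; (void)reason;
    /* Previously: rd_kafka_yield(h->rk);  -- crashes when h->rk == NULL */
    rd_kafka_yield_stub(rk); /* safe: rk is always the live instance */
}
```

The same change would apply to throttle_cb/stats_cb/logger, and oauth_cb can use its `rk` parameter (or `rd_kafka_opaque(rk)`) the same way.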
```c
/* TODO: Remove after interface of librdkafka is updated to return double pointer */
self->max_poll_records = 10005;
```
max_poll_records is hardcoded to 10005 and consume_batch() allocates an array based on it on every call. This both diverges from the docstring (which says batch size is controlled by the max.poll.records config) and can cause unnecessary per-call allocation overhead. Suggest reading max.poll.records from the provided config dict during init (with a sane default matching librdkafka/Kafka, e.g. 500), validating bounds, and storing it in self->max_poll_records.
Suggested change:

```diff
-    /* TODO: Remove after interface of librdkafka is updated to return double pointer */
-    self->max_poll_records = 10005;
+    /* Initialize max_poll_records from config (if provided), else use sane default. */
+    self->max_poll_records = 500; /* default, aligned with typical max.poll.records */
+    if (args && PyTuple_Check(args) && PyTuple_Size(args) >= 1) {
+        PyObject *conf_dict = PyTuple_GetItem(args, 0); /* borrowed ref */
+        if (conf_dict && PyDict_Check(conf_dict)) {
+            PyObject *mpr_obj =
+                PyDict_GetItemString(conf_dict, "max.poll.records"); /* borrowed */
+            if (mpr_obj) {
+                long mpr_val = PyLong_AsLong(mpr_obj);
+                if (mpr_val == -1 && PyErr_Occurred()) {
+                    /* Non-integer or conversion error */
+                    PyErr_SetString(PyExc_TypeError,
+                                    "max.poll.records must be an integer");
+                    rd_kafka_conf_destroy(conf);
+                    return -1;
+                }
+                /* Validate bounds: require a positive, reasonably bounded value. */
+                if (mpr_val < 1 || mpr_val > 1000000) {
+                    PyErr_SetString(
+                        PyExc_ValueError,
+                        "max.poll.records must be between 1 and 1000000");
+                    rd_kafka_conf_destroy(conf);
+                    return -1;
+                }
+                self->max_poll_records = (size_t)mpr_val;
+            }
+        }
+    }
```
```c
CallState_begin((Handle *)self, &cs);
err = rd_kafka_share_consumer_close(self->rkshare);
CallState_end((Handle *)self, &cs);

/* Always destroy handle, to ensure resources are freed even if broker communication fails. */
CallState_begin((Handle *)self, &cs);
rd_kafka_share_destroy(self->rkshare);
CallState_end((Handle *)self, &cs);
```
ShareConsumer_close() ignores the return value of CallState_end(). If a Python signal is raised or a callback crashes during rd_kafka_share_consumer_close()/rd_kafka_share_destroy(), CallState_end() will return 0 and set an exception, but this code will continue and may mask the error or run with an inconsistent CallState. Mirror Consumer_close() by checking CallState_end() after each CallState_begin()/external call and returning NULL immediately on failure.
```c
if (!CallState_end((Handle *)self, &cs)) {
        free(rkmessages);
        if (error)
                rd_kafka_error_destroy(error);
        return NULL;
}
```
If CallState_end() fails in consume_batch(), the code frees the rkmessages pointer array (and destroys error), but it does not destroy any messages that may already have been returned into rkmessages (rkmessages_size > 0). This leaks rd_kafka_message_t objects and differs from Consumer_consume(), which destroys messages before returning NULL. Ensure any populated messages are destroyed on this failure path before freeing the array.
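The corrected failure path can be sketched like this. It is a self-contained illustration with a stub message type standing in for `rd_kafka_message_t` (and a counter so the pattern can be verified); the real code would call `rd_kafka_message_destroy()` on each populated entry before freeing the pointer array, mirroring `Consumer_consume()`.

```c
#include <assert.h>
#include <stdlib.h>

/* Stub standing in for rd_kafka_message_t, for illustration only. */
typedef struct { int destroyed; } msg_t;

static size_t destroy_calls = 0;
static void msg_destroy(msg_t *m) { m->destroyed = 1; destroy_calls++; }

/* Failure path after CallState_end() returns 0: destroy every message
 * librdkafka already handed back, then free the pointer array itself. */
static void cleanup_on_callstate_failure(msg_t **rkmessages,
                                         size_t rkmessages_size) {
    for (size_t i = 0; i < rkmessages_size; i++)
        msg_destroy(rkmessages[i]); /* real code: rd_kafka_message_destroy() */
    free(rkmessages);
}
```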
| " This method should be called to properly clean up the share consumer\n" | ||
| " and leave the share group.\n" | ||
| "\n" | ||
| " :raises RuntimeError: on error\n" |
The close() docstring says it raises RuntimeError on error, but the implementation uses cfl_PyErr_Format(err, ...) which raises KafkaException (with the underlying rd_kafka_resp_err_t) on non-zero err. Please align the docstring with actual behavior (or change the exception type if RuntimeError is intended).
| " :raises RuntimeError: on error\n" | |
| " :raises KafkaException: on error\n" |
Force-pushed a7d9785 to bc6101a, then bc6101a to fc5ef4f.
Pratyush Ranjan (PratRanj07)
left a comment
First pass comments
```c
rd_kafka_share_t *rkshare;

/* TODO: Remove after interface of librdkafka is updated to return double pointer */
size_t max_poll_records;
```
Let's name it `batch_size` so as not to confuse it with the actual configuration property `max.poll.records`.
```c
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
"A high-level Apache Kafka share consumer (KIP-932)\n"
```
| "A high-level Apache Kafka share consumer (KIP-932)\n" | |
| "A high-level Apache Kafka share consumer\n" |
src/confluent_kafka/cimpl.pyi (outdated)

```python
def subscribe(self, topics: List[str]) -> None: ...
def unsubscribe(self) -> None: ...
def subscription(self) -> List[str]: ...
def consume_batch(self, timeout: float = -1) -> List[Message]: ...
```
I think we should name this `poll` to match the signature of the Java client, at least in the higher-level clients.
In the past, it was discussed that we would align naming with librdkafka APIs.
I think we still need to finalize this name. Most likely we will go with `poll` to adhere to Java; it was mentioned in one of the recent comments in the librdkafka public interface docs.
```c
 *
 */
static PyObject *
ShareConsumer_consume_batch(ShareConsumerHandle *self,
```
Reduce the cyclomatic complexity of this function
This function definition is aligned with Consumer.consume_batch for simplicity
I feel there will be issues related to the common callbacks in the Python client. We are populating Handle, but rk in that handle is not present. I can see that we are using this.
Pranav Rathi (pranavrth)
left a comment
Some high level comments. Still reviewing deeper.
```c
int is_fatal = rd_kafka_error_is_fatal(error);
int is_retriable = rd_kafka_error_is_retriable(error);
```
We are not planning to fill this information judiciously. Let's not use this.
```c
Py_RETURN_NONE;

CallState_begin((Handle *)self, &cs);
err = rd_kafka_share_consumer_close(self->rkshare);
```
The return type of close is going to change to `rd_kafka_error_t *`. Add a TODO to change this when the interface in librdkafka is changed:

```c
rd_kafka_error_t *rd_kafka_share_consumer_close(rd_kafka_share_t *rkshare);
```
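The future handling could look roughly like this. It is a self-contained sketch with a stub error type standing in for `rd_kafka_error_t` and its accessors (`rd_kafka_error_code`/`rd_kafka_error_string`/`rd_kafka_error_destroy` in the real API): inspect the returned error, raise from it, and always destroy it.

```c
#include <assert.h>
#include <stddef.h>

/* Stub standing in for rd_kafka_error_t, for illustration only. */
typedef struct { int code; const char *str; } error_t_stub;

static int error_destroyed = 0;
static void error_destroy_stub(error_t_stub *e) { (void)e; error_destroyed++; }

/* TODO(KIP-932) pattern: once close() returns rd_kafka_error_t *, the
 * wrapper inspects the error, raises, and always destroys it. Returns
 * the error code it would surface to Python (0 on success). */
static int handle_close_error(error_t_stub *error) {
    if (!error)
        return 0;               /* success: nothing to raise */
    int code = error->code;     /* real code: rd_kafka_error_code(error) */
    /* real code: cfl_PyErr_Format(code, "%s", rd_kafka_error_string(error)); */
    error_destroy_stub(error);  /* never leak the error object */
    return code;
}
```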
| " :raises KafkaException: on error\n" | ||
| "\n"}, | ||
|
|
||
| {NULL}}; |
We will need to add `set_sasl_credentials()` and context manager support.
```c
if (self->rkshare) {
        CallState cs;
        CallState_begin((Handle *)self, &cs);
        rd_kafka_share_destroy(self->rkshare);
```
Use the `_flags` variant for `_destroy()` here. If you can't add it right now, add a TODO marked `TODO KIP-932`.
```c
if (is_fatal) {
        PyErr_Format(PyExc_RuntimeError, "Fatal error: %s",
                     error_str);
} else {
        PyErr_Format(KafkaException,
                     "Error: %s (retriable: %s)", error_str,
                     is_retriable ? "yes" : "no");
}
```
Check how Consumer raises the exception and use that approach instead: `cfl_PyErr_Format`.
Thanks Pranav Rathi (@pranavrth) for the review.
Summary

`src/confluent_kafka/src/ShareConsumer.c`:
- `subscribe(topics)` / `unsubscribe()` / `subscription()` — topic subscription management via `rd_kafka_share_*` APIs
- `consume_batch(timeout=-1)` — batch-only consumption with chunked polling for Ctrl+C interruptibility; uses CallState_begin/end and check_signals_between_chunks for correct GIL and TLS lifecycle management
- `close()` — graceful shutdown: attempts broker close then always destroys handle

Known limitations (TODOs in code):
- `rd_kafka_share_set_log_queue()` wrapper to handle the share consumer handle.
- `rd_kafka_share_sasl_background_callbacks_enable()` export in the C API.
- `librdkafka` double-pointer API for dynamic config reads.

Additional Changes
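The chunked-polling pattern the summary describes can be sketched as follows. This is a self-contained illustration: `poll_chunk()` and `signal_pending()` are hypothetical stand-ins (the real code would call the librdkafka poll function inside a CallState section and check Python signals via `PyErr_CheckSignals()` between chunks), and the 100 ms chunk size is an assumption for the example.

```c
#include <assert.h>

#define CHUNK_MS 100 /* assumed chunk size for the sketch */

static int chunks_polled = 0;
/* Stand-in for one librdkafka poll call bounded by a short timeout. */
static int poll_chunk(int timeout_ms) { (void)timeout_ms; chunks_polled++; return 0; }
static int no_signal(void) { return 0; } /* stand-in: no pending signal */

/* Split a long poll timeout into short chunks so Python signal handlers
 * (e.g. Ctrl+C -> KeyboardInterrupt) can run between librdkafka calls.
 * Returns 0 when the full timeout elapsed, 1 when interrupted. */
static int chunked_poll(int total_timeout_ms, int (*signal_pending)(void)) {
    int remaining = total_timeout_ms;
    while (remaining > 0) {
        int chunk = remaining < CHUNK_MS ? remaining : CHUNK_MS;
        poll_chunk(chunk);    /* real code: poll inside CallState_begin/end */
        if (signal_pending()) /* real code: PyErr_CheckSignals() */
            return 1;
        remaining -= chunk;
    }
    return 0;
}
```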