This document provides information useful to developers working on confluent-kafka-python.
Prerequisites:

- Python 3.8 or higher
- Git
- librdkafka (for Kafka functionality)
- Fork and Clone

  ```bash
  git clone https://github.com/your-username/confluent-kafka-python.git
  cd confluent-kafka-python
  ```

- Create Virtual Environment

  ```bash
  python3 -m venv venv
  source venv/bin/activate  # On Windows: venv\Scripts\activate
  ```
- Install librdkafka (if not already installed)

  See the main README.md for platform-specific installation instructions.

  If librdkafka is installed in a non-standard location, provide the include and library directories with:

  ```bash
  C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
  ```

  Note: On Windows the corresponding Visual Studio environment variables are named INCLUDE and LIB.
- Install confluent-kafka-python (editable) with dev/test/docs extras

  ```bash
  pip3 install -e .[dev,tests,docs]
  ```

  Alternatively, you can build the package independently with:

  ```bash
  python3 -m build
  ```
- Verify Setup

  ```bash
  python3 -c "import confluent_kafka; print('Setup successful!')"
  ```
Alternative setup instructions, tested with Python 3.11:

```bash
# Modify pyproject.toml to require python version >=3.11
# This fixes the cel-python dependency conflict
uv venv --python 3.11
source .venv/bin/activate
uv sync --extra dev --extra tests
uv pip install trivup setuptools
pytest tests/

# When making changes, change project.version in pyproject.toml before re-running:
uv sync --extra dev --extra tests
```

Project layout:

- `src/confluent_kafka/` — core sync client APIs
- `src/confluent_kafka/aio/` — AsyncIO Producer/Consumer (first-class asyncio, not generated)
- `src/confluent_kafka/schema_registry/` — Schema Registry clients and serdes
- `tests/` — unit and integration tests (including async producer tests)
- `examples/` — runnable samples (includes asyncio example)
- `tools/unasync.py` — SR-only sync code generation from async sources
Install docs dependencies:

```bash
pip3 install .[docs]
```

Build HTML docs:

```bash
make docs
```

Documentation will be generated in `docs/_build/`.

or:

```bash
python3 setup.py build_sphinx
```

Documentation will be generated in `build/sphinx/html`.
```bash
python3 tools/unasync.py

# Run the script with the --check flag to ensure the sync code is up to date
python3 tools/unasync.py --check
```

If you make any changes to the async code (in `src/confluent_kafka/schema_registry/_async` and `tests/integration/schema_registry/_async`), you must run this script to generate the sync counterparts (in `src/confluent_kafka/schema_registry/_sync` and `tests/integration/schema_registry/_sync`). Otherwise, this script will be run in CI with the `--check` flag and fail the build.
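The unasync approach generates the sync Schema Registry code from the async sources by token-level substitution. The sketch below illustrates the idea with a hypothetical, heavily simplified replacement table; the real `tools/unasync.py` handles many more cases and operates on whole files.

```python
import re

# Hypothetical, simplified token map illustrating the unasync idea;
# the real tools/unasync.py covers far more constructs.
REPLACEMENTS = [
    (r"\basync def\b", "def"),
    (r"\bawait\b ", ""),
    (r"\b__aenter__\b", "__enter__"),
    (r"\b__aexit__\b", "__exit__"),
]

def unasync_source(src: str) -> str:
    """Produce a sync version of async source text via token substitution."""
    for pattern, repl in REPLACEMENTS:
        src = re.sub(pattern, repl, src)
    return src

async_src = "async def get_schema(self, schema_id):\n    return await self._rest.get(schema_id)\n"
print(unasync_source(async_src))
```

Because the transformation is purely mechanical, CI can cheaply verify that the checked-in sync code matches what the script would generate, which is exactly what the `--check` flag does.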
Note: The AsyncIO Producer/Consumer under src/confluent_kafka/aio/ are first-class asyncio implementations and are not generated using unasync.
Source:

- `src/confluent_kafka/aio/producer/_AIOProducer.py` (public async API)
- Internal modules in `src/confluent_kafka/aio/producer/` and helpers in `src/confluent_kafka/aio/_common.py`
For a complete usage example, see examples/asyncio_example.py.
Architecture: See the AIOProducer Architecture Overview for component design and data flow details.
Design guidelines:

- Offload blocking librdkafka calls using `_common.async_call` with a `ThreadPoolExecutor`.
- Wrap common callbacks (`error_cb`, `throttle_cb`, `stats_cb`, `oauth_cb`, `logger`) onto the event loop using `_common.wrap_common_callbacks`.
- Batched async produce flushes on batch size or timeout; per-message headers are not supported in batched async produce.
- Ensure `await producer.flush()` and `await producer.close()` are called in shutdown paths to stop background tasks.
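The offloading pattern in the first guideline can be sketched with stdlib asyncio alone. `async_call` below is a hypothetical stand-in for `_common.async_call`, and `blocking_poll` stands in for a blocking librdkafka call; neither is the actual implementation.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def async_call(func, *args):
    """Hypothetical stand-in for _common.async_call: run a blocking
    function on the thread pool without blocking the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, func, *args)

def blocking_poll(timeout: float) -> str:
    # Placeholder for a blocking librdkafka call such as poll()
    time.sleep(timeout)
    return "polled"

async def main() -> str:
    # The event loop stays free while the blocking call runs on a worker thread
    return await async_call(blocking_poll, 0.01)

print(asyncio.run(main()))  # -> polled
```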
Performance considerations:
- The AsyncIO producer adds up to 50% overhead compared to fire-and-forget sync produce in maximum-throughput scenarios. Normal application workloads see no significant overhead.
- Batched async produce is more efficient than individually awaited produce calls, which suffer thread contention on librdkafka's queue locks.
- For maximum throughput without event loop integration, use the synchronous Producer.
- The official implementation outperforms custom AsyncIO wrappers due to optimized thread pool management.
- The AsyncIO Schema Registry client maintains 100% interface parity with the sync client; no unexpected gotchas or limitations.
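The batching advantage can be illustrated with stdlib asyncio alone (hypothetical function names, not the actual AIOProducer internals): each awaited call pays a loop-to-thread round trip, while a batch pays that cost once.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)

def produce_one(msg: str) -> str:
    # Stand-in for a single blocking produce() call
    return f"queued:{msg}"

def produce_batch(msgs: list) -> list:
    # Stand-in for a batched produce: one executor hop for the whole batch
    return [f"queued:{m}" for m in msgs]

async def per_message(msgs: list) -> list:
    # One loop <-> thread round trip per message; each hop is another
    # chance to contend with the worker thread
    loop = asyncio.get_running_loop()
    return [await loop.run_in_executor(executor, produce_one, m) for m in msgs]

async def batched(msgs: list) -> list:
    # A single round trip amortizes the hand-off cost over the batch
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, produce_batch, msgs)

msgs = ["a", "b", "c"]
print(asyncio.run(batched(msgs)))  # -> ['queued:a', 'queued:b', 'queued:c']
```

Both coroutines return the same result; the difference is purely in how many times the event loop and the worker thread must hand control back and forth.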
Event loop safety:

- Do not block the event loop. Any new blocking operations must be routed using `_common.async_call`.
- When exposing callback entry points, use `asyncio.run_coroutine_threadsafe` to re-enter the loop if invoked from non-loop threads.
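The second point can be sketched with stdlib asyncio and threading alone. The names below (`on_delivery`, `background_thread`) are hypothetical; the thread simulates a callback fired from a non-loop thread, such as one of librdkafka's internal threads.

```python
import asyncio
import threading

async def main() -> list:
    results = []
    loop = asyncio.get_running_loop()

    async def on_delivery(msg: str) -> None:
        # Runs on the event loop even though it was triggered from another thread
        results.append(msg)

    def background_thread() -> None:
        # Simulates a callback fired from a non-loop thread; re-enter
        # the loop safely instead of touching loop state directly.
        future = asyncio.run_coroutine_threadsafe(on_delivery("delivered"), loop)
        future.result(timeout=5)  # block this thread until the loop has run it

    thread = threading.Thread(target=background_thread)
    thread.start()
    while not results:           # yield so the loop can run on_delivery
        await asyncio.sleep(0.01)
    thread.join()
    return results

print(asyncio.run(main()))  # -> ['delivered']
```

Calling loop methods or mutating shared state directly from the foreign thread would be unsafe; `run_coroutine_threadsafe` is the supported hand-off point.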
Unit tests:

```bash
pytest -q
```

Run async producer tests only:

```bash
pytest -q tests/test_AIOProducer.py
pytest -q -k AIOProducer
```

Integration tests (may require a local/CI Kafka cluster; see tests/README.md):

```bash
pytest -q tests/integration
```

See tests/README.md for instructions on how to run tests.
We use automated tools to maintain consistent code style:
- black: Code formatter
- isort: Import sorter
- flake8: Linter for code quality
```bash
# Check formatting
make style-check

# Fix formatting
make style-fix

# Check only changed files
make style-check-changed
make style-fix-changed
```

```bash
# Check formatting
tox -e black,isort

# Check linting
tox -e flake8

# Check typing
tox -e mypy

# Run all formatting and linting checks
tox -e black,isort,flake8,mypy
```

See “Generate Documentation” above; ensure examples and code blocks compile where applicable.
- Use topic branches (e.g., `feature/asyncio-improvements`).
- Keep edits focused; include tests where possible.
- Follow existing code style and add type hints to public APIs where feasible.
- Build errors related to librdkafka: ensure headers and libraries are discoverable; see “Install librdkafka” above for `C_INCLUDE_PATH` and `LIBRARY_PATH`.
- Async tests hanging: check event loop usage and that `await producer.close()` is called to stop background tasks.
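A common cause of hanging async tests is a background task that is never cancelled. The sketch below (hypothetical `SketchProducer`, not the actual AIOProducer internals) shows why `close()` in a `finally` block matters: the background task keeps running until it is cancelled and awaited.

```python
import asyncio

class SketchProducer:
    """Hypothetical minimal producer illustrating why close() matters:
    a background task keeps running until cancelled and reaped."""

    def __init__(self) -> None:
        self._closed = False
        self._task = asyncio.create_task(self._flusher())

    async def _flusher(self) -> None:
        while not self._closed:
            await asyncio.sleep(0.01)  # stand-in for periodic flushing

    async def close(self) -> None:
        self._closed = True
        self._task.cancel()
        try:
            await self._task           # reap the task so nothing lingers
        except asyncio.CancelledError:
            pass

async def main() -> bool:
    producer = SketchProducer()
    try:
        await asyncio.sleep(0.05)      # application work would go here
    finally:
        await producer.close()         # without this, the task lingers
    return producer._task.done()

print(asyncio.run(main()))  # -> True
```

If a test exits without the `finally` cleanup, the pending task can keep the loop (and the test run) alive, which shows up as a hang.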