redis-field-engineering/redis-flink-connector-dist

Redis Flink Connector

The Redis Flink Connector is a highly performant, scalable Flink Source and Sink connector for Redis. It is designed and built to provide a simple, scalable means of using Redis as a Source and Sink for your stream-processing use cases in Flink.

Topics

A topic in the Redis Flink Connector is a logical stream of data that is physically implemented as a collection of Redis Streams. Topics provide a familiar abstraction similar to Kafka topics, making it easier to work with Redis Streams in a distributed streaming context.

How Topics Work

When you configure a topic with a name (e.g., "orders") and a number of partitions (e.g., 4), the connector creates multiple Redis Stream keys following the pattern:

<topicName>:<partitionNumber>

For example, a topic named "orders" with 4 partitions creates these Redis Stream keys:

  • orders:0

  • orders:1

  • orders:2

  • orders:3

Partitioning and Message Distribution

Messages are distributed across partitions based on their key. The connector uses a hash-based partitioning strategy:

partition = abs(key.hashCode()) % numPartitions

This ensures that:

  • Messages with the same key always go to the same partition (maintaining ordering per key)

  • Messages are spread across partitions for load balancing (evenly, as long as the keys hash uniformly)

  • Each partition can be processed independently by different Flink task instances
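
The formula and key pattern above can be sketched in plain Java. This is a minimal illustration, not the connector's source: the class and method names are hypothetical, and the special handling of Integer.MIN_VALUE is an assumption added here because Math.abs returns a negative number for that one value.

```java
/** Illustrative only: mirrors the documented formula, not the connector's code. */
final class TopicPartitionSketch {

    private TopicPartitionSketch() {}

    /** partition = abs(key.hashCode()) % numPartitions, as described above. */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int hash = key.hashCode();
        // Assumption: Math.abs(Integer.MIN_VALUE) is still negative in Java,
        // so this sketch maps that single hash value to 0 to stay in range.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    /** Builds the Redis Stream key "<topicName>:<partitionNumber>" for a message key. */
    static String streamKeyFor(String topicName, String key, int numPartitions) {
        return topicName + ":" + partitionFor(key, numPartitions);
    }
}
```

Because the partition depends only on the key and the partition count, calling partitionFor twice with the same arguments always yields the same stream key, which is what preserves per-key ordering.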

Consumer Groups

For source connectors, topics work with Redis Stream consumer groups. Each consumer group maintains its own read position across all partitions of a topic, enabling:

  • Multiple independent consumers to read from the same topic

  • Automatic tracking of which messages have been processed

  • Exactly-once processing semantics at the checkpoint level when combined with Flink checkpointing (see Exactly-Once Semantics)

Configuration Example

// Sink configuration - writes to a topic with 4 partitions
RedisSinkConfig sinkConfig = RedisSinkConfig.builder()
        .host("redis")
        .topicName("orders")           // Topic name
        .numPartitions(4)              // Number of partitions
        .build();

// Source configuration - reads from a topic with 4 partitions
RedisSourceConfig sourceConfig = RedisSourceConfig.builder()
        .host("redis")
        .topicName("orders")           // Same topic name
        .consumerGroup("order-processor") // Consumer group name
        .numPartitions(4)              // Must match the topic's partitions
        .build();

Benefits of Topics

  • Scalability: Distribute load across multiple Redis Stream keys and Flink task instances

  • Ordering: Maintain message ordering within each partition (per key)

  • Parallelism: Process different partitions concurrently

  • Fault Tolerance: Isolate failures to individual partitions

  • Cluster Support: Spread partitions across a Redis Cluster for horizontal scaling

Partitioned Streams

The Redis Flink Connector supports partitioned streams: you configure how many separate partitions your stream of data should have. This lets you scale the stream across a Redis Cluster while Flink coordinates which consumer owns which stream.

Parallelism vs Partitions

Understanding the relationship between Flink parallelism and topic partitions is crucial for optimal performance and resource utilization.

Sink Behavior

| Partitions | Parallelism | Behavior |
|------------|-------------|----------|
| 1 | 3 | Similar to 1-1 configuration, but opens 3 connections to Redis |
| 3 | 1 | Data is partitioned across 3 streams, but only 1 connection is opened |
| 3 | 4 | Data is partitioned across 3 streams, 4 connections are opened |

Key Points for Sink:

  • Increasing or decreasing parallelism scales writes to Redis accordingly

  • Data partitioning is determined by the number of partitions, not parallelism

  • More parallel tasks than partitions will result in multiple connections, but data distribution remains based on partition count

Source Behavior

| Partitions | Parallelism | Behavior |
|------------|-------------|----------|
| 1 | 3 | 3 connections created, but 2 sit idle with a single consumer group |
| 3 | 1 | 3 consumer groups with 1 consumer each. Uses single connection, blocks for 500ms on each stream in round-robin fashion |
| 3 | 4 | Similar to 3-3 configuration (not 4 connections). Extra parallelism has no effect |

Key Points for Source:

  • Parallelism < Partitions: Some connections will be shared between consumer groups in round-robin fashion

  • Parallelism = Partitions: Optimal configuration - each parallel task handles one partition

  • Parallelism > Partitions: Additional parallelism has no effect; extra connections sit idle

Recommendation: For best performance, set parallelism equal to the number of partitions for source connectors.
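
To make the three cases concrete, the sketch below distributes partitions over parallel subtasks with a simple modulo rule. The rule, the class name, and the method name are assumptions chosen for illustration; the connector's actual assignment logic may differ, but the resulting shapes match the cases described above (shared, one-to-one, and idle subtasks).

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a hypothetical round-robin partition assignment. */
final class PartitionAssignmentSketch {

    private PartitionAssignmentSketch() {}

    /** Returns, for each subtask index, the partitions that subtask would read. */
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.add(new ArrayList<>());
        }
        // Assumption: partition p goes to subtask p % parallelism.
        for (int partition = 0; partition < numPartitions; partition++) {
            bySubtask.get(partition % parallelism).add(partition);
        }
        return bySubtask;
    }
}
```

With 3 partitions and parallelism 1, the single subtask receives all three partitions; with 3 partitions and parallelism 4, the fourth subtask receives an empty list and sits idle.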

Checkpointing Behavior

The Redis Flink Connector’s behavior is tightly integrated with Flink’s checkpointing mechanism, providing different guarantees for sources and sinks.

Source Checkpointing

Messages are consumed from the source according to the configured ackStrategy.

Acknowledgment Timing:

  • ON_CHECKPOINT: messages are acknowledged on checkpoint completion

  • ON_EMIT: messages are acknowledged after each emitted record

  • NOACK: messages are read with XREADGROUP … NOACK and never enter the pending list

With ON_CHECKPOINT, if messages are processed but the component fails before checkpoint, messages will be redelivered. You may need to implement deduplication logic in your application to handle redelivered messages.

At-Least-Once Guarantee:

In case of failure before checkpoint:

  1. Messages that were consumed but not yet checkpointed will be redelivered

  2. This ensures no message loss, but may result in duplicate processing

  3. Implement idempotent processing or deduplication to handle this scenario

  4. ON_EMIT trades a smaller replay window for weaker guarantees than checkpoint-aligned acknowledgment
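
One way to approach the deduplication mentioned above is to remember recently seen message IDs and skip repeats. The sketch below is a minimal in-memory illustration; the class name, the capacity parameter, and the use of a message ID string as the dedup key are all assumptions, not part of the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: remembers the most recent message IDs to spot redeliveries. */
final class RecentIdDeduplicator {

    private final Map<String, Boolean> seen;

    RecentIdDeduplicator(final int capacity) {
        // Insertion-ordered map that evicts the oldest ID once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is seen, false for a redelivery. */
    boolean firstTimeSeen(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }
}
```

This version keeps its state in memory only, so it does not survive a restart. In a real job the seen IDs would more likely live in Flink managed state or an external store, or the downstream write would be made idempotent so that duplicates are harmless.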

Sink Checkpointing

The sink offers two buffering strategies that affect when data is written to Redis:

Buffer-Based Flushing

When flushOnCheckpoint(false) (default):

RedisSinkConfig config = RedisSinkConfig.builder()
        .host("redis")
        .topicName("output")
        .numPartitions(3)
        .flushOnCheckpoint(false)
        .numMessagesToBuffer(100)      // Flush after 100 messages
        .flushIntervalMillis(5000)     // Or flush every 5 seconds
        .build();

  • Messages are buffered in memory

  • Flushed to Redis when buffer reaches numMessagesToBuffer messages

  • Can also flush periodically using flushIntervalMillis

  • Provides better throughput but messages may be lost on failure before flush
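
The count-or-interval behavior described above can be pictured with a small buffer class. This is a sketch under stated assumptions, not the connector's implementation: the class name is hypothetical, the clock is passed in explicitly to keep the example deterministic, and the time check happens only when a message is added (a real sink would more likely use a timer).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: flushes when the buffer is full or the interval has elapsed. */
final class CountOrTimeBuffer<T> {

    private final int maxMessages;      // plays the role of numMessagesToBuffer
    private final long maxAgeMillis;    // plays the role of flushIntervalMillis
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMillis;

    CountOrTimeBuffer(int maxMessages, long maxAgeMillis, long nowMillis,
                      Consumer<List<T>> flusher) {
        this.maxMessages = maxMessages;
        this.maxAgeMillis = maxAgeMillis;
        this.lastFlushMillis = nowMillis;
        this.flusher = flusher;
    }

    /** Buffers a message and flushes if either threshold (when > 0) is reached. */
    void add(T message, long nowMillis) {
        buffer.add(message);
        boolean full = maxMessages > 0 && buffer.size() >= maxMessages;
        boolean stale = maxAgeMillis > 0 && nowMillis - lastFlushMillis >= maxAgeMillis;
        if (full || stale) {
            flush(nowMillis);
        }
    }

    /** Hands a copy of the buffered messages to the flusher and clears the buffer. */
    void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushMillis = nowMillis;
    }

    /** Number of messages still held in memory, which would be lost on failure. */
    int pending() {
        return buffer.size();
    }
}
```

Anything counted by pending() at the moment of a crash has not reached Redis, which is the data-loss window this strategy accepts in exchange for throughput.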

Checkpoint-Based Flushing

When flushOnCheckpoint(true):

RedisSinkConfig config = RedisSinkConfig.builder()
        .host("redis")
        .topicName("output")
        .numPartitions(3)
        .flushOnCheckpoint(true)
        .build();

  • Messages are buffered inside the connector

  • Written to Redis only on checkpoint completion

  • Provides stronger delivery guarantees aligned with Flink’s checkpointing

  • May have lower throughput due to less frequent writes

Choosing a Strategy:

  • Use flushOnCheckpoint(true) for stronger consistency guarantees

  • Use flushOnCheckpoint(false) with buffering for higher throughput when some data loss is acceptable

Exactly-Once Semantics

The Redis Flink Connector supports exactly-once semantics, which are tied to Flink’s checkpointing mechanism. Note that "exactly once" applies at the checkpoint level: if your pipeline fails, you may see messages within a checkpoint being delivered more than once.

Gradle

Add the following to your build.gradle file:

build.gradle
dependencies {
    implementation 'com.redis:redis-flink-connector-spring:0.0.24-2.0'
}

Using the Stream Source

To use the Flink stream source, you can create a RedisSourceConfig.

The following table describes the fields in that class:

| Field | Type | Default Value | Required |
|-------|------|---------------|----------|
| host | String | "localhost" | No |
| port | int | 6379 | No |
| password | String | "" (empty string) | No |
| user | String | "default" | No |
| consumerGroup | String | N/A | Yes |
| topicName | String | N/A | Yes |
| numPartitions | int | N/A | Yes |
| useClusterApi | boolean | false | No |
| ackStrategy | RedisAckStrategy | ON_CHECKPOINT | No |
| startingId | StreamEntryID | StreamEntryID.XGROUP_LAST_ENTRY | No |
| failedDeserializationStreamName | String | "" | No |
| useTls | boolean | false | No |
| mtlsParameters | MtlsParameters | null | No |
| otelServiceName | String | null | No |

You can then initialize the Source Builder using:

RedisSourceBuilder<RedisMessage> sourceBuilder = new RedisSourceBuilder<>(sourceConfig, new RedisMessageDeserializer());

Set otelServiceName to enable source-side OpenTelemetry spans for XREADGROUP, emit-record, and XACK when the incoming Redis stream entries already carry trace metadata.

After that, all that’s left is to use your environment to add the source to your pipeline:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(globalConfig);
env.enableCheckpointing(5000);
env.setParallelism(4);
TypeInformation<RedisMessage> typeInfo = TypeInformation.of(RedisMessage.class);
String sourceName = "Redis to Redis";
env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), sourceName, typeInfo).sinkTo(sinkBuilder.build());

Shared Sink Configuration

The Redis Flink Connector ships three sinks: the Stream sink (RedisSinkConfig / RedisSink), the JSON sink (RedisJsonSinkConfig / RedisJsonSink), and the Hash sink (RedisHashSinkConfig / RedisHashSink). All three configs extend an AbstractRedisSinkConfig base that exposes the shared connection, buffering, error-stream, and tracing fields below. Each sink-specific section further down lists only the fields that are unique to that sink.

| Field | Type | Default Value | Required | Description |
|-------|------|---------------|----------|-------------|
| host | String | "localhost" | No | the Redis host name |
| port | int | 6379 | No | the Redis port |
| password | String | "" (empty string) | No | the Redis password |
| user | String | "default" | No | the Redis user |
| flushOnCheckpoint | boolean | false | No | whether to flush writes on checkpoint |
| numMessagesToBuffer | int | 0 | No | number of messages to buffer before flushing (when > 0) |
| flushIntervalMillis | int | 0 | No | time interval in milliseconds for periodic flushing (when > 0) |
| failedSerializationStreamName | String | null | No | the stream name to write serialization errors to |
| useTls | boolean | false | No | whether to use TLS |
| mtlsParameters | RedisMtlsParameters | null | No | parameters to use for mTLS |
| traceConfiguration | SinkTraceConfiguration | null | No | configuration for tracing events processed by the sink |
| otelServiceName | String | null | No | service name used for OpenTelemetry spans emitted by the sink |

Every sink config exposes these via its Lombok @SuperBuilder, so a builder call like .host("redis").useTls(true).flushOnCheckpoint(true) works identically across RedisSinkConfig, RedisJsonSinkConfig, and RedisHashSinkConfig.

Using the Redis Stream Sink

The Stream sink writes each record as an entry on a partitioned Redis stream (via XADD). RedisSinkConfig adds the following fields on top of the shared configuration:

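| Field | Type | Default Value | Required | Description |
|-------|------|---------------|----------|-------------|
| topicName | String | N/A | Yes | the Topic Name |
| numPartitions | int | N/A | Yes | the number of partitions |
| useClusterApi | boolean | false | No | use the Redis cluster API |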

You then initialize the sink builder and attach the sink to your pipeline:

RedisSinkBuilder<RedisMessage> sinkBuilder = new RedisSinkBuilder<>(new RedisPassthroughSerializer(), sinkConfig);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(globalConfig);
env.enableCheckpointing(5000);
env.setParallelism(4);
TypeInformation<RedisMessage> typeInfo = TypeInformation.of(RedisMessage.class);
String sourceName = "Redis to Redis";
env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), sourceName, typeInfo).sinkTo(sinkBuilder.build());

Using the Redis JSON Sink

The JSON sink writes each record as a Redis JSON document via JSON.SET, transactionally combined with an optional EXPIRE. RedisJsonSinkConfig adds the following fields on top of the shared configuration:

| Field | Type | Default Value | Required | Description |
|-------|------|---------------|----------|-------------|
| jsonPath | String | "$" | No | JSON path the document is written to |
| ttlSeconds | long | 0 | No | TTL in seconds applied to the key (0 = no expiration) |

Provide a RedisJsonSerializer<T> to control the JSON payload (the default RedisJsonObjectSerializer<T> lets Jedis serialize your POJO directly), and a RedisKeyExtractor<T> to pick the Redis key per record:

RedisJsonSinkConfig config = RedisJsonSinkConfig.builder()
        .host("redis")
        .ttlSeconds(300)
        .build();

RedisJsonSink<Order> sink = new RedisJsonSinkBuilder<Order>(
        new RedisJsonObjectSerializer<>(), config)
    .keyExtractor(order -> "order:" + order.getId())
    .build();

env.fromCollection(orders).sinkTo(sink);

See samples/jobs/json-sink/ and ./example-json-sink.sh for an end-to-end demo against the Docker compose stack.

Using the Redis Hash Sink

The Hash sink writes each record as a Redis Hash via HSET, transactionally combined with an optional EXPIRE. RedisHashSinkConfig adds the following fields on top of the shared configuration:

| Field | Type | Default Value | Required | Description |
|-------|------|---------------|----------|-------------|
| ttlSeconds | long | 0 | No | TTL in seconds applied to the key (0 = no expiration) |

Provide a RedisHashSerializer<T> (returns Map<String,String>) to control the hash field layout. The default RedisHashObjectSerializer<T> flattens your POJO to a Map<String,String> via Jackson, using JavaBean getters as field names:

RedisHashSinkConfig config = RedisHashSinkConfig.builder()
        .host("redis")
        .ttlSeconds(300)
        .build();

RedisHashSink<Order> sink = new RedisHashSinkBuilder<Order>(
        new RedisHashObjectSerializer<>(), config)
    .keyExtractor(order -> "order:" + order.getId())
    .build();

env.fromCollection(orders).sinkTo(sink);

See samples/jobs/hash-sink/ and ./example-hash-sink.sh for an end-to-end demo against the Docker compose stack.

Serializers and Keys

The Redis Flink Connector’s natural data type is the RedisMessage class. This class contains the data of the message (a Map<String,String>) and the key for the message (a String). The RedisPassthroughSerializer and the RedisMessageDeserializer form a simple serializer/deserializer pair that lets you work directly with the RedisMessage object.

The RedisObjectSerializer and RedisObjectDeserializer are generic serializers/deserializers that allow you to work with your standard serializable POJOs. Use these if you want to work with your own domain objects: the object is serialized to JSON and added as the data field of the Stream Message that is sent to Redis. If you need to add specific modules to the ObjectMapper (e.g. JavaTimeModule), you can do so by passing an ObjectMapperSupplier to the RedisObjectSerializer and RedisObjectDeserializer constructors. For example:

RedisObjectSerializer<Person> serializer = new RedisObjectSerializer<>(() -> {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.registerModule(new JavaTimeModule());
            return objectMapper;
        });

If you use these, you may also want to provide a RedisKeyExtractor to extract the key from the object. Otherwise, a hashcode extracted from the JSON payload of the object will act as the key.

The key determines which partition a message is sent to.

Configure Serializer and Key Extractor

You can configure the serializer and key extractor in the RedisSinkBuilder:

RedisSinkBuilder<Person> sinkBuilder = new RedisSinkBuilder<Person>(new RedisObjectSerializer<>(), sinkConfig).keyExtractor(Person::getName);

And you can configure which deserializer to use in the RedisSourceBuilder:

RedisSourceBuilder<Person> sourceBuilder = new RedisSourceBuilder<>(sourceConfig, new RedisObjectDeserializer<>(Person.class));

Quick Start

You can run the demo in this repo by running:

docker compose up -d
./example-redis-job.sh

This will spin up Redis, a Flink Job Manager and Task Manager, and start a Job with Redis as the Source and Sink.

Sink Trace Configuration

The Redis Sink supports configurable tracing of events processed by the sink. This allows you to track the success or failure of event writes with different levels of detail. Tracing can be configured using the SinkTraceConfiguration class with the following options:

Trace Levels

The SinkTraceLevel enum controls which events are traced:

| Level | Description |
|-------|-------------|
| NONE | No tracing (default when configuration is null) |
| FAILURES | Only trace failed writes |
| ALL | Trace all events (both successful and failed writes) |
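
The level semantics above reduce to a small decision function. The sketch below uses a hypothetical stand-in enum (not the connector's SinkTraceLevel class) and a hypothetical method name, purely to show which writes each level would trace.

```java
/** Illustrative only: decides whether a write is traced at a given level. */
final class TraceLevelSketch {

    private TraceLevelSketch() {}

    /** Hypothetical stand-in for the connector's SinkTraceLevel enum. */
    enum Level { NONE, FAILURES, ALL }

    static boolean shouldTrace(Level level, boolean writeSucceeded) {
        switch (level) {
            case ALL:
                return true;             // successful and failed writes
            case FAILURES:
                return !writeSucceeded;  // failed writes only
            default:
                return false;            // NONE: nothing is traced
        }
    }
}
```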

Trace Contents

The SinkTraceContents enum controls what information is included in the trace:

| Contents | Description |
|----------|-------------|
| NONE | No content (minimal tracing) |
| METADATA | Include metadata about the event (timestamps, keys, etc.) |
| ALL | Include full message contents along with metadata |

Example Configuration

// Configure tracing for all events with full message contents
SinkTraceConfiguration traceConfig = new SinkTraceConfiguration(SinkTraceLevel.ALL, SinkTraceContents.ALL);

// Use the configuration when building your sink
RedisSinkConfig config = RedisSinkConfig.builder()
        .host("redis")
        .topicName("output")
        .numPartitions(3)
        .traceConfiguration(traceConfig)
        .build();

Support

Redis Flink Connector is supported by Redis, Inc. for enterprise-tier customers as a 'Developer Tool' under the Redis Software Support Policy. For non-enterprise-tier customers, we provide support for Redis Flink Connector on a good-faith basis. To report bugs, request features, or receive assistance, please file an issue.

License

Redis Flink Connector is licensed under the Business Source License 1.1. Copyright © 2024 Redis, Inc. See LICENSE for details.
