Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package org.springframework.integration.redis.inbound;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -49,6 +49,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Matthias Jeschke
* @author Glenn Renfro
*
* @since 4.1
*/
Expand Down Expand Up @@ -77,7 +78,7 @@ public class RedisQueueInboundGateway extends MessagingGatewaySupport

private @Nullable RedisSerializer<?> serializer;

private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
private Duration receiveTimeout = Duration.ofMillis(DEFAULT_RECEIVE_TIMEOUT);

private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

Expand Down Expand Up @@ -123,20 +124,42 @@ public void setSerializer(RedisSerializer<?> serializer) {
this.serializerExplicitlySet = true;
}

/**
* This timeout is used when retrieving elements from the queue
* specified by {@link #boundListOperations}.
* <p>
* If the queue does contain elements, the data is retrieved immediately. However,
* if the queue is empty, the Redis connection is blocked until either an element
* can be retrieved from the queue or until the specified timeout passes.
* <p>
* A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000 millis}
* <p>
* See also: https://redis.io/commands/brpop
* @param receiveTimeout {@link Duration} containing the receive timeout.
* @since 7.1
*/
public void setReceiveDuration(Duration receiveTimeout) {
Assert.isTrue(!receiveTimeout.isNegative(), "'receiveTimeout' must be >= 0.");
this.receiveTimeout = receiveTimeout;
}

/**
* This timeout (milliseconds) is used when retrieving elements from the queue
* specified by {@link #boundListOperations}.
* <p> If the queue does contain elements, the data is retrieved immediately. However,
* <p>
* If the queue does contain elements, the data is retrieved immediately. However,
* if the queue is empty, the Redis connection is blocked until either an element
* can be retrieved from the queue or until the specified timeout passes.
* <p> A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000}
* <p> See also: https://redis.io/commands/brpop
* <p>
* A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000 millis}
* <p>
* See also: https://redis.io/commands/brpop
* @param receiveTimeout Must be non-negative. Specified in milliseconds.
*/
public void setReceiveTimeout(long receiveTimeout) {
Assert.isTrue(receiveTimeout >= 0, "'receiveTimeout' must be >= 0.");
this.receiveTimeout = receiveTimeout;
setReceiveDuration(Duration.ofMillis(receiveTimeout));
}

public void setTaskExecutor(Executor taskExecutor) {
Expand Down Expand Up @@ -191,7 +214,7 @@ private void handlePopException(Exception e) {
private void receiveAndReply() {
byte[] value;
try {
value = this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
value = this.boundListOperations.rightPop(this.receiveTimeout);
}
catch (Exception e) {
handlePopException(e);
Expand All @@ -208,7 +231,7 @@ private void receiveAndReply() {
return;
}
try {
value = this.template.boundListOps(uuid).rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
value = this.template.boundListOps(uuid).rightPop(this.receiveTimeout);
}
catch (Exception e) {
handlePopException(e);
Expand Down Expand Up @@ -349,7 +372,7 @@ public boolean isListening() {
}

/**
* Returns the size of the Queue specified by {@link #boundListOperations}. The queue is
* Return the size of the Queue specified by {@link #boundListOperations}. The queue is
* represented by a Redis list. If the queue does not exist <code>0</code>
* is returned. See also <a href="https://redis.io/commands/llen">LLEN</a>
* @return Size of the queue. Never negative.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package org.springframework.integration.redis.inbound;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -54,6 +54,7 @@
* @author Gary Russell
* @author Rainer Frey
* @author Matthias Jeschke
* @author Glenn Renfro
*
* @since 3.0
*/
Expand All @@ -80,7 +81,7 @@ public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport

private boolean expectMessage = false;

private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
private Duration receiveTimeout = Duration.ofMillis(DEFAULT_RECEIVE_TIMEOUT);

private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

Expand All @@ -91,7 +92,7 @@ public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport
private volatile @Nullable Runnable stopCallback;

/**
* @param queueName Must not be an empty String
* @param queueName Must not be an empty String
* @param connectionFactory Must not be null
*/
public RedisQueueMessageDrivenEndpoint(String queueName, RedisConnectionFactory connectionFactory) {
Expand Down Expand Up @@ -125,29 +126,51 @@ public void setSerializer(RedisSerializer<?> serializer) {
/**
* When data is retrieved from the Redis queue, does the returned data represent
* just the payload for a Message, or does the data represent a serialized
* {@link Message}?. {@code expectMessage} defaults to false. This means
* the retrieved data will be used as the payload for a new Spring Integration
* Message. Otherwise, the data is deserialized as Spring Integration Message.
* {@link Message}?. {@code expectMessage} defaults to false. This means the
* retrieved data will be used as the payload for a new Spring Integration Message.
* Otherwise, the data is deserialized as Spring Integration Message.
* @param expectMessage Defaults to false
*/
public void setExpectMessage(boolean expectMessage) {
this.expectMessage = expectMessage;
}

/**
* This timeout is used when retrieving elements from the queue
* specified by {@link BoundListOperations}.
* <p>
* If the queue does contain elements, the data is retrieved immediately. However,
* if the queue is empty, the Redis connection is blocked until either an element
* can be retrieved from the queue or until the specified timeout passes.
* <p>
* A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000 millis}
* <p>
* See also: https://redis.io/commands/brpop
* @param receiveTimeout {@link Duration} containing the receive timeout.
* @since 7.1
*/
public void setReceiveDuration(Duration receiveTimeout) {
Assert.isTrue(!receiveTimeout.isNegative(), "'receiveTimeout' must be >= 0.");
this.receiveTimeout = receiveTimeout;
}

/**
* This timeout (milliseconds) is used when retrieving elements from the queue
* specified by {@link #boundListOperations}.
* <p> If the queue does contain elements, the data is retrieved immediately. However,
* <p>
* If the queue does contain elements, the data is retrieved immediately. However,
* if the queue is empty, the Redis connection is blocked until either an element
* can be retrieved from the queue or until the specified timeout passes.
* <p> A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000}
* <p> See also: https://redis.io/commands/brpop
* <p>
* A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000 millis}
* <p>
* See also: https://redis.io/commands/brpop
* @param receiveTimeout Must be non-negative. Specified in milliseconds.
*/
public void setReceiveTimeout(long receiveTimeout) {
Assert.isTrue(receiveTimeout >= 0, "'receiveTimeout' must be >= 0.");
this.receiveTimeout = receiveTimeout;
setReceiveDuration(Duration.ofMillis(receiveTimeout));
}

public void setTaskExecutor(Executor taskExecutor) {
Expand Down Expand Up @@ -241,10 +264,10 @@ private void popMessageAndSend() {
byte[] value = null;
try {
if (this.rightPop) {
value = this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
value = this.boundListOperations.rightPop(this.receiveTimeout);
}
else {
value = this.boundListOperations.leftPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
value = this.boundListOperations.leftPop(this.receiveTimeout);
}
}
catch (Exception ex) {
Expand Down Expand Up @@ -314,7 +337,7 @@ public boolean isListening() {
}

/**
* Returns the size of the Queue specified by {@link #boundListOperations}. The queue is
* Return the size of the Queue specified by {@link #boundListOperations}. The queue is
* represented by a Redis list. If the queue does not exist <code>0</code>
* is returned. See also https://redis.io/commands/llen
* @return Size of the queue. Never negative.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package org.springframework.integration.redis.outbound;

import java.util.concurrent.TimeUnit;
import java.time.Duration;

import org.jspecify.annotations.Nullable;

Expand All @@ -36,6 +36,7 @@
* @author David Liu
* @author Artem Bilan
* @author Gary Russell
* @author Glenn Renfro
*
* @since 4.1
*/
Expand All @@ -60,9 +61,10 @@ public class RedisQueueOutboundGateway extends AbstractReplyProducingMessageHand

private boolean serializerExplicitlySet;

private int receiveTimeout = TIMEOUT;
private Duration receiveTimeout = Duration.ofMillis(TIMEOUT);

public RedisQueueOutboundGateway(String queueName, RedisConnectionFactory connectionFactory) {

Assert.hasText(queueName, "'queueName' is required");
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
this.template.setConnectionFactory(connectionFactory);
Expand All @@ -80,8 +82,62 @@ public void setBeanClassLoader(ClassLoader beanClassLoader) {
}
}

public void setReceiveTimeout(int timeout) {
this.receiveTimeout = timeout;
/**
* This timeout is used when retrieving elements from the queue
* specified by {@link BoundListOperations}.
* <p>
* If the queue does contain elements, the data is retrieved immediately. However,
* if the queue is empty, the Redis connection is blocked until either an element
* can be retrieved from the queue or until the specified timeout passes.
* <p>
* A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000 millis}
* <p>
* See also: https://redis.io/commands/brpop
* @param receiveTimeout {@link Duration} containing the receive timeout.
* @since 7.1
*/
public void setReceiveDuration(Duration receiveTimeout) {
Assert.isTrue(!receiveTimeout.isNegative(), "'receiveTimeout' must be >= 0.");
this.receiveTimeout = receiveTimeout;
}

/**
* This timeout is used when retrieving elements from the queue
* specified by {@link BoundListOperations}.
* <p>
* If the queue does contain elements, the data is retrieved immediately. However,
* if the queue is empty, the Redis connection is blocked until either an element
* can be retrieved from the queue or until the specified timeout passes.
* <p>
* A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000 millis}
* <p>
* See also: https://redis.io/commands/brpop
* @param receiveTimeout Must be non-negative. Specified in milliseconds.
* @deprecated since 7.1 in favor of {@link #setReceiveTimeout(long)}
*/
@Deprecated(forRemoval = true, since = "7.1")
public void setReceiveTimeout(int receiveTimeout) {
setReceiveDuration(Duration.ofMillis(receiveTimeout));
}

/**
* This timeout is used when retrieving elements from the queue
* specified by {@link BoundListOperations}.
* <p>
* If the queue does contain elements, the data is retrieved immediately. However,
* if the queue is empty, the Redis connection is blocked until either an element
* can be retrieved from the queue or until the specified timeout passes.
* <p>
* A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000 millis}
* <p>
* See also: https://redis.io/commands/brpop
* @param receiveTimeout Must be non-negative. Specified in milliseconds.
*/
public void setReceiveTimeout(long receiveTimeout) {
setReceiveDuration(Duration.ofMillis(receiveTimeout));
}

public void setExtractPayload(boolean extractPayload) {
Expand Down Expand Up @@ -129,7 +185,7 @@ protected Object handleRequestMessage(Message<?> message) {
this.template.boundListOps(uuid).leftPush(value);

BoundListOperations<String, Object> boundListOperations = this.template.boundListOps(uuid + QUEUE_NAME_SUFFIX);
byte[] reply = (byte[]) boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
byte[] reply = (byte[]) boundListOperations.rightPop(this.receiveTimeout);
if (reply != null && reply.length > 0) {
return createReply(reply);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.integration.redis.config;

import java.time.Duration;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -92,8 +94,8 @@ void testRequestWithReply() {

@Test
void testInboundGatewayStop() {
Integer receiveTimeout = TestUtils.getPropertyValue(this.outboundGateway, "receiveTimeout");
this.outboundGateway.setReceiveTimeout(1);
Duration receiveTimeout = TestUtils.getPropertyValue(this.outboundGateway, "receiveTimeout");
this.outboundGateway.setReceiveTimeout(1L);
this.inboundGateway.stop();
try {
this.sendChannel.send(new GenericMessage<>("test1"));
Expand All @@ -102,14 +104,14 @@ void testInboundGatewayStop() {
assertThat(e.getMessage()).contains("No reply produced");
}
finally {
this.outboundGateway.setReceiveTimeout(receiveTimeout);
this.outboundGateway.setReceiveDuration(receiveTimeout);
}
}

@Test
void testNullSerializer() {
Integer receiveTimeout = TestUtils.getPropertyValue(this.outboundGateway, "receiveTimeout");
this.outboundGateway.setReceiveTimeout(1);
Duration receiveTimeout = TestUtils.getPropertyValue(this.outboundGateway, "receiveTimeout");
this.outboundGateway.setReceiveTimeout(1L);
this.inboundGateway.setSerializer(null);
try {
this.sendChannel.send(new GenericMessage<>("test1"));
Expand All @@ -119,7 +121,7 @@ void testNullSerializer() {
}
finally {
this.inboundGateway.setSerializer(new StringRedisSerializer());
this.outboundGateway.setReceiveTimeout(receiveTimeout);
this.outboundGateway.setReceiveDuration(receiveTimeout);
}
}

Expand Down
Loading