Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,26 @@

package org.springframework.integration.jms.channel;

import jakarta.jms.JMSException;

import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.jms.DefaultJmsHeaderMapper;
import org.springframework.integration.jms.DynamicJmsTemplateProperties;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/**
* A base {@link AbstractMessageChannel} implementation for JMS-backed message channels.
*
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*
* @since 7.0
*
Expand All @@ -36,27 +44,62 @@
*/
public abstract class AbstractJmsChannel extends AbstractMessageChannel {

private final JmsTemplate jmsTemplate;
protected final DefaultJmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper();

protected final JmsTemplate jmsTemplate;

public AbstractJmsChannel(JmsTemplate jmsTemplate) {
Assert.notNull(jmsTemplate, "jmsTemplate must not be null");
this.jmsTemplate = jmsTemplate;
}

JmsTemplate getJmsTemplate() {
return this.jmsTemplate;
}

@Override
protected boolean doSend(Message<?> message, long timeout) {
try {
DynamicJmsTemplateProperties.setPriority(new IntegrationMessageHeaderAccessor(message).getPriority());
this.jmsTemplate.convertAndSend(message);
MessageConverter messageConverter = this.jmsTemplate.getMessageConverter();
this.jmsTemplate.send((session) -> {
jakarta.jms.Message jmsMessage = messageConverter.toMessage(message, session);
if (!(messageConverter instanceof MessagingMessageConverter)) {
MessageHeaders headers = message.getHeaders();
this.headerMapper.fromHeaders(headers, jmsMessage);
}
return jmsMessage;
});
}
finally {
DynamicJmsTemplateProperties.clearPriority();
}
return true;
}

protected Message<?> fromJmsMessage(jakarta.jms.Message message) {
MessageConverter converter = this.jmsTemplate.getMessageConverter();
try {
Object converted = converter.fromMessage(message);
Message<?> messageToSend;
if (converted instanceof Message<?> convertedMessage) {
messageToSend = convertedMessage;
if (!(converter instanceof MessagingMessageConverter)) {
messageToSend = getMessageBuilderFactory()
.fromMessage(messageToSend)
.copyHeadersIfAbsent(this.headerMapper.toHeaders(message))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a user want the original header overwritten? Is this an option we want to add?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah...
First of all I don't want to expose headers manipulation options from these channels at all.
And secondly, I followed exactly same logic as MessagingMessageConverter.
This our logic here is to mimic that converter when Message is deserialized as is without any JMS headers.
If header is present there in the message, then it is indeed end-user preference.
If something else should be done, the custom MessageConverter is welcome.
Just not header mapping from the channel configuration.

.build();
}
}
else {
messageToSend = getMessageBuilderFactory()
.withPayload(converted)
.copyHeaders(this.headerMapper.toHeaders(message))
.build();
}

return messageToSend;
}
catch (JMSException ex) {
throw new MessagingException("failed to convert incoming JMS Message", ex);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ public void setMessageSelector(String messageSelector) {
}

@Override
@Nullable
public Message<?> receive(long timeout) {
public @Nullable Message<?> receive(long timeout) {
try {
DynamicJmsTemplateProperties.setReceiveTimeout(timeout);
return receive();
Expand All @@ -73,8 +72,7 @@ public Message<?> receive(long timeout) {
}

@Override
@Nullable
public Message<?> receive() {
public @Nullable Message<?> receive() {
ChannelInterceptorList interceptorList = getIChannelInterceptorList();
Deque<ChannelInterceptor> interceptorStack = null;
boolean counted = false;
Expand Down Expand Up @@ -109,23 +107,22 @@ public Message<?> receive() {
}
}

@Nullable
private Message<?> receiveAndConvertToMessage() {
Object object;
private @Nullable Message<?> receiveAndConvertToMessage() {
jakarta.jms.Message jmsMessage;
if (this.messageSelector == null) {
object = getJmsTemplate().receiveAndConvert();
jmsMessage = this.jmsTemplate.receive();
}
else {
object = getJmsTemplate().receiveSelectedAndConvert(this.messageSelector);
jmsMessage = this.jmsTemplate.receiveSelected(this.messageSelector);
}

if (object == null) {
if (jmsMessage == null) {
logger.trace(() -> "postReceive on channel '" + this + "', message is null");
return null;
}
Message<?> message = object instanceof Message<?> msg
? msg
: getMessageBuilderFactory().withPayload(object).build();

Message<?> message = fromJmsMessage(jmsMessage);

logger.debug(() -> "postReceive on channel '" + this + "', message: " + message);

return message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,19 @@
package org.springframework.integration.jms.channel;

import jakarta.jms.MessageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.integration.MessageDispatchingException;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.dispatcher.AbstractDispatcher;
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.management.ManageableSmartLifecycle;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -79,15 +73,11 @@ public void setMaxSubscribers(int maxSubscribers) {

@Override
public boolean subscribe(MessageHandler handler) {
Assert.state(this.dispatcher != null,
"'MessageDispatcher' must not be null. This channel might not have been initialized");
return this.dispatcher.addHandler(handler);
}

@Override
public boolean unsubscribe(MessageHandler handler) {
Assert.state(this.dispatcher != null,
"'MessageDispatcher' must not be null. This channel might not have been initialized");
return this.dispatcher.removeHandler(handler);
}

Expand All @@ -102,12 +92,8 @@ public void onInit() {
return;
}
super.onInit();
boolean isPubSub = isBroadcast();
configureDispatcher(isPubSub);
MessageListener listener =
new DispatchingMessageListener(getJmsTemplate(), this.dispatcher, this, isPubSub,
getMessageBuilderFactory());
this.container.setMessageListener(listener);
configureDispatcher(isBroadcast());
this.container.setMessageListener((MessageListener) this::receiveAndDispatch);
if (!this.container.isActive()) {
this.container.afterPropertiesSet();
}
Expand Down Expand Up @@ -140,115 +126,54 @@ private void configureDispatcher(boolean isPubSub) {

@Override
public boolean isAutoStartup() {
return (this.container != null) && this.container.isAutoStartup();
return this.container.isAutoStartup();
}

@Override
public int getPhase() {
return (this.container != null) ? this.container.getPhase() : 0;
return this.container.getPhase();
}

@Override
public boolean isRunning() {
return (this.container != null) && this.container.isRunning();
return this.container.isRunning();
}

@Override
public void start() {
if (this.container != null) {
this.container.start();
}
this.container.start();
}

@Override
public void stop() {
if (this.container != null) {
this.container.stop();
}
this.container.stop();
}

@Override
public void stop(Runnable callback) {
if (this.container != null) {
this.container.stop(callback);
}
else {
callback.run();
}
this.container.stop(callback);
}

@Override
public void destroy() {
if (this.container != null) {
this.container.destroy();
}
this.container.destroy();
}

private static final class DispatchingMessageListener implements MessageListener {

private final Log logger = LogFactory.getLog(this.getClass());

private final JmsTemplate jmsTemplate;

private final MessageDispatcher dispatcher;

private final SubscribableJmsChannel channel;

private final boolean isPubSub;

private final MessageBuilderFactory messageBuilderFactory;

DispatchingMessageListener(JmsTemplate jmsTemplate,
MessageDispatcher dispatcher, SubscribableJmsChannel channel, boolean isPubSub,
MessageBuilderFactory messageBuilderFactory) {

this.jmsTemplate = jmsTemplate;
this.dispatcher = dispatcher;
this.channel = channel;
this.isPubSub = isPubSub;
this.messageBuilderFactory = messageBuilderFactory;
private void receiveAndDispatch(jakarta.jms.Message message) {
Message<?> messageToSend = fromJmsMessage(message);
try {
this.dispatcher.dispatch(messageToSend);
Comment thread
artembilan marked this conversation as resolved.
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
@Override
public void onMessage(jakarta.jms.Message message) {
Message<?> messageToSend = null;
try {
MessageConverter converter = this.jmsTemplate.getMessageConverter();
Object converted = null;
if (converter != null) {
converted = converter.fromMessage(message);
}
if (converted != null) {
messageToSend =
converted instanceof Message<?> convertedMessage
? convertedMessage
: this.messageBuilderFactory.withPayload(converted).build();
this.dispatcher.dispatch(messageToSend);
}
else if (this.logger.isWarnEnabled()) {
this.logger.warn("No converter found, or converter returned null for: " + message
+ ", no Message to dispatch");
}
catch (MessageDispatchingException ex) {
String exceptionMessage = ex.getMessage() + " for jms-channel '" + this.getFullChannelName() + "'.";
if (isBroadcast()) {
// log only for backwards compatibility with pub/sub
this.logger.warn(ex, exceptionMessage);
}
catch (MessageDispatchingException ex) {
String exceptionMessage = ex.getMessage() + " for jms-channel '"
+ this.channel.getFullChannelName() + "'.";
if (this.isPubSub) {
// log only for backwards compatibility with pub/sub
if (this.logger.isWarnEnabled()) {
this.logger.warn(exceptionMessage, ex);
}
}
else {
throw new MessageDeliveryException(messageToSend, exceptionMessage, ex);
}
}
catch (Exception ex) {
throw new MessagingException("failed to handle incoming JMS Message", ex);
else {
throw new MessageDeliveryException(messageToSend, exceptionMessage, ex);
}
}

}

}
Loading