From e7d9acf13b9c82ee6cb3e141994a1c69a7705fed Mon Sep 17 00:00:00 2001 From: Radek Davidek Date: Tue, 17 Mar 2026 18:45:05 +0100 Subject: [PATCH] CorrelationId added --- .../connectors/messaging/IbmMqConnector.java | 12 ++++ .../harness/support/messaging/ImqRequest.java | 55 ++++++++++++++++++- .../messaging/ImqCorrelationIdTest.java | 25 +++++++++ .../system/messaging/ImqFirstVisionTest.java | 8 --- 4 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 tests/src/test/java/cz/moneta/test/system/messaging/ImqCorrelationIdTest.java diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java index 836bcfd..b0c9498 100644 --- a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java +++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java @@ -7,6 +7,8 @@ import cz.moneta.test.harness.messaging.ReceivedMessage; import cz.moneta.test.harness.messaging.exception.MessagingConnectionException; import cz.moneta.test.harness.messaging.exception.MessagingDestinationException; import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException; +import cz.moneta.test.harness.support.messaging.ImqRequest; + import com.ibm.mq.jms.MQConnectionFactory; import com.ibm.msg.client.wmq.WMQConstants; import org.apache.commons.lang3.StringUtils; @@ -128,6 +130,16 @@ public class IbmMqConnector implements Connector { if (properties != null) { for (Map.Entry entry : properties.entrySet()) { try { + if (entry.getKey().equals(ImqRequest.PROP_JMS_CORRELATION_ID)) { + message.setJMSCorrelationID(entry.getValue()); + continue; + } else if (entry.getKey().equals(ImqRequest.PROP_JMS_TYPE)) { + message.setJMSType(entry.getValue()); + continue; + } else if (entry.getKey().equals(ImqRequest.PROP_JMS_MESSAGE_ID)) { + message.setJMSMessageID(entry.getValue()); + continue; + } message.setStringProperty(entry.getKey(), entry.getValue()); } catch (JMSException e) { LOG.warn("Failed to set property: {}", entry.getKey(), e); diff --git a/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/ImqRequest.java b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/ImqRequest.java index abce61b..ef42b70 100644 --- a/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/ImqRequest.java +++ b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/ImqRequest.java @@ -39,6 +39,12 @@ public final class ImqRequest { private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + private static final String PROP_LOGICAL_QUEUE = "LogicalQueue"; + private static final String PROP_SELECTOR = "Selector"; + public static final String PROP_JMS_CORRELATION_ID = "JMSCorrelationID"; + public static final String PROP_JMS_MESSAGE_ID = "JMSMessageID"; + public static final String PROP_JMS_TYPE = "JMSType"; + private ImqRequest() { } @@ -130,6 +136,21 @@ public final class ImqRequest { */ PayloadPhase appendToArray(String path, Object value); + /** + * Set JMS correlation ID. + */ + PayloadPhase withCorrelationId(String correlationId); + + /** + * Set JMS message ID. + */ + PayloadPhase withMessageId(String messageId); + + /** + * Set JMS type. + */ + PayloadPhase withJmsType(String jmsType); + /** * Send the message. */ @@ -215,6 +236,9 @@ public final class ImqRequest { private String payload; private Map fields = new HashMap<>(); private List> arrayAppends = new ArrayList<>(); + private String correlationId; + private String messageId; + private String jmsType; private Predicate filter; private Duration timeout; @@ -291,16 +315,43 @@ public final class ImqRequest { return this; } + @Override + public PayloadPhase withCorrelationId(String correlationId) { + this.correlationId = correlationId; + return this; + } + + @Override + public PayloadPhase withMessageId(String messageId) { + this.messageId = messageId; + return this; + } + + @Override + public PayloadPhase withJmsType(String jmsType) { + this.jmsType = jmsType; + return this; + } + @Override public void send() { String finalPayload = buildPayload(); Map properties = new HashMap<>(); if (logicalQueue != null) { - properties.put("LogicalQueue", logicalQueue.name()); + properties.put(PROP_LOGICAL_QUEUE, logicalQueue.name()); } if (selector != null && !selector.isBlank()) { - properties.put("Selector", selector); + properties.put(PROP_SELECTOR, selector); + } + if (correlationId != null) { + properties.put(PROP_JMS_CORRELATION_ID, correlationId); + } + if (messageId != null) { + properties.put(PROP_JMS_MESSAGE_ID, messageId); + } + if (jmsType != null) { + properties.put(PROP_JMS_TYPE, jmsType); } endpoint.send(getQueueName(), finalPayload, format, properties); diff --git a/tests/src/test/java/cz/moneta/test/system/messaging/ImqCorrelationIdTest.java b/tests/src/test/java/cz/moneta/test/system/messaging/ImqCorrelationIdTest.java new file mode 100644 index 0000000..522d95e --- /dev/null +++ b/tests/src/test/java/cz/moneta/test/system/messaging/ImqCorrelationIdTest.java @@ -0,0 +1,25 @@ +package cz.moneta.test.system.messaging; + +import java.util.concurrent.TimeUnit; + +import cz.moneta.test.dsl.Harness; +import cz.moneta.test.harness.annotations.TestCase; +import cz.moneta.test.harness.annotations.TestScenario; +import cz.moneta.test.harness.endpoints.imq.ImqFirstVisionQueue; + +@TestScenario(name = "IBM MQ Correlation ID Test") +public class ImqCorrelationIdTest { + + @TestCase(name = "Send message with Correlation ID") + public void sendMessageWithCorrelationId(Harness harness) { + harness.withImqFirstVision().toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .withPayload("{\"paymentId\": \"PAY-789\"}").withCorrelationId("corr-789").send(); + } + + @TestCase(name = "Receive message with Correlation ID") + public void receiveMessageWithCorrelationId(Harness harness) { + harness.withImqFirstVision().fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .withSelector("JMSCorrelationID = 'corr-789'").receiveWhere(msg -> true) + .withTimeout(10, TimeUnit.SECONDS); + } +} \ No newline at end of file diff --git a/tests/src/test/java/cz/moneta/test/system/messaging/ImqFirstVisionTest.java b/tests/src/test/java/cz/moneta/test/system/messaging/ImqFirstVisionTest.java index 497ef86..3e587ee 100644 --- a/tests/src/test/java/cz/moneta/test/system/messaging/ImqFirstVisionTest.java +++ b/tests/src/test/java/cz/moneta/test/system/messaging/ImqFirstVisionTest.java @@ -71,12 +71,4 @@ public class ImqFirstVisionTest { .addField("beneficiary", "accountNumber", "1234567890/0100").send(); } - @TestCase(name = "Receive message with Correlation ID") - public void receiveMessageWithCorrelationId(Harness harness) { - - harness.withImqFirstVision().fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) - .withSelector("JMSCorrelationID = 'corr-789'").receiveWhere(msg -> true) - .withTimeout(10, TimeUnit.SECONDS); - } - }