CorrelationId added
This commit is contained in:
parent
23a5e9972d
commit
e7d9acf13b
@ -7,6 +7,8 @@ import cz.moneta.test.harness.messaging.ReceivedMessage;
|
|||||||
import cz.moneta.test.harness.messaging.exception.MessagingConnectionException;
|
import cz.moneta.test.harness.messaging.exception.MessagingConnectionException;
|
||||||
import cz.moneta.test.harness.messaging.exception.MessagingDestinationException;
|
import cz.moneta.test.harness.messaging.exception.MessagingDestinationException;
|
||||||
import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException;
|
import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException;
|
||||||
|
import cz.moneta.test.harness.support.messaging.ImqRequest;
|
||||||
|
|
||||||
import com.ibm.mq.jms.MQConnectionFactory;
|
import com.ibm.mq.jms.MQConnectionFactory;
|
||||||
import com.ibm.msg.client.wmq.WMQConstants;
|
import com.ibm.msg.client.wmq.WMQConstants;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
@ -128,6 +130,16 @@ public class IbmMqConnector implements Connector {
|
|||||||
if (properties != null) {
|
if (properties != null) {
|
||||||
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
||||||
try {
|
try {
|
||||||
|
if (entry.getKey().equals(ImqRequest.PROP_JMS_CORRELATION_ID)) {
|
||||||
|
message.setJMSCorrelationID(entry.getValue());
|
||||||
|
continue;
|
||||||
|
} else if (entry.getKey().equals(ImqRequest.PROP_JMS_TYPE)) {
|
||||||
|
message.setJMSType(entry.getValue());
|
||||||
|
continue;
|
||||||
|
} else if (entry.getKey().equals(ImqRequest.PROP_JMS_MESSAGE_ID)) {
|
||||||
|
message.setJMSMessageID(entry.getValue());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
message.setStringProperty(entry.getKey(), entry.getValue());
|
message.setStringProperty(entry.getKey(), entry.getValue());
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
LOG.warn("Failed to set property: {}", entry.getKey(), e);
|
LOG.warn("Failed to set property: {}", entry.getKey(), e);
|
||||||
|
|||||||
@ -39,6 +39,12 @@ public final class ImqRequest {
|
|||||||
|
|
||||||
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
|
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static final String PROP_LOGICAL_QUEUE = "LogicalQueue";
|
||||||
|
private static final String PROP_SELECTOR = "Selector";
|
||||||
|
public static final String PROP_JMS_CORRELATION_ID = "JMSCorrelationID";
|
||||||
|
public static final String PROP_JMS_MESSAGE_ID = "JMSMessageID";
|
||||||
|
public static final String PROP_JMS_TYPE = "JMSType";
|
||||||
|
|
||||||
private ImqRequest() {
|
private ImqRequest() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,6 +136,21 @@ public final class ImqRequest {
|
|||||||
*/
|
*/
|
||||||
PayloadPhase appendToArray(String path, Object value);
|
PayloadPhase appendToArray(String path, Object value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set JMS correlation ID.
|
||||||
|
*/
|
||||||
|
PayloadPhase withCorrelationId(String correlationId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set JMS message ID.
|
||||||
|
*/
|
||||||
|
PayloadPhase withMessageId(String messageId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set JMS type.
|
||||||
|
*/
|
||||||
|
PayloadPhase withJmsType(String jmsType);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send the message.
|
* Send the message.
|
||||||
*/
|
*/
|
||||||
@ -215,6 +236,9 @@ public final class ImqRequest {
|
|||||||
private String payload;
|
private String payload;
|
||||||
private Map<String, Object> fields = new HashMap<>();
|
private Map<String, Object> fields = new HashMap<>();
|
||||||
private List<Map.Entry<String, Object>> arrayAppends = new ArrayList<>();
|
private List<Map.Entry<String, Object>> arrayAppends = new ArrayList<>();
|
||||||
|
private String correlationId;
|
||||||
|
private String messageId;
|
||||||
|
private String jmsType;
|
||||||
private Predicate<ReceivedMessage> filter;
|
private Predicate<ReceivedMessage> filter;
|
||||||
private Duration timeout;
|
private Duration timeout;
|
||||||
|
|
||||||
@ -291,16 +315,43 @@ public final class ImqRequest {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PayloadPhase withCorrelationId(String correlationId) {
|
||||||
|
this.correlationId = correlationId;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PayloadPhase withMessageId(String messageId) {
|
||||||
|
this.messageId = messageId;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PayloadPhase withJmsType(String jmsType) {
|
||||||
|
this.jmsType = jmsType;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send() {
|
public void send() {
|
||||||
String finalPayload = buildPayload();
|
String finalPayload = buildPayload();
|
||||||
Map<String, String> properties = new HashMap<>();
|
Map<String, String> properties = new HashMap<>();
|
||||||
|
|
||||||
if (logicalQueue != null) {
|
if (logicalQueue != null) {
|
||||||
properties.put("LogicalQueue", logicalQueue.name());
|
properties.put(PROP_LOGICAL_QUEUE, logicalQueue.name());
|
||||||
}
|
}
|
||||||
if (selector != null && !selector.isBlank()) {
|
if (selector != null && !selector.isBlank()) {
|
||||||
properties.put("Selector", selector);
|
properties.put(PROP_SELECTOR, selector);
|
||||||
|
}
|
||||||
|
if (correlationId != null) {
|
||||||
|
properties.put(PROP_JMS_CORRELATION_ID, correlationId);
|
||||||
|
}
|
||||||
|
if (messageId != null) {
|
||||||
|
properties.put(PROP_JMS_MESSAGE_ID, messageId);
|
||||||
|
}
|
||||||
|
if (jmsType != null) {
|
||||||
|
properties.put(PROP_JMS_TYPE, jmsType);
|
||||||
}
|
}
|
||||||
|
|
||||||
endpoint.send(getQueueName(), finalPayload, format, properties);
|
endpoint.send(getQueueName(), finalPayload, format, properties);
|
||||||
|
|||||||
@ -0,0 +1,25 @@
|
|||||||
|
package cz.moneta.test.system.messaging;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import cz.moneta.test.dsl.Harness;
|
||||||
|
import cz.moneta.test.harness.annotations.TestCase;
|
||||||
|
import cz.moneta.test.harness.annotations.TestScenario;
|
||||||
|
import cz.moneta.test.harness.endpoints.imq.ImqFirstVisionQueue;
|
||||||
|
|
||||||
|
@TestScenario(name = "IBM MQ Correlation ID Test")
|
||||||
|
public class ImqCorrelationIdTest {
|
||||||
|
|
||||||
|
@TestCase(name = "Send message with Correlation ID")
|
||||||
|
public void sendMessageWithCorrelationId(Harness harness) {
|
||||||
|
harness.withImqFirstVision().toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||||||
|
.withPayload("{\"paymentId\": \"PAY-789\"}").withCorrelationId("corr-789").send();
|
||||||
|
}
|
||||||
|
|
||||||
|
@TestCase(name = "Receive message with Correlation ID")
|
||||||
|
public void receiveMessageWithCorrelationId(Harness harness) {
|
||||||
|
harness.withImqFirstVision().fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||||||
|
.withSelector("JMSCorrelationID = 'corr-789'").receiveWhere(msg -> true)
|
||||||
|
.withTimeout(10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -71,12 +71,4 @@ public class ImqFirstVisionTest {
|
|||||||
.addField("beneficiary", "accountNumber", "1234567890/0100").send();
|
.addField("beneficiary", "accountNumber", "1234567890/0100").send();
|
||||||
}
|
}
|
||||||
|
|
||||||
@TestCase(name = "Receive message with Correlation ID")
|
|
||||||
public void receiveMessageWithCorrelationId(Harness harness) {
|
|
||||||
|
|
||||||
harness.withImqFirstVision().fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
|
||||||
.withSelector("JMSCorrelationID = 'corr-789'").receiveWhere(msg -> true)
|
|
||||||
.withTimeout(10, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user