rewritten connector
This commit is contained in:
parent
579246d772
commit
0833bcff06
@ -24,424 +24,423 @@ import java.util.Map;
|
|||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* IBM MQ connector using JMS client with Jakarta JMS API.
|
* IBM MQ connector using JMS client with Jakarta JMS API. Supports
|
||||||
* Supports multi-instance Queue Manager, SSL/TLS, and multiple message formats.
|
* multi-instance Queue Manager, SSL/TLS, and multiple message formats.
|
||||||
* <p>
|
* <p>
|
||||||
* Supported formats:
|
* Supported formats: - JSON: JMS TextMessage with plain JSON string (default) -
|
||||||
* - JSON: JMS TextMessage with plain JSON string (default)
|
* XML: JMS TextMessage with XML string - UTF-8 (CCSID 1208): JMS BytesMessage
|
||||||
* - XML: JMS TextMessage with XML string
|
* with UTF-8 encoding - EBCDIC (CCSID 870): JMS BytesMessage with EBCDIC
|
||||||
* - UTF-8 (CCSID 1208): JMS BytesMessage with UTF-8 encoding
|
* IBM-870 encoding
|
||||||
* - EBCDIC (CCSID 870): JMS BytesMessage with EBCDIC IBM-870 encoding
|
|
||||||
*/
|
*/
|
||||||
public class IbmMqConnector implements Connector {
|
public class IbmMqConnector implements Connector {
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(IbmMqConnector.class);
|
private static final Logger LOG = LogManager.getLogger(IbmMqConnector.class);
|
||||||
|
|
||||||
private static final Charset EBCDIC_870 = Charset.forName("IBM870");
|
private static final Charset EBCDIC_870 = Charset.forName("IBM870");
|
||||||
private static final Charset UTF_8 = StandardCharsets.UTF_8;
|
private static final Charset UTF_8 = StandardCharsets.UTF_8;
|
||||||
|
|
||||||
private static final long DEFAULT_POLL_INTERVAL_MS = 100;
|
private static final long DEFAULT_POLL_INTERVAL_MS = 100;
|
||||||
private static final long DEFAULT_MAX_POLL_INTERVAL_MS = 1000;
|
private static final long DEFAULT_MAX_POLL_INTERVAL_MS = 1000;
|
||||||
|
|
||||||
private final MQConnectionFactory connectionFactory;
|
private final MQConnectionFactory connectionFactory;
|
||||||
private JMSContext jmsContext;
|
private JMSContext jmsContext;
|
||||||
private final String queueManager;
|
private final String queueManager;
|
||||||
|
private final String user;
|
||||||
|
private final String password;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor with multi-instance Queue Manager support.
|
* Constructor with multi-instance Queue Manager support.
|
||||||
*
|
*
|
||||||
* @param connectionNameList Connection name list in format "host1(port1),host2(port2)"
|
* @param connectionNameList Connection name list in format
|
||||||
* @param channel MQ channel name
|
* "host1(port1),host2(port2)"
|
||||||
* @param queueManager Queue Manager name
|
* @param channel MQ channel name
|
||||||
* @param user Username for authentication
|
* @param queueManager Queue Manager name
|
||||||
* @param password Password for authentication
|
* @param user Username for authentication
|
||||||
* @param keystorePath Path to SSL keystore (can be null for non-SSL)
|
* @param password Password for authentication
|
||||||
* @param keystorePassword Password for SSL keystore
|
* @param keystorePath Path to SSL keystore (can be null for non-SSL)
|
||||||
* @param sslCipherSuite SSL cipher suite to use (e.g., "TLS_RSA_WITH_AES_256_CBC_SHA256")
|
* @param keystorePassword Password for SSL keystore
|
||||||
*/
|
* @param sslCipherSuite SSL cipher suite to use (e.g.,
|
||||||
public IbmMqConnector(String connectionNameList, String channel, String queueManager,
|
* "TLS_RSA_WITH_AES_256_CBC_SHA256")
|
||||||
String user, String password,
|
*/
|
||||||
String keystorePath, String keystorePassword, String sslCipherSuite) {
|
public IbmMqConnector(String connectionNameList, String channel, String queueManager, String user, String password,
|
||||||
this.queueManager = queueManager;
|
String keystorePath, String keystorePassword, String sslCipherSuite) {
|
||||||
|
this.queueManager = queueManager;
|
||||||
|
this.user = user;
|
||||||
|
this.password = password;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
MQConnectionFactory cf = new MQConnectionFactory();
|
if (keystorePath != null && !keystorePath.isBlank()) {
|
||||||
|
System.setProperty("javax.net.ssl.keyStore", keystorePath);
|
||||||
|
System.setProperty("javax.net.ssl.trustStore", keystorePath);
|
||||||
|
if (keystorePassword != null) {
|
||||||
|
System.setProperty("javax.net.ssl.keyStorePassword", keystorePassword);
|
||||||
|
System.setProperty("javax.net.ssl.trustStorePassword", keystorePassword);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Set connection name list for multi-instance QMGR
|
connectionFactory = new MQConnectionFactory();
|
||||||
cf.setChannel(channel);
|
connectionFactory.setConnectionNameList(connectionNameList);
|
||||||
cf.setQueueManager(queueManager);
|
connectionFactory.setQueueManager(queueManager);
|
||||||
cf.setConnectionNameList(connectionNameList);
|
connectionFactory.setChannel(channel);
|
||||||
cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
|
connectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
|
||||||
|
if (user != null && !user.isBlank()) {
|
||||||
|
connectionFactory.setStringProperty(WMQConstants.USERID, user);
|
||||||
|
}
|
||||||
|
if (password != null && !password.isBlank()) {
|
||||||
|
connectionFactory.setStringProperty(WMQConstants.PASSWORD, password);
|
||||||
|
}
|
||||||
|
|
||||||
// Set authentication
|
if (sslCipherSuite != null && !sslCipherSuite.isBlank()) {
|
||||||
cf.setStringProperty(WMQConstants.USERID, user);
|
connectionFactory.setSSLCipherSuite(sslCipherSuite);
|
||||||
cf.setStringProperty(WMQConstants.PASSWORD, password);
|
}
|
||||||
|
|
||||||
// SSL configuration if keystore is provided
|
// Initialize JMS context
|
||||||
if (StringUtils.isNotBlank(keystorePath)) {
|
connect();
|
||||||
System.setProperty("javax.net.ssl.keyStore", keystorePath);
|
|
||||||
System.setProperty("javax.net.ssl.trustStore", keystorePath);
|
|
||||||
if (StringUtils.isNotBlank(keystorePassword)) {
|
|
||||||
System.setProperty("javax.net.ssl.keyStorePassword", keystorePassword);
|
|
||||||
System.setProperty("javax.net.ssl.trustStorePassword", keystorePassword);
|
|
||||||
}
|
|
||||||
if (StringUtils.isNotBlank(sslCipherSuite)) {
|
|
||||||
cf.setSSLCipherSuite(sslCipherSuite);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.connectionFactory = cf;
|
} catch (Exception e) {
|
||||||
|
throw new MessagingConnectionException("Failed to create IBM MQ connection to " + queueManager, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize JMS context
|
/**
|
||||||
connect();
|
* Connect to IBM MQ.
|
||||||
|
*/
|
||||||
|
private void connect() {
|
||||||
|
try {
|
||||||
|
this.jmsContext = connectionFactory.createContext(user, password, JMSContext.AUTO_ACKNOWLEDGE);
|
||||||
|
this.jmsContext.start();
|
||||||
|
LOG.info("Connected to IBM MQ: {}", queueManager);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new MessagingConnectionException(
|
||||||
|
"Failed to connect to IBM MQ: " + queueManager + " - " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
/**
|
||||||
throw new MessagingConnectionException(
|
* Send a JSON or XML message as TextMessage.
|
||||||
"Failed to create IBM MQ connection to " + queueManager, e);
|
*/
|
||||||
}
|
private void sendTextMessage(String queueName, String payload, Map<String, String> properties) {
|
||||||
}
|
javax.jms.Queue queue = getQueue(queueName);
|
||||||
|
|
||||||
/**
|
TextMessage message = jmsContext.createTextMessage(payload);
|
||||||
* Connect to IBM MQ.
|
|
||||||
*/
|
|
||||||
private void connect() {
|
|
||||||
try {
|
|
||||||
this.jmsContext = connectionFactory.createContext();
|
|
||||||
this.jmsContext.start();
|
|
||||||
LOG.info("Connected to IBM MQ: {}", queueManager);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new MessagingConnectionException(
|
|
||||||
"Failed to connect to IBM MQ: " + queueManager + " - " + e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
// Set JMS properties
|
||||||
* Send a JSON or XML message as TextMessage.
|
if (properties != null) {
|
||||||
*/
|
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
||||||
private void sendTextMessage(String queueName, String payload, Map<String, String> properties) {
|
try {
|
||||||
javax.jms.Queue queue = getQueue(queueName);
|
message.setStringProperty(entry.getKey(), entry.getValue());
|
||||||
|
} catch (JMSException e) {
|
||||||
|
LOG.warn("Failed to set property: {}", entry.getKey(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TextMessage message = jmsContext.createTextMessage(payload);
|
try {
|
||||||
|
jmsContext.createProducer().send(queue, message);
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e);
|
||||||
|
}
|
||||||
|
LOG.debug("Sent JSON/XML message to queue: {}", queueName);
|
||||||
|
}
|
||||||
|
|
||||||
// Set JMS properties
|
/**
|
||||||
if (properties != null) {
|
* Send a message as BytesMessage with specific encoding and CCSID.
|
||||||
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
*/
|
||||||
try {
|
private void sendBytesMessage(String queueName, String payload, Charset charset, int ccsid,
|
||||||
message.setStringProperty(entry.getKey(), entry.getValue());
|
Map<String, String> properties) {
|
||||||
} catch (JMSException e) {
|
javax.jms.Queue queue = getQueue(queueName);
|
||||||
LOG.warn("Failed to set property: {}", entry.getKey(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
BytesMessage message = jmsContext.createBytesMessage();
|
||||||
jmsContext.createProducer().send(queue, message);
|
|
||||||
} catch (RuntimeException e) {
|
|
||||||
throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e);
|
|
||||||
}
|
|
||||||
LOG.debug("Sent JSON/XML message to queue: {}", queueName);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
// Convert payload to bytes using specified charset
|
||||||
* Send a message as BytesMessage with specific encoding and CCSID.
|
byte[] bytes = payload.getBytes(charset);
|
||||||
*/
|
try {
|
||||||
private void sendBytesMessage(String queueName, String payload, Charset charset,
|
message.writeBytes(bytes);
|
||||||
int ccsid, Map<String, String> properties) {
|
message.setIntProperty("CCSID", ccsid);
|
||||||
javax.jms.Queue queue = getQueue(queueName);
|
} catch (JMSException e) {
|
||||||
|
throw new MessagingDestinationException("Failed to create bytes message", e);
|
||||||
|
}
|
||||||
|
|
||||||
BytesMessage message = jmsContext.createBytesMessage();
|
// Set JMS properties
|
||||||
|
if (properties != null) {
|
||||||
|
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
||||||
|
try {
|
||||||
|
message.setStringProperty(entry.getKey(), entry.getValue());
|
||||||
|
} catch (JMSException e) {
|
||||||
|
LOG.warn("Failed to set property: {}", entry.getKey(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Convert payload to bytes using specified charset
|
try {
|
||||||
byte[] bytes = payload.getBytes(charset);
|
jmsContext.createProducer().send(queue, message);
|
||||||
try {
|
} catch (RuntimeException e) {
|
||||||
message.writeBytes(bytes);
|
throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e);
|
||||||
message.setIntProperty("CCSID", ccsid);
|
}
|
||||||
} catch (JMSException e) {
|
LOG.debug("Sent {} message to queue: {}", charset, queueName);
|
||||||
throw new MessagingDestinationException("Failed to create bytes message", e);
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Set JMS properties
|
/**
|
||||||
if (properties != null) {
|
* Send a message to a queue with specified format.
|
||||||
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
*
|
||||||
try {
|
* @param queueName Queue name
|
||||||
message.setStringProperty(entry.getKey(), entry.getValue());
|
* @param payload Message payload
|
||||||
} catch (JMSException e) {
|
* @param format Message format (JSON, XML, EBCDIC_870, UTF8_1208)
|
||||||
LOG.warn("Failed to set property: {}", entry.getKey(), e);
|
* @param properties JMS properties to set
|
||||||
}
|
*/
|
||||||
}
|
public void send(String queueName, String payload, MqMessageFormat format, Map<String, String> properties) {
|
||||||
}
|
switch (format) {
|
||||||
|
case JSON, XML -> sendTextMessage(queueName, payload, properties);
|
||||||
|
case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
|
||||||
|
case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
/**
|
||||||
jmsContext.createProducer().send(queue, message);
|
* Receive a message from a queue with timeout.
|
||||||
} catch (RuntimeException e) {
|
*
|
||||||
throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e);
|
* @param queueName Queue name
|
||||||
}
|
* @param messageSelector JMS message selector (optional)
|
||||||
LOG.debug("Sent {} message to queue: {}", charset, queueName);
|
* @param format Expected message format
|
||||||
}
|
* @param timeout Timeout duration
|
||||||
|
* @return Received message
|
||||||
|
*/
|
||||||
|
public ReceivedMessage receive(String queueName, String messageSelector, MqMessageFormat format,
|
||||||
|
java.time.Duration timeout) {
|
||||||
|
long timeoutMs = timeout.toMillis();
|
||||||
|
|
||||||
/**
|
javax.jms.Queue queue = getQueue(queueName);
|
||||||
* Send a message to a queue with specified format.
|
MessageConsumer consumer = (MessageConsumer) (messageSelector == null || messageSelector.isBlank()
|
||||||
*
|
? jmsContext.createConsumer(queue)
|
||||||
* @param queueName Queue name
|
: jmsContext.createConsumer(queue, messageSelector));
|
||||||
* @param payload Message payload
|
|
||||||
* @param format Message format (JSON, XML, EBCDIC_870, UTF8_1208)
|
|
||||||
* @param properties JMS properties to set
|
|
||||||
*/
|
|
||||||
public void send(String queueName, String payload, MqMessageFormat format,
|
|
||||||
Map<String, String> properties) {
|
|
||||||
switch (format) {
|
|
||||||
case JSON, XML -> sendTextMessage(queueName, payload, properties);
|
|
||||||
case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
|
|
||||||
case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
AtomicBoolean messageFound = new AtomicBoolean(false);
|
||||||
* Receive a message from a queue with timeout.
|
ReceivedMessage received = null;
|
||||||
*
|
|
||||||
* @param queueName Queue name
|
|
||||||
* @param messageSelector JMS message selector (optional)
|
|
||||||
* @param format Expected message format
|
|
||||||
* @param timeout Timeout duration
|
|
||||||
* @return Received message
|
|
||||||
*/
|
|
||||||
public ReceivedMessage receive(String queueName, String messageSelector,
|
|
||||||
MqMessageFormat format, java.time.Duration timeout) {
|
|
||||||
long timeoutMs = timeout.toMillis();
|
|
||||||
|
|
||||||
javax.jms.Queue queue = getQueue(queueName);
|
long pollInterval = DEFAULT_POLL_INTERVAL_MS;
|
||||||
MessageConsumer consumer = (MessageConsumer) (messageSelector == null || messageSelector.isBlank()
|
long remainingTime = timeoutMs;
|
||||||
? jmsContext.createConsumer(queue)
|
|
||||||
: jmsContext.createConsumer(queue, messageSelector));
|
|
||||||
|
|
||||||
AtomicBoolean messageFound = new AtomicBoolean(false);
|
try {
|
||||||
ReceivedMessage received = null;
|
while (remainingTime > 0 && !messageFound.get()) {
|
||||||
|
Message message = consumer.receive(remainingTime);
|
||||||
|
|
||||||
long pollInterval = DEFAULT_POLL_INTERVAL_MS;
|
if (message != null) {
|
||||||
long remainingTime = timeoutMs;
|
received = decodeMessage(message, queueName, format);
|
||||||
|
messageFound.set(true);
|
||||||
|
} else {
|
||||||
|
// Exponential backoff
|
||||||
|
pollInterval = Math.min(pollInterval * 2, DEFAULT_MAX_POLL_INTERVAL_MS);
|
||||||
|
remainingTime -= pollInterval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
if (received == null) {
|
||||||
while (remainingTime > 0 && !messageFound.get()) {
|
throw new MessagingTimeoutException("No message matching filter found on queue '" + queueName
|
||||||
Message message = consumer.receive(remainingTime);
|
+ "' within " + timeout.toMillis() + "ms");
|
||||||
|
}
|
||||||
|
|
||||||
if (message != null) {
|
return received;
|
||||||
received = decodeMessage(message, queueName, format);
|
|
||||||
messageFound.set(true);
|
|
||||||
} else {
|
|
||||||
// Exponential backoff
|
|
||||||
pollInterval = Math.min(pollInterval * 2, DEFAULT_MAX_POLL_INTERVAL_MS);
|
|
||||||
remainingTime -= pollInterval;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (received == null) {
|
} catch (MessagingTimeoutException e) {
|
||||||
throw new MessagingTimeoutException(
|
throw e;
|
||||||
"No message matching filter found on queue '" + queueName +
|
} catch (Exception e) {
|
||||||
"' within " + timeout.toMillis() + "ms");
|
throw new MessagingDestinationException("Failed to receive message from queue: " + queueName, e);
|
||||||
}
|
} finally {
|
||||||
|
try {
|
||||||
|
consumer.close();
|
||||||
|
} catch (JMSException e) {
|
||||||
|
LOG.warn("Failed to close consumer", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return received;
|
/**
|
||||||
|
* Browse a queue (non-destructive read).
|
||||||
|
*
|
||||||
|
* @param queueName Queue name
|
||||||
|
* @param messageSelector JMS message selector (optional)
|
||||||
|
* @param format Expected message format
|
||||||
|
* @param maxMessages Maximum number of messages to browse
|
||||||
|
* @return List of received messages
|
||||||
|
*/
|
||||||
|
public List<ReceivedMessage> browse(String queueName, String messageSelector, MqMessageFormat format,
|
||||||
|
int maxMessages) {
|
||||||
|
List<ReceivedMessage> messages = new ArrayList<>();
|
||||||
|
|
||||||
} catch (MessagingTimeoutException e) {
|
javax.jms.Queue queue = getQueue(queueName);
|
||||||
throw e;
|
MessageConsumer consumer = (MessageConsumer) (messageSelector == null || messageSelector.isBlank()
|
||||||
} catch (Exception e) {
|
? jmsContext.createConsumer(queue)
|
||||||
throw new MessagingDestinationException("Failed to receive message from queue: " + queueName, e);
|
: jmsContext.createConsumer(queue, messageSelector));
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
consumer.close();
|
|
||||||
} catch (JMSException e) {
|
|
||||||
LOG.warn("Failed to close consumer", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
int count = 0;
|
||||||
* Browse a queue (non-destructive read).
|
try {
|
||||||
*
|
while (count < maxMessages) {
|
||||||
* @param queueName Queue name
|
Message message = consumer.receiveNoWait();
|
||||||
* @param messageSelector JMS message selector (optional)
|
|
||||||
* @param format Expected message format
|
|
||||||
* @param maxMessages Maximum number of messages to browse
|
|
||||||
* @return List of received messages
|
|
||||||
*/
|
|
||||||
public List<ReceivedMessage> browse(String queueName, String messageSelector,
|
|
||||||
MqMessageFormat format, int maxMessages) {
|
|
||||||
List<ReceivedMessage> messages = new ArrayList<>();
|
|
||||||
|
|
||||||
javax.jms.Queue queue = getQueue(queueName);
|
if (message == null) {
|
||||||
MessageConsumer consumer = (MessageConsumer) (messageSelector == null || messageSelector.isBlank()
|
break;
|
||||||
? jmsContext.createConsumer(queue)
|
}
|
||||||
: jmsContext.createConsumer(queue, messageSelector));
|
|
||||||
|
|
||||||
int count = 0;
|
ReceivedMessage received = decodeMessage(message, queueName, format);
|
||||||
try {
|
messages.add(received);
|
||||||
while (count < maxMessages) {
|
count++;
|
||||||
Message message = consumer.receiveNoWait();
|
}
|
||||||
|
|
||||||
if (message == null) {
|
return messages;
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
ReceivedMessage received = decodeMessage(message, queueName, format);
|
} catch (Exception e) {
|
||||||
messages.add(received);
|
throw new MessagingDestinationException("Failed to browse queue: " + queueName, e);
|
||||||
count++;
|
} finally {
|
||||||
}
|
try {
|
||||||
|
consumer.close();
|
||||||
|
} catch (JMSException e) {
|
||||||
|
LOG.warn("Failed to close consumer", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return messages;
|
/**
|
||||||
|
* Decode a JMS message to ReceivedMessage.
|
||||||
|
*/
|
||||||
|
private ReceivedMessage decodeMessage(Message jmsMessage, String queueName, MqMessageFormat format) {
|
||||||
|
long timestamp;
|
||||||
|
try {
|
||||||
|
timestamp = jmsMessage.getJMSTimestamp();
|
||||||
|
} catch (JMSException e) {
|
||||||
|
timestamp = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
if (timestamp == 0) {
|
||||||
|
timestamp = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
String body;
|
||||||
throw new MessagingDestinationException("Failed to browse queue: " + queueName, e);
|
MessageContentType contentType;
|
||||||
} finally {
|
Map<String, String> headers = new HashMap<>();
|
||||||
try {
|
|
||||||
consumer.close();
|
|
||||||
} catch (JMSException e) {
|
|
||||||
LOG.warn("Failed to close consumer", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
// Extract JMS properties as headers
|
||||||
* Decode a JMS message to ReceivedMessage.
|
extractJmsProperties(jmsMessage, headers);
|
||||||
*/
|
|
||||||
private ReceivedMessage decodeMessage(Message jmsMessage, String queueName, MqMessageFormat format) {
|
|
||||||
long timestamp;
|
|
||||||
try {
|
|
||||||
timestamp = jmsMessage.getJMSTimestamp();
|
|
||||||
} catch (JMSException e) {
|
|
||||||
timestamp = System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
if (timestamp == 0) {
|
|
||||||
timestamp = System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
String body;
|
if (jmsMessage instanceof TextMessage textMessage) {
|
||||||
MessageContentType contentType;
|
try {
|
||||||
Map<String, String> headers = new HashMap<>();
|
body = textMessage.getText();
|
||||||
|
} catch (JMSException e) {
|
||||||
|
throw new RuntimeException("Failed to read text message body", e);
|
||||||
|
}
|
||||||
|
contentType = switch (format) {
|
||||||
|
case XML -> MessageContentType.XML;
|
||||||
|
default -> MessageContentType.JSON;
|
||||||
|
};
|
||||||
|
} else if (jmsMessage instanceof BytesMessage bytesMessage) {
|
||||||
|
int ccsid;
|
||||||
|
try {
|
||||||
|
ccsid = bytesMessage.getIntProperty("CCSID");
|
||||||
|
} catch (JMSException e) {
|
||||||
|
ccsid = 1208; // default UTF-8
|
||||||
|
}
|
||||||
|
body = decodeBytesMessage(bytesMessage, ccsid);
|
||||||
|
contentType = MessageContentType.RAW_TEXT;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
throw new IllegalArgumentException("Unsupported message type: " + jmsMessage.getJMSType());
|
||||||
|
} catch (JMSException e) {
|
||||||
|
throw new IllegalArgumentException("Unsupported message type", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Extract JMS properties as headers
|
return new ReceivedMessage(body, contentType, headers, timestamp, queueName, null);
|
||||||
extractJmsProperties(jmsMessage, headers);
|
}
|
||||||
|
|
||||||
if (jmsMessage instanceof TextMessage textMessage) {
|
/**
|
||||||
try {
|
* Decode BytesMessage body based on CCSID.
|
||||||
body = textMessage.getText();
|
*/
|
||||||
} catch (JMSException e) {
|
private String decodeBytesMessage(BytesMessage bytesMessage, int ccsid) {
|
||||||
throw new RuntimeException("Failed to read text message body", e);
|
try {
|
||||||
}
|
long bodyLength;
|
||||||
contentType = switch (format) {
|
try {
|
||||||
case XML -> MessageContentType.XML;
|
bodyLength = bytesMessage.getBodyLength();
|
||||||
default -> MessageContentType.JSON;
|
} catch (JMSException e) {
|
||||||
};
|
throw new RuntimeException("Failed to get message body length", e);
|
||||||
} else if (jmsMessage instanceof BytesMessage bytesMessage) {
|
}
|
||||||
int ccsid;
|
byte[] data = new byte[(int) bodyLength];
|
||||||
try {
|
bytesMessage.readBytes(data);
|
||||||
ccsid = bytesMessage.getIntProperty("CCSID");
|
|
||||||
} catch (JMSException e) {
|
|
||||||
ccsid = 1208; // default UTF-8
|
|
||||||
}
|
|
||||||
body = decodeBytesMessage(bytesMessage, ccsid);
|
|
||||||
contentType = MessageContentType.RAW_TEXT;
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
throw new IllegalArgumentException("Unsupported message type: " + jmsMessage.getJMSType());
|
|
||||||
} catch (JMSException e) {
|
|
||||||
throw new IllegalArgumentException("Unsupported message type", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return new ReceivedMessage(body, contentType, headers, timestamp, queueName, null);
|
Charset charset = switch (ccsid) {
|
||||||
}
|
case 870 -> EBCDIC_870;
|
||||||
|
case 1208 -> UTF_8;
|
||||||
|
default -> UTF_8;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
return new String(data, charset);
|
||||||
* Decode BytesMessage body based on CCSID.
|
} catch (JMSException e) {
|
||||||
*/
|
throw new RuntimeException("Failed to read BytesMessage body", e);
|
||||||
private String decodeBytesMessage(BytesMessage bytesMessage, int ccsid) {
|
}
|
||||||
try {
|
}
|
||||||
long bodyLength;
|
|
||||||
try {
|
|
||||||
bodyLength = bytesMessage.getBodyLength();
|
|
||||||
} catch (JMSException e) {
|
|
||||||
throw new RuntimeException("Failed to get message body length", e);
|
|
||||||
}
|
|
||||||
byte[] data = new byte[(int) bodyLength];
|
|
||||||
bytesMessage.readBytes(data);
|
|
||||||
|
|
||||||
Charset charset = switch (ccsid) {
|
/**
|
||||||
case 870 -> EBCDIC_870;
|
* Extract JMS properties as headers.
|
||||||
case 1208 -> UTF_8;
|
*/
|
||||||
default -> UTF_8;
|
@SuppressWarnings("unchecked")
|
||||||
};
|
private void extractJmsProperties(Message message, Map<String, String> headers) {
|
||||||
|
try {
|
||||||
|
// Common JMS headers
|
||||||
|
headers.put("JMSMessageID", message.getJMSMessageID());
|
||||||
|
try {
|
||||||
|
headers.put("JMSType", message.getJMSType() != null ? message.getJMSType() : "");
|
||||||
|
} catch (JMSException e) {
|
||||||
|
headers.put("JMSType", "");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
headers.put("JMSDestination",
|
||||||
|
message.getJMSDestination() != null ? message.getJMSDestination().toString() : "");
|
||||||
|
} catch (JMSException e) {
|
||||||
|
headers.put("JMSDestination", "");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
headers.put("JMSDeliveryMode", String.valueOf(message.getJMSDeliveryMode()));
|
||||||
|
} catch (JMSException e) {
|
||||||
|
headers.put("JMSDeliveryMode", "");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
headers.put("JMSPriority", String.valueOf(message.getJMSPriority()));
|
||||||
|
} catch (JMSException e) {
|
||||||
|
headers.put("JMSPriority", "");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
headers.put("JMSTimestamp", String.valueOf(message.getJMSTimestamp()));
|
||||||
|
} catch (JMSException e) {
|
||||||
|
headers.put("JMSTimestamp", "");
|
||||||
|
}
|
||||||
|
|
||||||
return new String(data, charset);
|
// Extract custom properties
|
||||||
} catch (JMSException e) {
|
Enumeration<String> propertyNames = (Enumeration<String>) message.getPropertyNames();
|
||||||
throw new RuntimeException("Failed to read BytesMessage body", e);
|
while (propertyNames.hasMoreElements()) {
|
||||||
}
|
String propName = propertyNames.nextElement();
|
||||||
}
|
Object propValue = message.getObjectProperty(propName);
|
||||||
|
if (propValue != null) {
|
||||||
|
headers.put(propName, propValue.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (JMSException e) {
|
||||||
|
LOG.warn("Failed to extract JMS properties", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extract JMS properties as headers.
|
* Get Queue object from queue name.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
private javax.jms.Queue getQueue(String queueName) {
|
||||||
private void extractJmsProperties(Message message, Map<String, String> headers) {
|
return jmsContext.createQueue(queueName);
|
||||||
try {
|
}
|
||||||
// Common JMS headers
|
|
||||||
headers.put("JMSMessageID", message.getJMSMessageID());
|
|
||||||
try {
|
|
||||||
headers.put("JMSType", message.getJMSType() != null ? message.getJMSType() : "");
|
|
||||||
} catch (JMSException e) {
|
|
||||||
headers.put("JMSType", "");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
headers.put("JMSDestination", message.getJMSDestination() != null ?
|
|
||||||
message.getJMSDestination().toString() : "");
|
|
||||||
} catch (JMSException e) {
|
|
||||||
headers.put("JMSDestination", "");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
headers.put("JMSDeliveryMode", String.valueOf(message.getJMSDeliveryMode()));
|
|
||||||
} catch (JMSException e) {
|
|
||||||
headers.put("JMSDeliveryMode", "");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
headers.put("JMSPriority", String.valueOf(message.getJMSPriority()));
|
|
||||||
} catch (JMSException e) {
|
|
||||||
headers.put("JMSPriority", "");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
headers.put("JMSTimestamp", String.valueOf(message.getJMSTimestamp()));
|
|
||||||
} catch (JMSException e) {
|
|
||||||
headers.put("JMSTimestamp", "");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract custom properties
|
@Override
|
||||||
Enumeration<String> propertyNames = (Enumeration<String>) message.getPropertyNames();
|
public void close() {
|
||||||
while (propertyNames.hasMoreElements()) {
|
if (jmsContext != null) {
|
||||||
String propName = propertyNames.nextElement();
|
try {
|
||||||
Object propValue = message.getObjectProperty(propName);
|
jmsContext.close();
|
||||||
if (propValue != null) {
|
LOG.info("Closed connection to IBM MQ: {}", queueManager);
|
||||||
headers.put(propName, propValue.toString());
|
} catch (Exception e) {
|
||||||
}
|
LOG.error("Failed to close IBM MQ connection", e);
|
||||||
}
|
}
|
||||||
} catch (JMSException e) {
|
}
|
||||||
LOG.warn("Failed to extract JMS properties", e);
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get Queue object from queue name.
|
|
||||||
*/
|
|
||||||
private javax.jms.Queue getQueue(String queueName) {
|
|
||||||
return jmsContext.createQueue(queueName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
if (jmsContext != null) {
|
|
||||||
try {
|
|
||||||
jmsContext.close();
|
|
||||||
LOG.info("Closed connection to IBM MQ: {}", queueManager);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Failed to close IBM MQ connection", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -51,8 +51,8 @@ public class ImqFirstVisionEndpoint implements Endpoint {
|
|||||||
//Credentials credentials = loadCredentialsFromVault(vaultPath);
|
//Credentials credentials = loadCredentialsFromVault(vaultPath);
|
||||||
|
|
||||||
// SSL configuration (optional)
|
// SSL configuration (optional)
|
||||||
String keystorePath = null;
|
String keystorePath = "/home/kamma/aa/mq-docker/truststore.jks";
|
||||||
String keystorePassword = null;
|
String keystorePassword = "changeit";
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.connector = new IbmMqConnector(
|
this.connector = new IbmMqConnector(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user