diff --git a/messaging-connection-demo/pom.xml b/messaging-connection-demo/pom.xml
deleted file mode 100644
index 2a73534..0000000
--- a/messaging-connection-demo/pom.xml
+++ /dev/null
@@ -1,55 +0,0 @@
-
-
- 4.0.0
-
- cz.moneta.demo
- messaging-connection-demo
- 1.0.0-SNAPSHOT
-
-
- 17
- 17
- UTF-8
-
- 3.7.1
- 7.6.1
- 9.4.2.0
- 3.1.0
-
-
-
-
- org.apache.kafka
- kafka-clients
- ${kafka.version}
-
-
-
- io.confluent
- kafka-avro-serializer
- ${confluent.version}
-
-
-
- com.ibm.mq
- com.ibm.mq.allclient
- ${ibm.mq.version}
-
-
-
- jakarta.jms
- jakarta.jms-api
- ${jakarta.jms.version}
-
-
-
-
-
- confluent
- Confluent Maven Repository
- https://packages.confluent.io/maven/
-
-
-
diff --git a/messaging-connection-demo/src/main/java/cz/moneta/demo/MessagingConnectionApp.java b/messaging-connection-demo/src/main/java/cz/moneta/demo/MessagingConnectionApp.java
deleted file mode 100644
index 4c8ba4f..0000000
--- a/messaging-connection-demo/src/main/java/cz/moneta/demo/MessagingConnectionApp.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package cz.moneta.demo;
-
-import com.ibm.msg.client.jms.JmsConnectionFactory;
-import com.ibm.msg.client.jms.JmsFactoryFactory;
-import com.ibm.msg.client.wmq.WMQConstants;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import javax.jms.JMSContext;
-import java.util.Properties;
-
-public class MessagingConnectionApp {
-
- public static KafkaProducer createKafkaConnection(String bootstrapServers,
- String apiKey,
- String apiSecret) {
- Properties kafkaProps = new Properties();
- kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- kafkaProps.put("security.protocol", "SASL_SSL");
- kafkaProps.put("sasl.mechanism", "PLAIN");
- kafkaProps.put("sasl.jaas.config",
- String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
- apiKey, apiSecret));
-
- return new KafkaProducer<>(kafkaProps);
- }
-
- public static JMSContext createMqConnection(String host,
- int port,
- String channel,
- String queueManager,
- String user,
- String password) throws Exception {
- JmsFactoryFactory factoryFactory = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
- JmsConnectionFactory connectionFactory = factoryFactory.createConnectionFactory();
-
- connectionFactory.setStringProperty(WMQConstants.WMQ_HOST_NAME, host);
- connectionFactory.setIntProperty(WMQConstants.WMQ_PORT, port);
- connectionFactory.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
- connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager);
- connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
- connectionFactory.setStringProperty(WMQConstants.USERID, user);
- connectionFactory.setStringProperty(WMQConstants.PASSWORD, password);
-
- return connectionFactory.createContext(user, password, JMSContext.AUTO_ACKNOWLEDGE);
- }
-
- public static void main(String[] args) throws Exception {
- String kafkaBootstrap = System.getProperty("kafka.bootstrap", "localhost:9092");
- String kafkaApiKey = System.getProperty("kafka.apiKey", "api-key");
- String kafkaApiSecret = System.getProperty("kafka.apiSecret", "api-secret");
-
- String mqHost = System.getProperty("mq.host", "localhost");
- int mqPort = Integer.parseInt(System.getProperty("mq.port", "1414"));
- String mqChannel = System.getProperty("mq.channel", "DEV.APP.SVRCONN");
- String mqQueueManager = System.getProperty("mq.queueManager", "QM1");
- String mqUser = System.getProperty("mq.user", "app");
- String mqPassword = System.getProperty("mq.password", "pass");
-
- try (KafkaProducer kafkaProducer = createKafkaConnection(kafkaBootstrap, kafkaApiKey, kafkaApiSecret);
- JMSContext mqContext = createMqConnection(mqHost, mqPort, mqChannel, mqQueueManager, mqUser, mqPassword)) {
- System.out.println("Kafka connection created: " + (kafkaProducer != null));
- System.out.println("IBM MQ connection created: " + (mqContext != null));
- }
- }
-}
diff --git a/test-harness/pom.xml b/test-harness/pom.xml
index a8d4c51..618e703 100644
--- a/test-harness/pom.xml
+++ b/test-harness/pom.xml
@@ -29,10 +29,6 @@
1.9.3
1.6
4.0.3
- 3.7.1
- 7.6.1
- 9.4.5.0
- 3.1.0
@@ -270,48 +266,6 @@
${appium-java-client.version}
-
- org.apache.kafka
- kafka-clients
- ${kafka.version}
-
-
-
- io.confluent
- kafka-avro-serializer
- ${confluent.version}
-
-
-
- io.confluent
- kafka-schema-registry-client
- ${confluent.version}
-
-
-
- org.apache.avro
- avro
- 1.11.3
-
-
-
- com.ibm.mq
- com.ibm.mq.allclient
- ${ibm.mq.version}
-
-
-
- javax.jms
- javax.jms-api
- 2.0.1
-
-
-
- jakarta.jms
- jakarta.jms-api
- ${jakarta.jms.version}
-
-
org.apache.cxf
@@ -404,23 +358,6 @@
-
- org.apache.maven.plugins
- maven-dependency-plugin
- 3.9.0
-
-
- install
-
- copy-dependencies
-
-
- ${project.build.directory}/lib
- runtime
-
-
-
-
org.apache.maven.plugins
maven-compiler-plugin
@@ -454,23 +391,16 @@
false
true
- confluent
- Confluent Hub
- https://packages.confluent.io/maven/
+ central
+ libs-release
+ https://artifactory-aws.ux.mbid.cz/artifactory/libs-release
- false
- true
- mcentral
- Maven Central
- https://repo1.maven.org/maven2/
-
-
- false
- true
- mvnrepo
- MVN Repository
- https://mvnrepository.com/artifact/
+ true
+ false
+ snapshots
+ libs-snapshot
+ https://artifactory-aws.ux.mbid.cz/artifactory/libs-snapshot
diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java
deleted file mode 100644
index 4045b70..0000000
--- a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java
+++ /dev/null
@@ -1,264 +0,0 @@
-package cz.moneta.test.harness.connectors.messaging;
-
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSConsumer;
-import javax.jms.JMSContext;
-import javax.jms.JMSException;
-import javax.jms.JMSProducer;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.TextMessage;
-
-import com.ibm.mq.jms.MQConnectionFactory;
-import com.ibm.msg.client.wmq.WMQConstants;
-
-import cz.moneta.test.harness.connectors.Connector;
-import cz.moneta.test.harness.exception.MessagingTimeoutException;
-import cz.moneta.test.harness.messaging.model.MessageContentType;
-import cz.moneta.test.harness.messaging.model.MqMessageFormat;
-import cz.moneta.test.harness.messaging.model.ReceivedMessage;
-
-public class IbmMqConnector implements Connector {
-
- private static final Charset EBCDIC_870 = Charset.forName("IBM870");
- private static final Charset UTF_8 = StandardCharsets.UTF_8;
- private final MQConnectionFactory connectionFactory;
- private final String user;
- private final String password;
- private final Object contextLock = new Object();
- private volatile JMSContext jmsContext;
-
- public IbmMqConnector(String host,
- int port,
- String channel,
- String queueManager,
- String user,
- String password,
- String keystorePath,
- String keystorePassword,
- String cipherSuite) {
- this.user = user;
- this.password = password;
- try {
- if (keystorePath != null && !keystorePath.isBlank()) {
- System.setProperty("javax.net.ssl.keyStore", keystorePath);
- System.setProperty("javax.net.ssl.trustStore", keystorePath);
- if (keystorePassword != null) {
- System.setProperty("javax.net.ssl.keyStorePassword", keystorePassword);
- System.setProperty("javax.net.ssl.trustStorePassword", keystorePassword);
- }
- }
-
- connectionFactory = new MQConnectionFactory();
- connectionFactory.setHostName(host);
- connectionFactory.setPort(port);
- connectionFactory.setQueueManager(queueManager);
- connectionFactory.setChannel(channel);
- connectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
- if (user != null && !user.isBlank()) {
- connectionFactory.setStringProperty(WMQConstants.USERID, user);
- }
- if (password != null && !password.isBlank()) {
- connectionFactory.setStringProperty(WMQConstants.PASSWORD, password);
- }
-
- if (cipherSuite != null && !cipherSuite.isBlank()) {
- connectionFactory.setSSLCipherSuite(cipherSuite);
- }
- } catch (Exception e) {
- throw new IllegalStateException("Failed to initialize IBM MQ connection factory", e);
- }
- }
-
- public void send(String queueName,
- String payload,
- MqMessageFormat format,
- Map properties) {
- switch (Objects.requireNonNull(format, "format")) {
- case JSON, XML -> sendTextMessage(queueName, payload, properties);
- case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
- case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
- }
- }
-
- public ReceivedMessage receive(String queueName,
- String messageSelector,
- MqMessageFormat expectedFormat,
- Duration timeout) {
- JMSContext context = getContext();
- Queue queue = context.createQueue("queue:///" + queueName);
- try (JMSConsumer consumer = messageSelector == null || messageSelector.isBlank()
- ? context.createConsumer(queue)
- : context.createConsumer(queue, messageSelector)) {
- Message message = consumer.receive(Optional.ofNullable(timeout).orElse(Duration.ofSeconds(30)).toMillis());
- if (message == null) {
- throw new MessagingTimeoutException("Timeout waiting for IBM MQ message from queue: " + queueName);
- }
- return toReceivedMessage(message, queueName, expectedFormat);
- }
- }
-
- public List browse(String queueName,
- Predicate filter,
- MqMessageFormat expectedFormat,
- Duration timeout) {
- long timeoutMillis = Optional.ofNullable(timeout).orElse(Duration.ofSeconds(30)).toMillis();
- long deadline = System.currentTimeMillis() + timeoutMillis;
- long backoff = 100;
- JMSContext context = getContext();
- Queue queue = context.createQueue("queue:///" + queueName);
-
- while (System.currentTimeMillis() < deadline) {
- List matched = new ArrayList<>();
- try (QueueBrowser browser = context.createBrowser(queue)) {
- Enumeration> messages = browser.getEnumeration();
- while (messages.hasMoreElements()) {
- Message message = (Message) messages.nextElement();
- ReceivedMessage receivedMessage = toReceivedMessage(message, queueName, expectedFormat);
- if (filter == null || filter.test(receivedMessage)) {
- matched.add(receivedMessage);
- }
- }
- } catch (JMSException e) {
- throw new IllegalStateException("Failed to browse IBM MQ queue: " + queueName, e);
- }
-
- if (!matched.isEmpty()) {
- return matched;
- }
-
- try {
- TimeUnit.MILLISECONDS.sleep(backoff);
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- break;
- }
- backoff = Math.min(backoff * 2, 1000);
- }
- throw new MessagingTimeoutException("Timeout waiting for IBM MQ message from queue: " + queueName);
- }
-
- @Override
- public void close() {
- JMSContext context = jmsContext;
- if (context != null) {
- context.close();
- }
- }
-
- private void sendTextMessage(String queueName, String payload, Map properties) {
- JMSContext context = getContext();
- JMSProducer producer = context.createProducer();
- TextMessage message = context.createTextMessage(payload);
- applyProperties(message, properties);
- producer.send(context.createQueue("queue:///" + queueName), message);
- }
-
- private void sendBytesMessage(String queueName,
- String payload,
- Charset charset,
- int ccsid,
- Map properties) {
- try {
- JMSContext context = getContext();
- JMSProducer producer = context.createProducer();
- BytesMessage message = context.createBytesMessage();
- message.writeBytes(Optional.ofNullable(payload).orElse("").getBytes(charset));
- message.setIntProperty(WMQConstants.JMS_IBM_CHARACTER_SET, ccsid);
- applyProperties(message, properties);
- producer.send(context.createQueue("queue:///" + queueName), message);
- } catch (JMSException e) {
- throw new IllegalStateException("Failed to send bytes message to IBM MQ queue: " + queueName, e);
- }
- }
-
- private void applyProperties(Message message, Map properties) {
- Optional.ofNullable(properties).orElseGet(Collections::emptyMap)
- .forEach((key, value) -> {
- try {
- message.setStringProperty(key, String.valueOf(value));
- } catch (JMSException e) {
- throw new IllegalStateException("Failed to set JMS property: " + key, e);
- }
- });
- }
-
- private ReceivedMessage toReceivedMessage(Message message, String queueName, MqMessageFormat format) {
- try {
- Map headers = new LinkedHashMap<>();
- Enumeration> names = message.getPropertyNames();
- while (names.hasMoreElements()) {
- String name = String.valueOf(names.nextElement());
- headers.put(name, String.valueOf(message.getObjectProperty(name)));
- }
-
- String body = decodeMessage(message, format);
- MessageContentType contentType = resolveContentType(message, format);
- return new ReceivedMessage(body, contentType, headers, message.getJMSTimestamp(), queueName);
- } catch (JMSException e) {
- throw new IllegalStateException("Failed to decode IBM MQ message", e);
- }
- }
-
- private MessageContentType resolveContentType(Message message, MqMessageFormat expectedFormat) {
- if (message instanceof TextMessage) {
- return expectedFormat == MqMessageFormat.XML ? MessageContentType.XML : MessageContentType.JSON;
- }
- if (expectedFormat == MqMessageFormat.XML) {
- return MessageContentType.XML;
- }
- if (expectedFormat == MqMessageFormat.JSON) {
- return MessageContentType.JSON;
- }
- return MessageContentType.RAW_TEXT;
- }
-
- private String decodeMessage(Message jmsMessage, MqMessageFormat format) {
- try {
- if (jmsMessage instanceof TextMessage textMessage) {
- return textMessage.getText();
- }
-
- if (jmsMessage instanceof BytesMessage bytesMessage) {
- byte[] data = new byte[(int) bytesMessage.getBodyLength()];
- bytesMessage.readBytes(data);
- Charset charset = switch (format) {
- case EBCDIC_870 -> EBCDIC_870;
- case UTF8_1208, JSON, XML -> UTF_8;
- };
- return new String(data, charset);
- }
- return "";
- } catch (Exception e) {
- throw new IllegalStateException("Failed to decode JMS message", e);
- }
- }
-
- private JMSContext getContext() {
- JMSContext current = jmsContext;
- if (current == null) {
- synchronized (contextLock) {
- current = jmsContext;
- if (current == null) {
- jmsContext = current = connectionFactory.createContext(user, password, JMSContext.AUTO_ACKNOWLEDGE);
- }
- }
- }
- return current;
- }
-}
diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java
deleted file mode 100644
index 701d8b8..0000000
--- a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java
+++ /dev/null
@@ -1,353 +0,0 @@
-package cz.moneta.test.harness.connectors.messaging;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import cz.moneta.test.harness.connectors.Connector;
-import cz.moneta.test.harness.exception.MessagingTimeoutException;
-import cz.moneta.test.harness.messaging.model.MessageContentType;
-import cz.moneta.test.harness.messaging.model.ReceivedMessage;
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-
-public class KafkaConnector implements Connector {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- private final Properties producerProps = new Properties();
- private final Properties consumerProps = new Properties();
- private final CachedSchemaRegistryClient schemaRegistryClient;
- private volatile KafkaProducer producer;
- private final Object producerLock = new Object();
-
- public KafkaConnector(String bootstrapServers,
- String apiKey,
- String apiSecret,
- String schemaRegistryUrl,
- String schemaRegistryApiKey,
- String schemaRegistryApiSecret) {
- String jaasConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
- apiKey,
- apiSecret);
- String schemaRegistryAuth = schemaRegistryApiKey + ":" + schemaRegistryApiSecret;
-
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- producerProps.put("security.protocol", "SASL_SSL");
- producerProps.put("sasl.mechanism", "PLAIN");
- producerProps.put("sasl.jaas.config", jaasConfig);
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
- producerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
- producerProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
- producerProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
- producerProps.put("auto.register.schemas", "false");
-
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- consumerProps.put("security.protocol", "SASL_SSL");
- consumerProps.put("sasl.mechanism", "PLAIN");
- consumerProps.put("sasl.jaas.config", jaasConfig);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
- consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- consumerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
- consumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
- consumerProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
- consumerProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
-
- this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128);
- }
-
- public void send(String topic, String key, String jsonPayload, Map headers) {
- Objects.requireNonNull(topic, "topic");
- Schema schema = getSchemaForTopic(topic);
- GenericRecord record = jsonToAvro(jsonPayload, schema);
- ProducerRecord producerRecord = new ProducerRecord<>(topic, key, record);
- Optional.ofNullable(headers).orElseGet(HashMap::new)
- .forEach((headerKey, headerValue) -> producerRecord.headers()
- .add(headerKey, String.valueOf(headerValue).getBytes(StandardCharsets.UTF_8)));
-
- try {
- getProducer().send(producerRecord).get(30, TimeUnit.SECONDS);
- } catch (Exception e) {
- throw new IllegalStateException("Failed to send Kafka message to topic: " + topic, e);
- }
- }
-
- public List receive(String topic,
- Predicate filter,
- Duration timeout) {
- long timeoutMillis = Optional.ofNullable(timeout).orElse(Duration.ofSeconds(30)).toMillis();
- long deadline = System.currentTimeMillis() + timeoutMillis;
- long backoff = 100;
-
- try (KafkaConsumer consumer = createConsumer()) {
- List partitions = consumer.partitionsFor(topic).stream()
- .map(info -> new TopicPartition(topic, info.partition()))
- .toList();
- consumer.assign(partitions);
- consumer.seekToEnd(partitions);
-
- while (System.currentTimeMillis() < deadline) {
- ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
- if (!records.isEmpty()) {
- for (ConsumerRecord record : records) {
- ReceivedMessage message = toReceivedMessage(record);
- if (filter == null || filter.test(message)) {
- return List.of(message);
- }
- }
- backoff = 100;
- continue;
- }
- TimeUnit.MILLISECONDS.sleep(backoff);
- backoff = Math.min(backoff * 2, 1000);
- }
- } catch (MessagingTimeoutException e) {
- throw e;
- } catch (Exception e) {
- throw new IllegalStateException("Failed to receive Kafka message from topic: " + topic, e);
- }
- throw new MessagingTimeoutException("Timeout waiting for Kafka message from topic: " + topic);
- }
-
- public Map saveOffsets(String topic) {
- try (KafkaConsumer consumer = createConsumer()) {
- List partitions = consumer.partitionsFor(topic).stream()
- .map(info -> new TopicPartition(topic, info.partition()))
- .toList();
- consumer.assign(partitions);
- consumer.seekToEnd(partitions);
- Map offsets = new HashMap<>();
- partitions.forEach(partition -> offsets.put(partition, consumer.position(partition)));
- return offsets;
- }
- }
-
- @Override
- public void close() {
- KafkaProducer current = producer;
- if (current != null) {
- current.close(Duration.ofSeconds(5));
- }
- }
-
- private KafkaProducer getProducer() {
- KafkaProducer current = producer;
- if (current == null) {
- synchronized (producerLock) {
- current = producer;
- if (current == null) {
- producer = current = new KafkaProducer<>(producerProps);
- }
- }
- }
- return current;
- }
-
- private KafkaConsumer createConsumer() {
- Properties properties = new Properties();
- properties.putAll(consumerProps);
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "harness-" + UUID.randomUUID());
- return new KafkaConsumer<>(properties);
- }
-
- private ReceivedMessage toReceivedMessage(ConsumerRecord record) {
- String body = convertValueToJson(record.value());
- Map headers = new LinkedHashMap<>();
- for (Header header : record.headers()) {
- headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
- }
-
- return new ReceivedMessage(body,
- MessageContentType.JSON,
- headers,
- record.timestamp(),
- record.topic());
- }
-
- private String convertValueToJson(Object value) {
- try {
- if (value instanceof GenericRecord genericRecord) {
- return avroToJson(genericRecord);
- }
- if (value instanceof CharSequence) {
- return value.toString();
- }
- return OBJECT_MAPPER.writeValueAsString(value);
- } catch (Exception e) {
- throw new IllegalStateException("Failed to convert Kafka payload to JSON", e);
- }
- }
-
- private Schema getSchemaForTopic(String topic) {
- String subject = topic + "-value";
- try {
- // Get all versions and use the latest one
- java.util.List versions = schemaRegistryClient.getAllVersions(subject);
- int latestVersion = versions.get(versions.size() - 1);
- io.confluent.kafka.schemaregistry.client.rest.entities.Schema confluentSchema =
- schemaRegistryClient.getByVersion(subject, latestVersion, false);
- String schemaString = confluentSchema.getSchema();
- return new Schema.Parser().parse(schemaString);
- } catch (Exception e) {
- throw new IllegalStateException("Failed to get schema for subject: " + subject, e);
- }
- }
-
- private String avroToJson(GenericRecord record) {
- try {
- return OBJECT_MAPPER.writeValueAsString(convertAvroObject(record));
- } catch (Exception e) {
- throw new IllegalStateException("Failed to convert Avro record to JSON", e);
- }
- }
-
- private GenericRecord jsonToAvro(String jsonPayload, Schema schema) {
- try {
- JsonNode root = OBJECT_MAPPER.readTree(jsonPayload);
- Object converted = convertJsonNode(root, schema);
- return (GenericRecord) converted;
- } catch (Exception e) {
- throw new IllegalStateException("Failed to convert JSON payload to Avro", e);
- }
- }
-
- private Object convertJsonNode(JsonNode node, Schema schema) {
- return switch (schema.getType()) {
- case RECORD -> {
- GenericData.Record record = new GenericData.Record(schema);
- schema.getFields().forEach(field -> record.put(field.name(),
- convertJsonNode(node.path(field.name()), field.schema())));
- yield record;
- }
- case ARRAY -> {
- List