diff --git a/messaging-connection-demo/pom.xml b/messaging-connection-demo/pom.xml
new file mode 100644
index 0000000..2a73534
--- /dev/null
+++ b/messaging-connection-demo/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>cz.moneta.demo</groupId>
+    <artifactId>messaging-connection-demo</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <kafka.version>3.7.1</kafka.version>
+        <confluent.version>7.6.1</confluent.version>
+        <ibm.mq.version>9.4.2.0</ibm.mq.version>
+        <jakarta.jms.version>3.1.0</jakarta.jms.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ibm.mq</groupId>
+            <artifactId>com.ibm.mq.allclient</artifactId>
+            <version>${ibm.mq.version}</version>
+        </dependency>
+        <!-- NOTE(review): the demo sources import javax.jms.*, but jakarta.jms-api 3.x
+             provides the jakarta.jms.* package. Confirm whether javax.jms:javax.jms-api:2.0.1
+             (matching com.ibm.mq.allclient) is the intended dependency here. -->
+        <dependency>
+            <groupId>jakarta.jms</groupId>
+            <artifactId>jakarta.jms-api</artifactId>
+            <version>${jakarta.jms.version}</version>
+        </dependency>
+    </dependencies>
+
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <name>Confluent Maven Repository</name>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+</project>
diff --git a/messaging-connection-demo/src/main/java/cz/moneta/demo/MessagingConnectionApp.java b/messaging-connection-demo/src/main/java/cz/moneta/demo/MessagingConnectionApp.java
new file mode 100644
index 0000000..4c8ba4f
--- /dev/null
+++ b/messaging-connection-demo/src/main/java/cz/moneta/demo/MessagingConnectionApp.java
@@ -0,0 +1,69 @@
+package cz.moneta.demo;
+
+import com.ibm.msg.client.jms.JmsConnectionFactory;
+import com.ibm.msg.client.jms.JmsFactoryFactory;
+import com.ibm.msg.client.wmq.WMQConstants;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import javax.jms.JMSContext;
+import java.util.Properties;
+
+public class MessagingConnectionApp {
+
+ public static KafkaProducer createKafkaConnection(String bootstrapServers,
+ String apiKey,
+ String apiSecret) {
+ Properties kafkaProps = new Properties();
+ kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ kafkaProps.put("security.protocol", "SASL_SSL");
+ kafkaProps.put("sasl.mechanism", "PLAIN");
+ kafkaProps.put("sasl.jaas.config",
+ String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+ apiKey, apiSecret));
+
+ return new KafkaProducer<>(kafkaProps);
+ }
+
+ public static JMSContext createMqConnection(String host,
+ int port,
+ String channel,
+ String queueManager,
+ String user,
+ String password) throws Exception {
+ JmsFactoryFactory factoryFactory = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
+ JmsConnectionFactory connectionFactory = factoryFactory.createConnectionFactory();
+
+ connectionFactory.setStringProperty(WMQConstants.WMQ_HOST_NAME, host);
+ connectionFactory.setIntProperty(WMQConstants.WMQ_PORT, port);
+ connectionFactory.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
+ connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager);
+ connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
+ connectionFactory.setStringProperty(WMQConstants.USERID, user);
+ connectionFactory.setStringProperty(WMQConstants.PASSWORD, password);
+
+ return connectionFactory.createContext(user, password, JMSContext.AUTO_ACKNOWLEDGE);
+ }
+
+ public static void main(String[] args) throws Exception {
+ String kafkaBootstrap = System.getProperty("kafka.bootstrap", "localhost:9092");
+ String kafkaApiKey = System.getProperty("kafka.apiKey", "api-key");
+ String kafkaApiSecret = System.getProperty("kafka.apiSecret", "api-secret");
+
+ String mqHost = System.getProperty("mq.host", "localhost");
+ int mqPort = Integer.parseInt(System.getProperty("mq.port", "1414"));
+ String mqChannel = System.getProperty("mq.channel", "DEV.APP.SVRCONN");
+ String mqQueueManager = System.getProperty("mq.queueManager", "QM1");
+ String mqUser = System.getProperty("mq.user", "app");
+ String mqPassword = System.getProperty("mq.password", "pass");
+
+ try (KafkaProducer kafkaProducer = createKafkaConnection(kafkaBootstrap, kafkaApiKey, kafkaApiSecret);
+ JMSContext mqContext = createMqConnection(mqHost, mqPort, mqChannel, mqQueueManager, mqUser, mqPassword)) {
+ System.out.println("Kafka connection created: " + (kafkaProducer != null));
+ System.out.println("IBM MQ connection created: " + (mqContext != null));
+ }
+ }
+}
diff --git a/test-harness/pom.xml b/test-harness/pom.xml
index 1aa0937..329e85d 100644
--- a/test-harness/pom.xml
+++ b/test-harness/pom.xml
@@ -29,6 +29,10 @@
1.9.3
1.6
4.0.3
+        <kafka.version>3.7.1</kafka.version>
+        <confluent.version>7.6.1</confluent.version>
+        <ibm.mq.version>9.4.2.0</ibm.mq.version>
+        <jakarta.jms.version>3.1.0</jakarta.jms.version>
@@ -266,6 +270,48 @@
${appium-java-client.version}
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-schema-registry-client</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.3</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.ibm.mq</groupId>
+            <artifactId>com.ibm.mq.allclient</artifactId>
+            <version>${ibm.mq.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.jms</groupId>
+            <artifactId>javax.jms-api</artifactId>
+            <version>2.0.1</version>
+        </dependency>
+
+        <!-- NOTE(review): declaring both javax.jms-api and jakarta.jms-api puts two
+             JMS API jars on the classpath; only javax.jms is used by the connectors.
+             Confirm the jakarta dependency is actually needed. -->
+        <dependency>
+            <groupId>jakarta.jms</groupId>
+            <artifactId>jakarta.jms-api</artifactId>
+            <version>${jakarta.jms.version}</version>
+        </dependency>
org.apache.cxf
@@ -405,6 +451,13 @@
+        <!-- NOTE(review): releases/snapshots flags reconstructed from stripped markup
+             (values "false"/"true" in source order) — verify; Confluent artifacts are
+             published as releases, so releases may need to be enabled. -->
+        <repository>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+            <id>confluent</id>
+            <name>Confluent Hub</name>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
false
true
diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java
new file mode 100644
index 0000000..9f79254
--- /dev/null
+++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java
@@ -0,0 +1,253 @@
+package cz.moneta.test.harness.connectors.messaging;
+
+import com.ibm.msg.client.jms.JmsConnectionFactory;
+import com.ibm.msg.client.jms.JmsFactoryFactory;
+import com.ibm.msg.client.wmq.WMQConstants;
+import cz.moneta.test.harness.connectors.Connector;
+import cz.moneta.test.harness.exception.MessagingTimeoutException;
+import cz.moneta.test.harness.messaging.model.MessageContentType;
+import cz.moneta.test.harness.messaging.model.MqMessageFormat;
+import cz.moneta.test.harness.messaging.model.ReceivedMessage;
+import javax.jms.BytesMessage;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.TextMessage;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+public class IbmMqConnector implements Connector {
+
+ private static final Charset EBCDIC_870 = Charset.forName("IBM870");
+ private static final Charset UTF_8 = StandardCharsets.UTF_8;
+ private final JmsConnectionFactory connectionFactory;
+ private final String user;
+ private final String password;
+ private final Object contextLock = new Object();
+ private volatile JMSContext jmsContext;
+
+ public IbmMqConnector(String host,
+ int port,
+ String channel,
+ String queueManager,
+ String user,
+ String password,
+ String keystorePath,
+ String keystorePassword) {
+ this.user = user;
+ this.password = password;
+ try {
+ if (keystorePath != null && !keystorePath.isBlank()) {
+ System.setProperty("javax.net.ssl.keyStore", keystorePath);
+ if (keystorePassword != null) {
+ System.setProperty("javax.net.ssl.keyStorePassword", keystorePassword);
+ }
+ }
+
+ JmsFactoryFactory factoryFactory = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
+ connectionFactory = factoryFactory.createConnectionFactory();
+ connectionFactory.setStringProperty(WMQConstants.WMQ_HOST_NAME, host);
+ connectionFactory.setIntProperty(WMQConstants.WMQ_PORT, port);
+ connectionFactory.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
+ connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager);
+ connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
+ connectionFactory.setStringProperty(WMQConstants.USERID, user);
+ connectionFactory.setStringProperty(WMQConstants.PASSWORD, password);
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to initialize IBM MQ connection factory", e);
+ }
+ }
+
+ public void send(String queueName,
+ String payload,
+ MqMessageFormat format,
+ Map properties) {
+ switch (Objects.requireNonNull(format, "format")) {
+ case JSON, XML -> sendTextMessage(queueName, payload, properties);
+ case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
+ case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
+ }
+ }
+
+ public ReceivedMessage receive(String queueName,
+ String messageSelector,
+ MqMessageFormat expectedFormat,
+ Duration timeout) {
+ JMSContext context = getContext();
+ Queue queue = context.createQueue("queue:///" + queueName);
+ try (JMSConsumer consumer = messageSelector == null || messageSelector.isBlank()
+ ? context.createConsumer(queue)
+ : context.createConsumer(queue, messageSelector)) {
+ Message message = consumer.receive(Optional.ofNullable(timeout).orElse(Duration.ofSeconds(30)).toMillis());
+ if (message == null) {
+ throw new MessagingTimeoutException("Timeout waiting for IBM MQ message from queue: " + queueName);
+ }
+ return toReceivedMessage(message, queueName, expectedFormat);
+ }
+ }
+
+ public List browse(String queueName,
+ Predicate filter,
+ MqMessageFormat expectedFormat,
+ Duration timeout) {
+ long timeoutMillis = Optional.ofNullable(timeout).orElse(Duration.ofSeconds(30)).toMillis();
+ long deadline = System.currentTimeMillis() + timeoutMillis;
+ long backoff = 100;
+ JMSContext context = getContext();
+ Queue queue = context.createQueue("queue:///" + queueName);
+
+ while (System.currentTimeMillis() < deadline) {
+ List matched = new ArrayList<>();
+ try (QueueBrowser browser = context.createBrowser(queue)) {
+ Enumeration> messages = browser.getEnumeration();
+ while (messages.hasMoreElements()) {
+ Message message = (Message) messages.nextElement();
+ ReceivedMessage receivedMessage = toReceivedMessage(message, queueName, expectedFormat);
+ if (filter == null || filter.test(receivedMessage)) {
+ matched.add(receivedMessage);
+ }
+ }
+ } catch (JMSException e) {
+ throw new IllegalStateException("Failed to browse IBM MQ queue: " + queueName, e);
+ }
+
+ if (!matched.isEmpty()) {
+ return matched;
+ }
+
+ try {
+ TimeUnit.MILLISECONDS.sleep(backoff);
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ backoff = Math.min(backoff * 2, 1000);
+ }
+ throw new MessagingTimeoutException("Timeout waiting for IBM MQ message from queue: " + queueName);
+ }
+
+ @Override
+ public void close() {
+ JMSContext context = jmsContext;
+ if (context != null) {
+ context.close();
+ }
+ }
+
+ private void sendTextMessage(String queueName, String payload, Map properties) {
+ JMSContext context = getContext();
+ JMSProducer producer = context.createProducer();
+ TextMessage message = context.createTextMessage(payload);
+ applyProperties(message, properties);
+ producer.send(context.createQueue("queue:///" + queueName), message);
+ }
+
+ private void sendBytesMessage(String queueName,
+ String payload,
+ Charset charset,
+ int ccsid,
+ Map properties) {
+ try {
+ JMSContext context = getContext();
+ JMSProducer producer = context.createProducer();
+ BytesMessage message = context.createBytesMessage();
+ message.writeBytes(Optional.ofNullable(payload).orElse("").getBytes(charset));
+ message.setIntProperty(WMQConstants.JMS_IBM_CHARACTER_SET, ccsid);
+ applyProperties(message, properties);
+ producer.send(context.createQueue("queue:///" + queueName), message);
+ } catch (JMSException e) {
+ throw new IllegalStateException("Failed to send bytes message to IBM MQ queue: " + queueName, e);
+ }
+ }
+
+ private void applyProperties(Message message, Map properties) {
+ Optional.ofNullable(properties).orElseGet(Collections::emptyMap)
+ .forEach((key, value) -> {
+ try {
+ message.setStringProperty(key, String.valueOf(value));
+ } catch (JMSException e) {
+ throw new IllegalStateException("Failed to set JMS property: " + key, e);
+ }
+ });
+ }
+
+ private ReceivedMessage toReceivedMessage(Message message, String queueName, MqMessageFormat format) {
+ try {
+ Map headers = new LinkedHashMap<>();
+ Enumeration> names = message.getPropertyNames();
+ while (names.hasMoreElements()) {
+ String name = String.valueOf(names.nextElement());
+ headers.put(name, String.valueOf(message.getObjectProperty(name)));
+ }
+
+ String body = decodeMessage(message, format);
+ MessageContentType contentType = resolveContentType(message, format);
+ return new ReceivedMessage(body, contentType, headers, message.getJMSTimestamp(), queueName);
+ } catch (JMSException e) {
+ throw new IllegalStateException("Failed to decode IBM MQ message", e);
+ }
+ }
+
+ private MessageContentType resolveContentType(Message message, MqMessageFormat expectedFormat) {
+ if (message instanceof TextMessage) {
+ return expectedFormat == MqMessageFormat.XML ? MessageContentType.XML : MessageContentType.JSON;
+ }
+ if (expectedFormat == MqMessageFormat.XML) {
+ return MessageContentType.XML;
+ }
+ if (expectedFormat == MqMessageFormat.JSON) {
+ return MessageContentType.JSON;
+ }
+ return MessageContentType.RAW_TEXT;
+ }
+
+ private String decodeMessage(Message jmsMessage, MqMessageFormat format) {
+ try {
+ if (jmsMessage instanceof TextMessage textMessage) {
+ return textMessage.getText();
+ }
+
+ if (jmsMessage instanceof BytesMessage bytesMessage) {
+ byte[] data = new byte[(int) bytesMessage.getBodyLength()];
+ bytesMessage.readBytes(data);
+ Charset charset = switch (format) {
+ case EBCDIC_870 -> EBCDIC_870;
+ case UTF8_1208, JSON, XML -> UTF_8;
+ };
+ return new String(data, charset);
+ }
+ return "";
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to decode JMS message", e);
+ }
+ }
+
+ private JMSContext getContext() {
+ JMSContext current = jmsContext;
+ if (current == null) {
+ synchronized (contextLock) {
+ current = jmsContext;
+ if (current == null) {
+ jmsContext = current = connectionFactory.createContext(user, password, JMSContext.AUTO_ACKNOWLEDGE);
+ }
+ }
+ }
+ return current;
+ }
+}
diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java
new file mode 100644
index 0000000..701d8b8
--- /dev/null
+++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java
@@ -0,0 +1,353 @@
+package cz.moneta.test.harness.connectors.messaging;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import cz.moneta.test.harness.connectors.Connector;
+import cz.moneta.test.harness.exception.MessagingTimeoutException;
+import cz.moneta.test.harness.messaging.model.MessageContentType;
+import cz.moneta.test.harness.messaging.model.ReceivedMessage;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+public class KafkaConnector implements Connector {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private final Properties producerProps = new Properties();
+ private final Properties consumerProps = new Properties();
+ private final CachedSchemaRegistryClient schemaRegistryClient;
+ private volatile KafkaProducer producer;
+ private final Object producerLock = new Object();
+
+ public KafkaConnector(String bootstrapServers,
+ String apiKey,
+ String apiSecret,
+ String schemaRegistryUrl,
+ String schemaRegistryApiKey,
+ String schemaRegistryApiSecret) {
+ String jaasConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+ apiKey,
+ apiSecret);
+ String schemaRegistryAuth = schemaRegistryApiKey + ":" + schemaRegistryApiSecret;
+
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ producerProps.put("security.protocol", "SASL_SSL");
+ producerProps.put("sasl.mechanism", "PLAIN");
+ producerProps.put("sasl.jaas.config", jaasConfig);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
+ producerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+ producerProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
+ producerProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
+ producerProps.put("auto.register.schemas", "false");
+
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ consumerProps.put("security.protocol", "SASL_SSL");
+ consumerProps.put("sasl.mechanism", "PLAIN");
+ consumerProps.put("sasl.jaas.config", jaasConfig);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ consumerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
+ consumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+ consumerProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
+ consumerProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
+
+ this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128);
+ }
+
+ public void send(String topic, String key, String jsonPayload, Map headers) {
+ Objects.requireNonNull(topic, "topic");
+ Schema schema = getSchemaForTopic(topic);
+ GenericRecord record = jsonToAvro(jsonPayload, schema);
+ ProducerRecord producerRecord = new ProducerRecord<>(topic, key, record);
+ Optional.ofNullable(headers).orElseGet(HashMap::new)
+ .forEach((headerKey, headerValue) -> producerRecord.headers()
+ .add(headerKey, String.valueOf(headerValue).getBytes(StandardCharsets.UTF_8)));
+
+ try {
+ getProducer().send(producerRecord).get(30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to send Kafka message to topic: " + topic, e);
+ }
+ }
+
+ public List receive(String topic,
+ Predicate filter,
+ Duration timeout) {
+ long timeoutMillis = Optional.ofNullable(timeout).orElse(Duration.ofSeconds(30)).toMillis();
+ long deadline = System.currentTimeMillis() + timeoutMillis;
+ long backoff = 100;
+
+ try (KafkaConsumer consumer = createConsumer()) {
+ List partitions = consumer.partitionsFor(topic).stream()
+ .map(info -> new TopicPartition(topic, info.partition()))
+ .toList();
+ consumer.assign(partitions);
+ consumer.seekToEnd(partitions);
+
+ while (System.currentTimeMillis() < deadline) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ if (!records.isEmpty()) {
+ for (ConsumerRecord record : records) {
+ ReceivedMessage message = toReceivedMessage(record);
+ if (filter == null || filter.test(message)) {
+ return List.of(message);
+ }
+ }
+ backoff = 100;
+ continue;
+ }
+ TimeUnit.MILLISECONDS.sleep(backoff);
+ backoff = Math.min(backoff * 2, 1000);
+ }
+ } catch (MessagingTimeoutException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to receive Kafka message from topic: " + topic, e);
+ }
+ throw new MessagingTimeoutException("Timeout waiting for Kafka message from topic: " + topic);
+ }
+
+ public Map saveOffsets(String topic) {
+ try (KafkaConsumer consumer = createConsumer()) {
+ List partitions = consumer.partitionsFor(topic).stream()
+ .map(info -> new TopicPartition(topic, info.partition()))
+ .toList();
+ consumer.assign(partitions);
+ consumer.seekToEnd(partitions);
+ Map offsets = new HashMap<>();
+ partitions.forEach(partition -> offsets.put(partition, consumer.position(partition)));
+ return offsets;
+ }
+ }
+
+ @Override
+ public void close() {
+ KafkaProducer current = producer;
+ if (current != null) {
+ current.close(Duration.ofSeconds(5));
+ }
+ }
+
+ private KafkaProducer getProducer() {
+ KafkaProducer current = producer;
+ if (current == null) {
+ synchronized (producerLock) {
+ current = producer;
+ if (current == null) {
+ producer = current = new KafkaProducer<>(producerProps);
+ }
+ }
+ }
+ return current;
+ }
+
+ private KafkaConsumer createConsumer() {
+ Properties properties = new Properties();
+ properties.putAll(consumerProps);
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, "harness-" + UUID.randomUUID());
+ return new KafkaConsumer<>(properties);
+ }
+
+ private ReceivedMessage toReceivedMessage(ConsumerRecord record) {
+ String body = convertValueToJson(record.value());
+ Map headers = new LinkedHashMap<>();
+ for (Header header : record.headers()) {
+ headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
+ }
+
+ return new ReceivedMessage(body,
+ MessageContentType.JSON,
+ headers,
+ record.timestamp(),
+ record.topic());
+ }
+
+ private String convertValueToJson(Object value) {
+ try {
+ if (value instanceof GenericRecord genericRecord) {
+ return avroToJson(genericRecord);
+ }
+ if (value instanceof CharSequence) {
+ return value.toString();
+ }
+ return OBJECT_MAPPER.writeValueAsString(value);
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to convert Kafka payload to JSON", e);
+ }
+ }
+
+ private Schema getSchemaForTopic(String topic) {
+ String subject = topic + "-value";
+ try {
+ // Get all versions and use the latest one
+ java.util.List versions = schemaRegistryClient.getAllVersions(subject);
+ int latestVersion = versions.get(versions.size() - 1);
+ io.confluent.kafka.schemaregistry.client.rest.entities.Schema confluentSchema =
+ schemaRegistryClient.getByVersion(subject, latestVersion, false);
+ String schemaString = confluentSchema.getSchema();
+ return new Schema.Parser().parse(schemaString);
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to get schema for subject: " + subject, e);
+ }
+ }
+
+ private String avroToJson(GenericRecord record) {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(convertAvroObject(record));
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to convert Avro record to JSON", e);
+ }
+ }
+
+ private GenericRecord jsonToAvro(String jsonPayload, Schema schema) {
+ try {
+ JsonNode root = OBJECT_MAPPER.readTree(jsonPayload);
+ Object converted = convertJsonNode(root, schema);
+ return (GenericRecord) converted;
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to convert JSON payload to Avro", e);
+ }
+ }
+
+ private Object convertJsonNode(JsonNode node, Schema schema) {
+ return switch (schema.getType()) {
+ case RECORD -> {
+ GenericData.Record record = new GenericData.Record(schema);
+ schema.getFields().forEach(field -> record.put(field.name(),
+ convertJsonNode(node.path(field.name()), field.schema())));
+ yield record;
+ }
+ case ARRAY -> {
+ List