executeSql(String sqlToExecute) {
- return super.executeSql(sqlToExecute, SQLDialect.POSTGRES);
- }
-}
+package cz.moneta.test.harness.connectors.database;
+
+import org.jooq.Record;
+import org.jooq.Result;
+import org.jooq.SQLDialect;
+
+public class PostgresConnector extends DatabaseConnector {
+
+ public PostgresConnector(String url, String user, String password) {
+ super(url, user, password);
+ }
+
+ public Result executeSql(String sqlToExecute) {
+ return super.executeSql(sqlToExecute, SQLDialect.POSTGRES);
+ }
+}
diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java
index 85db0dc..f1e7fdd 100644
--- a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java
+++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java
@@ -1,24 +1,23 @@
package cz.moneta.test.harness.connectors.messaging;
-import java.io.FileInputStream;
+import java.io.EOFException;
+import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
-import javax.jms.BytesMessage;
-import javax.jms.JMSConsumer;
-import javax.jms.JMSContext;
-import javax.jms.JMSException;
-import javax.jms.JMSRuntimeException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
@@ -27,26 +26,28 @@ import javax.net.ssl.TrustManagerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import com.ibm.mq.jms.MQConnectionFactory;
+import com.ibm.mq.MQException;
+import com.ibm.mq.MQGetMessageOptions;
+import com.ibm.mq.MQMessage;
+import com.ibm.mq.MQPutMessageOptions;
+import com.ibm.mq.MQQueue;
+import com.ibm.mq.MQQueueManager;
+import com.ibm.mq.constants.CMQC;
import com.ibm.msg.client.wmq.WMQConstants;
import cz.moneta.test.harness.connectors.Connector;
-import cz.moneta.test.harness.messaging.MessageContentType;
-import cz.moneta.test.harness.messaging.MqMessageFormat;
-import cz.moneta.test.harness.messaging.ReceivedMessage;
-import cz.moneta.test.harness.messaging.exception.MessagingConnectionException;
-import cz.moneta.test.harness.messaging.exception.MessagingDestinationException;
-import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException;
import cz.moneta.test.harness.support.messaging.ImqRequest;
+import cz.moneta.test.harness.support.messaging.MessageContentType;
+import cz.moneta.test.harness.support.messaging.MqMessageFormat;
+import cz.moneta.test.harness.support.messaging.ReceivedMessage;
+import cz.moneta.test.harness.support.messaging.exception.MessagingConnectionException;
+import cz.moneta.test.harness.support.messaging.exception.MessagingDestinationException;
+import cz.moneta.test.harness.support.messaging.exception.MessagingTimeoutException;
/**
- * IBM MQ connector using JMS client with Jakarta JMS API. Supports
- * multi-instance Queue Manager, SSL/TLS, and multiple message formats.
- *
- * Supported formats: - JSON: JMS TextMessage with plain JSON string (default) -
- * XML: JMS TextMessage with XML string - UTF-8 (CCSID 1208): JMS BytesMessage
- * with UTF-8 encoding - EBCDIC (CCSID 870): JMS BytesMessage with EBCDIC
- * IBM-870 encoding
+ * IBM MQ connector using the native com.ibm.mq classes. Supports
+ * multi-instance Queue Manager connection lists, SSL/TLS, message properties,
+ * and JSON/XML/UTF-8/EBCDIC payload formats.
*/
public class IbmMqConnector implements Connector {
@@ -59,12 +60,19 @@ public class IbmMqConnector implements Connector {
private static final long DEFAULT_MAX_POLL_INTERVAL_MS = 1000;
private static final String TLS_VERSION = "TLSv1.2";
+ private static final String JMS_ID_PREFIX = "ID:";
+ private static final Pattern SELECTOR_EQUALS_PATTERN = Pattern
+ .compile("\\s*([A-Za-z_][A-Za-z0-9_.-]*)\\s*=\\s*'((?:''|[^'])*)'\\s*");
- private final MQConnectionFactory connectionFactory;
- private JMSContext jmsContext;
+ private final String connectionNameList;
+ private final String channel;
private final String queueManager;
private final String user;
private final String password;
+ private final SSLSocketFactory sslSocketFactory;
+ private final String sslCipherSuite;
+
+ private MQQueueManager mqQueueManager;
/**
* Constructor with multi-instance Queue Manager support.
@@ -82,35 +90,17 @@ public class IbmMqConnector implements Connector {
*/
public IbmMqConnector(String connectionNameList, String channel, String queueManager, String user, String password,
String keystorePath, String keystorePassword, String sslCipherSuite) {
+ this.connectionNameList = connectionNameList;
+ this.channel = channel;
this.queueManager = queueManager;
this.user = user;
this.password = password;
+ this.sslCipherSuite = sslCipherSuite;
try {
- connectionFactory = new MQConnectionFactory();
- connectionFactory.setConnectionNameList(connectionNameList);
- connectionFactory.setQueueManager(queueManager);
- connectionFactory.setChannel(channel);
- connectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
- if (user != null && !user.isBlank()) {
- connectionFactory.setStringProperty(WMQConstants.USERID, user);
- }
- if (password != null && !password.isBlank()) {
- connectionFactory.setStringProperty(WMQConstants.PASSWORD, password);
- }
-
- if (keystorePath != null && !keystorePath.isBlank() && keystorePassword != null
- && !keystorePassword.isBlank()) {
- connectionFactory.setSSLSocketFactory(getSslSocketFactory(keystorePath, keystorePassword));
- }
-
- if (sslCipherSuite != null && !sslCipherSuite.isBlank()) {
- connectionFactory.setSSLCipherSuite(sslCipherSuite);
- }
-
- // Initialize JMS context
+ this.sslSocketFactory = keystorePath != null && !keystorePath.isBlank() && keystorePassword != null
+ && !keystorePassword.isBlank() ? getSslSocketFactory(keystorePath, keystorePassword) : null;
connect();
-
} catch (Exception e) {
throw new MessagingConnectionException("Failed to create IBM MQ connection to " + queueManager, e);
}
@@ -121,97 +111,82 @@ public class IbmMqConnector implements Connector {
*/
private void connect() {
try {
- this.jmsContext = connectionFactory.createContext(user, password, JMSContext.AUTO_ACKNOWLEDGE);
- this.jmsContext.start();
+ this.mqQueueManager = new MQQueueManager(queueManager, createConnectionProperties());
LOG.info("Connected to IBM MQ: {}", queueManager);
- } catch (Exception e) {
+ } catch (MQException e) {
throw new MessagingConnectionException(
"Failed to connect to IBM MQ: " + queueManager + " - " + e.getMessage(), e);
}
}
- /**
- * Send a JSON or XML message as TextMessage.
- */
- private void sendTextMessage(String queueName, String payload, Map properties) {
- javax.jms.Queue queue = getQueue(queueName);
+ private Hashtable createConnectionProperties() {
+ Hashtable properties = new Hashtable<>();
+ if (connectionNameList != null && !connectionNameList.isBlank()) {
+ properties.put(WMQConstants.WMQ_CONNECTION_NAME_LIST, connectionNameList);
+ }
+ properties.put(CMQC.CHANNEL_PROPERTY, channel);
+ properties.put(CMQC.TRANSPORT_PROPERTY, CMQC.TRANSPORT_MQSERIES_CLIENT);
- TextMessage message = jmsContext.createTextMessage(payload);
-
- // Set JMS properties
- if (properties != null) {
- for (Map.Entry entry : properties.entrySet()) {
- try {
- if (entry.getKey().equals(ImqRequest.PROP_JMS_CORRELATION_ID)) {
- message.setJMSCorrelationID(entry.getValue());
- continue;
- } else if (entry.getKey().equals(ImqRequest.PROP_JMS_TYPE)) {
- message.setJMSType(entry.getValue());
- continue;
- } else if (entry.getKey().equals(ImqRequest.PROP_JMS_MESSAGE_ID)) {
- message.setJMSMessageID(entry.getValue());
- continue;
- }
- message.setStringProperty(entry.getKey(), entry.getValue());
- } catch (JMSException e) {
- LOG.warn("Failed to set property: {}", entry.getKey(), e);
- }
- }
+ if (user != null && !user.isBlank()) {
+ properties.put(CMQC.USER_ID_PROPERTY, user);
+ }
+ if (password != null && !password.isBlank()) {
+ properties.put(CMQC.PASSWORD_PROPERTY, password);
+ }
+ if (sslSocketFactory != null) {
+ properties.put(CMQC.SSL_SOCKET_FACTORY_PROPERTY, sslSocketFactory);
+ }
+ if (sslCipherSuite != null && !sslCipherSuite.isBlank()) {
+ properties.put(CMQC.SSL_CIPHER_SUITE_PROPERTY, sslCipherSuite);
}
- try {
- jmsContext.createProducer().send(queue, message);
- } catch (RuntimeException e) {
- throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e);
- }
- LOG.debug("Sent JSON/XML message to queue: {}", queueName);
+ return properties;
}
/**
- * Send a message as BytesMessage with specific encoding and CCSID.
+ * Send a JSON or XML message as MQ string message.
+ */
+ private void sendTextMessage(String queueName, String payload, Map properties) {
+ sendMessage(queueName, payload.getBytes(UTF_8), UTF_8, 1208, CMQC.MQFMT_STRING, properties, "JSON/XML");
+ }
+
+ /**
+ * Send a message as raw bytes with specific encoding and CCSID.
*/
private void sendBytesMessage(String queueName, String payload, Charset charset, int ccsid,
Map properties) {
- javax.jms.Queue queue = getQueue(queueName);
+ sendMessage(queueName, payload.getBytes(charset), charset, ccsid, CMQC.MQFMT_NONE, properties,
+ charset.displayName());
+ }
- BytesMessage message = jmsContext.createBytesMessage();
-
- // Convert payload to bytes using specified charset
- byte[] bytes = payload.getBytes(charset);
+ private void sendMessage(String queueName, byte[] payload, Charset charset, int ccsid, String mqFormat,
+ Map properties, String logFormat) {
+ MQQueue queue = null;
try {
- message.writeBytes(bytes);
- message.setIntProperty("CCSID", ccsid);
- } catch (JMSException e) {
- throw new MessagingDestinationException("Failed to create bytes message", e);
- }
+ int openOptions = CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING;
- // Set JMS properties
- if (properties != null) {
- for (Map.Entry entry : properties.entrySet()) {
- try {
- if (entry.getKey().equals(ImqRequest.PROP_JMS_CORRELATION_ID)) {
- message.setJMSCorrelationID(entry.getValue());
- continue;
- } else if (entry.getKey().equals(ImqRequest.PROP_JMS_TYPE)) {
- message.setJMSType(entry.getValue());
- continue;
- } else if (entry.getKey().equals(ImqRequest.PROP_JMS_MESSAGE_ID)) {
- message.setJMSMessageID(entry.getValue());
- continue;
- }
- message.setStringProperty(entry.getKey(), entry.getValue());
- } catch (JMSException e) {
- LOG.warn("Failed to set property: {}", entry.getKey(), e);
- }
+ queue = openQueue(queueName, openOptions);
+
+ MQMessage message = new MQMessage();
+ message.format = mqFormat;
+ message.characterSet = ccsid;
+ message.write(payload);
+ if (!CMQC.MQFMT_STRING.equals(mqFormat)) {
+ message.setIntProperty("CCSID", ccsid);
}
+
+ MQPutMessageOptions putOptions = new MQPutMessageOptions();
+ putOptions.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING;
+ applyMessageProperties(message, properties);
+
+ queue.put(message, putOptions);
+ } catch (MQException | IOException e) {
+ throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e);
+ } finally {
+ closeQueue(queue, queueName);
}
- try {
- jmsContext.createProducer().send(queue, message);
- } catch (RuntimeException e) {
- throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e);
- }
- LOG.debug("Sent {} message to queue: {}", charset, queueName);
+ LOG.debug("Sent {} message ({}) to queue: {}", logFormat, charset, queueName);
}
/**
@@ -220,7 +195,7 @@ public class IbmMqConnector implements Connector {
* @param queueName Queue name
* @param payload Message payload
* @param format Message format (JSON, XML, EBCDIC_870, UTF8_1208)
- * @param properties JMS properties to set
+ * @param properties message properties to set
*/
public void send(String queueName, String payload, MqMessageFormat format, Map properties) {
switch (format) {
@@ -234,56 +209,99 @@ public class IbmMqConnector implements Connector {
* Receive a message from a queue with timeout.
*
* @param queueName Queue name
- * @param messageSelector JMS message selector (optional)
+ * @param messageSelector JMS-style message selector (optional). Equality
+ * predicates joined by AND are evaluated client-side.
* @param format Expected message format
* @param timeout Timeout duration
* @return Received message
*/
- public ReceivedMessage receive(String queueName, String messageSelector, MqMessageFormat format,
- java.time.Duration timeout) {
- long timeoutMs = timeout.toMillis();
+ public ReceivedMessage receive(String queueName, String messageSelector, MqMessageFormat format, Duration timeout) {
+ if (messageSelector == null || messageSelector.isBlank()) {
+ return receiveNext(queueName, format, timeout);
+ }
- javax.jms.Queue queue = getQueue(queueName);
- JMSConsumer consumer = (messageSelector == null || messageSelector.isBlank() ? jmsContext.createConsumer(queue)
- : jmsContext.createConsumer(queue, messageSelector));
-
- AtomicBoolean messageFound = new AtomicBoolean(false);
- ReceivedMessage received = null;
-
- long pollInterval = DEFAULT_POLL_INTERVAL_MS;
- long remainingTime = timeoutMs;
+ return receiveMatching(queueName, messageSelector, format, timeout);
+ }
+ private ReceivedMessage receiveNext(String queueName, MqMessageFormat format, Duration timeout) {
+ MQQueue queue = null;
try {
- while (remainingTime > 0 && !messageFound.get()) {
- Message message = consumer.receive(remainingTime);
+ queue = openQueue(queueName, CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING);
+ MQMessage message = new MQMessage();
+ MQGetMessageOptions getOptions = new MQGetMessageOptions();
+ getOptions.options = CMQC.MQGMO_WAIT | CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_FAIL_IF_QUIESCING;
+ getOptions.waitInterval = toWaitInterval(timeout.toMillis());
- if (message != null) {
- received = decodeMessage(message, queueName, format);
- messageFound.set(true);
- } else {
- // Exponential backoff
- pollInterval = Math.min(pollInterval * 2, DEFAULT_MAX_POLL_INTERVAL_MS);
- remainingTime -= pollInterval;
- }
- }
-
- if (received == null) {
+ queue.get(message, getOptions);
+ return decodeMessage(message, queueName, format);
+ } catch (MQException e) {
+ if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) {
throw new MessagingTimeoutException("No message matching filter found on queue '" + queueName
+ "' within " + timeout.toMillis() + "ms");
}
-
- return received;
-
- } catch (MessagingTimeoutException e) {
- throw e;
- } catch (Exception e) {
throw new MessagingDestinationException("Failed to receive message from queue: " + queueName, e);
} finally {
- try {
- consumer.close();
- } catch (JMSRuntimeException e) {
- LOG.warn("Failed to close consumer", e);
+ closeQueue(queue, queueName);
+ }
+ }
+
+ private ReceivedMessage receiveMatching(String queueName, String messageSelector, MqMessageFormat format,
+ Duration timeout) {
+ long deadline = System.currentTimeMillis() + timeout.toMillis();
+ long pollInterval = DEFAULT_POLL_INTERVAL_MS;
+ Selector selector = Selector.parse(messageSelector);
+
+ while (System.currentTimeMillis() < deadline) {
+ ReceivedMessage received = tryReceiveMatching(queueName, format, selector);
+ if (received != null) {
+ return received;
}
+
+ sleepQuietly(Math.min(pollInterval, Math.max(1, deadline - System.currentTimeMillis())));
+ pollInterval = Math.min(pollInterval * 2, DEFAULT_MAX_POLL_INTERVAL_MS);
+ }
+
+ throw new MessagingTimeoutException("No message matching filter found on queue '" + queueName + "' within "
+ + timeout.toMillis() + "ms");
+ }
+
+ private ReceivedMessage tryReceiveMatching(String queueName, MqMessageFormat format, Selector selector) {
+ MQQueue queue = null;
+ try {
+ queue = openQueue(queueName,
+ CMQC.MQOO_BROWSE | CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING);
+
+ MQGetMessageOptions browseOptions = new MQGetMessageOptions();
+ browseOptions.options = CMQC.MQGMO_BROWSE_FIRST | CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_FAIL_IF_QUIESCING;
+
+ while (true) {
+ MQMessage browsedMessage = new MQMessage();
+ try {
+ queue.get(browsedMessage, browseOptions);
+ } catch (MQException e) {
+ if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) {
+ return null;
+ }
+ throw e;
+ }
+
+ ReceivedMessage browsed = decodeMessage(browsedMessage, queueName, format);
+ if (selector.matches(browsed.getHeaders())) {
+ MQMessage removedMessage = new MQMessage();
+ MQGetMessageOptions removeOptions = new MQGetMessageOptions();
+ removeOptions.options = CMQC.MQGMO_MSG_UNDER_CURSOR | CMQC.MQGMO_NO_SYNCPOINT
+ | CMQC.MQGMO_FAIL_IF_QUIESCING;
+ queue.get(removedMessage, removeOptions);
+ return decodeMessage(removedMessage, queueName, format);
+ }
+
+ browseOptions.options = CMQC.MQGMO_BROWSE_NEXT | CMQC.MQGMO_NO_SYNCPOINT
+ | CMQC.MQGMO_FAIL_IF_QUIESCING;
+ }
+ } catch (MQException | RuntimeException e) {
+ throw new MessagingDestinationException("Failed to receive message from queue: " + queueName, e);
+ } finally {
+ closeQueue(queue, queueName);
}
}
@@ -291,7 +309,8 @@ public class IbmMqConnector implements Connector {
* Browse a queue (non-destructive read).
*
* @param queueName Queue name
- * @param messageSelector JMS message selector (optional)
+ * @param messageSelector JMS-style message selector (optional). Equality
+ * predicates joined by AND are evaluated client-side.
* @param format Expected message format
* @param maxMessages Maximum number of messages to browse
* @return List of received messages
@@ -299,170 +318,268 @@ public class IbmMqConnector implements Connector {
public List browse(String queueName, String messageSelector, MqMessageFormat format,
int maxMessages) {
List messages = new ArrayList<>();
- javax.jms.Queue queue = getQueue(queueName);
+ Selector selector = Selector.parse(messageSelector);
+ MQQueue queue = null;
- try (javax.jms.QueueBrowser browser = (messageSelector == null || messageSelector.isBlank())
- ? jmsContext.createBrowser(queue)
- : jmsContext.createBrowser(queue, messageSelector)) {
+ try {
+ queue = openQueue(queueName, CMQC.MQOO_BROWSE | CMQC.MQOO_FAIL_IF_QUIESCING);
- Enumeration> enumeration = browser.getEnumeration();
- int count = 0;
+ MQGetMessageOptions browseOptions = new MQGetMessageOptions();
+ browseOptions.options = CMQC.MQGMO_BROWSE_FIRST | CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_FAIL_IF_QUIESCING;
- while (enumeration.hasMoreElements() && count < maxMessages) {
- Message message = (Message) enumeration.nextElement();
- if (message != null) {
- ReceivedMessage received = decodeMessage(message, queueName, format);
- messages.add(received);
- count++;
+ while (messages.size() < maxMessages) {
+ MQMessage message = new MQMessage();
+ try {
+ queue.get(message, browseOptions);
+ } catch (MQException e) {
+ if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) {
+ break;
+ }
+ throw e;
}
+
+ ReceivedMessage received = decodeMessage(message, queueName, format);
+ if (selector.matches(received.getHeaders())) {
+ messages.add(received);
+ }
+
+ browseOptions.options = CMQC.MQGMO_BROWSE_NEXT | CMQC.MQGMO_NO_SYNCPOINT
+ | CMQC.MQGMO_FAIL_IF_QUIESCING;
}
return messages;
- } catch (JMSException e) {
+ } catch (MQException e) {
throw new MessagingDestinationException("Failed to browse queue: " + queueName, e);
+ } finally {
+ closeQueue(queue, queueName);
}
}
/**
- * Decode a JMS message to ReceivedMessage.
+ * Decode an IBM MQ message to ReceivedMessage.
*/
- private ReceivedMessage decodeMessage(Message jmsMessage, String queueName, MqMessageFormat format) {
- long timestamp;
- try {
- timestamp = jmsMessage.getJMSTimestamp();
- } catch (JMSException e) {
- timestamp = System.currentTimeMillis();
- }
+ private ReceivedMessage decodeMessage(MQMessage mqMessage, String queueName, MqMessageFormat format) {
+ long timestamp = mqMessage.putDateTime != null ? mqMessage.putDateTime.getTimeInMillis()
+ : System.currentTimeMillis();
if (timestamp == 0) {
timestamp = System.currentTimeMillis();
}
+ Map headers = new HashMap<>();
+ extractMqHeadersAndProperties(mqMessage, headers, queueName);
+
+ byte[] data = readMessageBody(mqMessage);
String body;
MessageContentType contentType;
- Map headers = new HashMap<>();
- // Extract JMS properties as headers
- extractJmsProperties(jmsMessage, headers);
-
- if (jmsMessage instanceof TextMessage textMessage) {
- try {
- body = textMessage.getText();
- } catch (JMSException e) {
- throw new RuntimeException("Failed to read text message body", e);
- }
- contentType = switch (format) {
- case XML -> MessageContentType.XML;
- default -> MessageContentType.JSON;
- };
- } else if (jmsMessage instanceof BytesMessage bytesMessage) {
- int ccsid;
- try {
- ccsid = bytesMessage.getIntProperty("CCSID");
- } catch (JMSException e) {
- ccsid = 1208; // default UTF-8
- }
- body = decodeBytesMessage(bytesMessage, ccsid);
+ switch (format) {
+ case XML -> {
+ body = new String(data, charsetFor(mqMessage.characterSet, UTF_8));
+ contentType = MessageContentType.XML;
+ }
+ case JSON -> {
+ body = new String(data, charsetFor(mqMessage.characterSet, UTF_8));
+ contentType = MessageContentType.JSON;
+ }
+ case EBCDIC_870 -> {
+ body = new String(data, EBCDIC_870);
contentType = MessageContentType.RAW_TEXT;
- } else {
- try {
- throw new IllegalArgumentException("Unsupported message type: " + jmsMessage.getJMSType());
- } catch (JMSException e) {
- throw new IllegalArgumentException("Unsupported message type", e);
- }
+ }
+ case UTF8_1208 -> {
+ body = new String(data, UTF_8);
+ contentType = MessageContentType.RAW_TEXT;
+ }
+ default -> {
+ body = new String(data, UTF_8);
+ contentType = MessageContentType.RAW_TEXT;
+ }
}
return new ReceivedMessage(body, contentType, headers, timestamp, queueName, null);
}
- /**
- * Decode BytesMessage body based on CCSID.
- */
- private String decodeBytesMessage(BytesMessage bytesMessage, int ccsid) {
+ private byte[] readMessageBody(MQMessage message) {
try {
- long bodyLength;
- try {
- bodyLength = bytesMessage.getBodyLength();
- } catch (JMSException e) {
- throw new RuntimeException("Failed to get message body length", e);
- }
- byte[] data = new byte[(int) bodyLength];
- bytesMessage.readBytes(data);
-
- Charset charset = switch (ccsid) {
- case 870 -> EBCDIC_870;
- case 1208 -> UTF_8;
- default -> UTF_8;
- };
-
- return new String(data, charset);
- } catch (JMSException e) {
- throw new RuntimeException("Failed to read BytesMessage body", e);
+ message.seek(0);
+ int length = message.getMessageLength();
+ byte[] data = new byte[length];
+ message.readFully(data);
+ return data;
+ } catch (EOFException e) {
+ throw new RuntimeException("Failed to seek message body", e);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read message body", e);
}
}
/**
- * Extract JMS properties as headers.
+ * Extract MQ headers and message properties as headers.
*/
- @SuppressWarnings("unchecked")
- private void extractJmsProperties(Message message, Map headers) {
- try {
- // Common JMS headers
- headers.put("JMSMessageID", message.getJMSMessageID());
- try {
- headers.put("JMSType", message.getJMSType() != null ? message.getJMSType() : "");
- } catch (JMSException e) {
- headers.put("JMSType", "");
- }
- try {
- headers.put("JMSDestination",
- message.getJMSDestination() != null ? message.getJMSDestination().toString() : "");
- } catch (JMSException e) {
- headers.put("JMSDestination", "");
- }
- try {
- headers.put("JMSDeliveryMode", String.valueOf(message.getJMSDeliveryMode()));
- } catch (JMSException e) {
- headers.put("JMSDeliveryMode", "");
- }
- try {
- headers.put("JMSPriority", String.valueOf(message.getJMSPriority()));
- } catch (JMSException e) {
- headers.put("JMSPriority", "");
- }
- try {
- headers.put("JMSTimestamp", String.valueOf(message.getJMSTimestamp()));
- } catch (JMSException e) {
- headers.put("JMSTimestamp", "");
- }
+ private void extractMqHeadersAndProperties(MQMessage message, Map headers, String queueName) {
+ headers.put("JMSMessageID", toJmsId(message.messageId));
+ headers.put("JMSCorrelationID", toJmsId(message.correlationId));
+ headers.put("JMSType", getStringProperty(message, ImqRequest.PROP_JMS_TYPE, ""));
+ headers.put("JMSDestination", queueName);
+ headers.put("JMSDeliveryMode", String.valueOf(message.persistence));
+ headers.put("JMSPriority", String.valueOf(message.priority));
+ headers.put("JMSTimestamp",
+ message.putDateTime != null ? String.valueOf(message.putDateTime.getTimeInMillis()) : "");
+ headers.put("MQFormat", message.format != null ? message.format.trim() : "");
+ headers.put("MQCharacterSet", String.valueOf(message.characterSet));
+ headers.put("MQEncoding", String.valueOf(message.encoding));
+ headers.put("MQBackoutCount", String.valueOf(message.backoutCount));
+ headers.put("MQReplyToQueue", message.replyToQueueName != null ? message.replyToQueueName.trim() : "");
+ headers.put("MQReplyToQueueManager",
+ message.replyToQueueManagerName != null ? message.replyToQueueManagerName.trim() : "");
+ headers.put("MQUserId", message.userId != null ? message.userId.trim() : "");
- // Extract custom properties
- Enumeration propertyNames = (Enumeration) message.getPropertyNames();
- while (propertyNames.hasMoreElements()) {
- String propName = propertyNames.nextElement();
+ Enumeration propertyNames;
+ try {
+ propertyNames = message.getPropertyNames("%");
+ } catch (MQException e) {
+ LOG.warn("Failed to extract MQ properties", e);
+ return;
+ }
+
+ while (propertyNames.hasMoreElements()) {
+ String propName = propertyNames.nextElement();
+ try {
Object propValue = message.getObjectProperty(propName);
if (propValue != null) {
headers.put(propName, propValue.toString());
}
+ } catch (MQException e) {
+ LOG.warn("Failed to extract MQ property: {}", propName, e);
}
- } catch (JMSException e) {
- LOG.warn("Failed to extract JMS properties", e);
}
}
- /**
- * Get Queue object from queue name.
- */
- private javax.jms.Queue getQueue(String queueName) {
- return jmsContext.createQueue(queueName);
+ private void applyMessageProperties(MQMessage message, Map properties) {
+ if (properties == null) {
+ return;
+ }
+
+ for (Map.Entry entry : properties.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ try {
+ if (ImqRequest.PROP_JMS_CORRELATION_ID.equals(key)) {
+ message.correlationId = toMqId(value);
+ message.setStringProperty(key, value);
+ } else if (ImqRequest.PROP_JMS_MESSAGE_ID.equals(key)) {
+ message.messageId = toMqId(value);
+ } else if (ImqRequest.PROP_JMS_TYPE.equals(key)) {
+ message.setStringProperty(ImqRequest.PROP_JMS_TYPE, value);
+ } else {
+ message.setStringProperty(key, value);
+ }
+ } catch (MQException e) {
+ LOG.warn("Failed to set property: {}", key, e);
+ }
+ }
+ }
+
+ private String getStringProperty(MQMessage message, String property, String defaultValue) {
+ try {
+ String value = message.getStringProperty(property);
+ return value != null ? value : defaultValue;
+ } catch (MQException e) {
+ return defaultValue;
+ }
+ }
+
+ private MQQueue openQueue(String queueName, int openOptions) throws MQException {
+ ensureConnected();
+ return mqQueueManager.accessQueue(queueName, openOptions);
+ }
+
+ private void ensureConnected() {
+ if (mqQueueManager == null || !mqQueueManager.isConnected()) {
+ connect();
+ }
+ }
+
+ private void closeQueue(MQQueue queue, String queueName) {
+ if (queue == null) {
+ return;
+ }
+ try {
+ queue.close();
+ } catch (MQException e) {
+ LOG.warn("Failed to close IBM MQ queue: {}", queueName, e);
+ }
+ }
+
+ private int toWaitInterval(long timeoutMs) {
+ if (timeoutMs <= 0) {
+ return 0;
+ }
+ return timeoutMs > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) timeoutMs;
+ }
+
+ private Charset charsetFor(int ccsid, Charset defaultCharset) {
+ return switch (ccsid) {
+ case 870 -> EBCDIC_870;
+ case 1208 -> UTF_8;
+ default -> defaultCharset;
+ };
+ }
+
+ private byte[] toMqId(String value) {
+ if (value == null || value.isBlank()) {
+ return CMQC.MQMI_NONE.clone();
+ }
+
+ if (value.regionMatches(true, 0, JMS_ID_PREFIX, 0, JMS_ID_PREFIX.length())) {
+ String hex = value.substring(JMS_ID_PREFIX.length());
+ if (hex.length() == CMQC.MQ_MSG_ID_LENGTH * 2 && hex.matches("[0-9A-Fa-f]+")) {
+ return hexToBytes(hex);
+ }
+ }
+
+ byte[] id = CMQC.MQMI_NONE.clone();
+ byte[] source = value.getBytes(UTF_8);
+ System.arraycopy(source, 0, id, 0, Math.min(source.length, id.length));
+ return id;
+ }
+
+ private String toJmsId(byte[] value) {
+ if (value == null || value.length == 0 || Arrays.equals(value, CMQC.MQMI_NONE)) {
+ return "";
+ }
+
+ StringBuilder builder = new StringBuilder(JMS_ID_PREFIX);
+ for (byte b : value) {
+ builder.append(String.format(Locale.ROOT, "%02X", b));
+ }
+ return builder.toString();
+ }
+
+ private byte[] hexToBytes(String hex) {
+ byte[] data = new byte[hex.length() / 2];
+ for (int i = 0; i < data.length; i++) {
+ int index = i * 2;
+ data[i] = (byte) Integer.parseInt(hex.substring(index, index + 2), 16);
+ }
+ return data;
+ }
+
+ private void sleepQuietly(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new MessagingDestinationException("Interrupted while waiting for IBM MQ message", e);
+ }
}
@Override
public void close() {
- if (jmsContext != null) {
+ if (mqQueueManager != null && mqQueueManager.isConnected()) {
try {
- jmsContext.close();
+ mqQueueManager.disconnect();
LOG.info("Closed connection to IBM MQ: {}", queueManager);
- } catch (Exception e) {
+ } catch (MQException e) {
LOG.error("Failed to close IBM MQ connection", e);
}
}
@@ -477,7 +594,7 @@ public class IbmMqConnector implements Connector {
throw new IllegalStateException("Keystore not found: " + keystorePath);
}
keyStore.load(ksStream, keystorePassword.toCharArray());
-
+
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, keystorePassword.toCharArray());
@@ -499,4 +616,34 @@ public class IbmMqConnector implements Connector {
return sslContext.getSocketFactory();
}
+
+ private record Selector(Map expectedValues) {
+
+ private static Selector parse(String selector) {
+ if (selector == null || selector.isBlank()) {
+ return new Selector(Map.of());
+ }
+
+ Map predicates = new HashMap<>();
+ for (String predicate : selector.split("(?i)\\s+AND\\s+")) {
+ Matcher matcher = SELECTOR_EQUALS_PATTERN.matcher(predicate);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException(
+ "Unsupported IBM MQ selector expression. Supported format: property = 'value' joined by AND. Selector: "
+ + selector);
+ }
+ predicates.put(matcher.group(1), matcher.group(2).replace("''", "'"));
+ }
+ return new Selector(predicates);
+ }
+
+ private boolean matches(Map headers) {
+ for (Map.Entry expectedValue : expectedValues.entrySet()) {
+ if (!expectedValue.getValue().equals(headers.get(expectedValue.getKey()))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
}
diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java
index 7ee7a3c..eb75b14 100644
--- a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java
+++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java
@@ -1,348 +1,369 @@
-package cz.moneta.test.harness.connectors.messaging;
-
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Predicate;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import cz.moneta.test.harness.messaging.exception.MessagingConnectionException;
-import cz.moneta.test.harness.messaging.exception.MessagingDestinationException;
-import cz.moneta.test.harness.messaging.exception.MessagingSchemaException;
-import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException;
-import cz.moneta.test.harness.support.messaging.kafka.MessageContentType;
-import cz.moneta.test.harness.support.messaging.kafka.ReceivedMessage;
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-
-/**
- * Kafka connector for sending and receiving messages.
- * Supports Avro serialization with Confluent Schema Registry.
- *
- * Uses manual partition assignment (no consumer group) for test isolation.
- * Each receive operation creates a new consumer to avoid offset sharing.
- */
-public class KafkaConnector implements cz.moneta.test.harness.connectors.Connector {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaConnector.class);
-
- private final Properties producerConfig;
- private final Properties consumerConfig;
- private final String schemaRegistryUrl;
- private final CachedSchemaRegistryClient schemaRegistryClient;
- private KafkaProducer producer;
-
- /**
- * Creates a new KafkaConnector.
- *
- * @param bootstrapServers Kafka bootstrap servers
- * @param apiKey Kafka API key
- * @param apiSecret Kafka API secret
- * @param schemaRegistryUrl Schema Registry URL
- * @param schemaRegistryApiKey Schema Registry API key
- * @param schemaRegistryApiSecret Schema Registry API secret
- */
- public KafkaConnector(String bootstrapServers,
- String apiKey,
- String apiSecret,
- String schemaRegistryUrl,
- String schemaRegistryApiKey,
- String schemaRegistryApiSecret) {
- this.schemaRegistryUrl = schemaRegistryUrl;
- this.schemaRegistryClient = new CachedSchemaRegistryClient(
- Collections.singletonList(schemaRegistryUrl), 100, new HashMap<>());
-
- this.producerConfig = createProducerConfig(bootstrapServers, apiKey, apiSecret);
- this.consumerConfig = createConsumerConfig(bootstrapServers, schemaRegistryApiKey, schemaRegistryApiSecret);
- }
-
- /**
- * Creates producer configuration.
- */
- private Properties createProducerConfig(String bootstrapServers, String apiKey, String apiSecret) {
- Properties config = new Properties();
- config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
- config.put("schema.registry.url", schemaRegistryUrl);
- config.put(ProducerConfig.ACKS_CONFIG, "all");
- config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
- config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
-
-// SASL/PLAIN authentication
-// config.put("security.protocol", "SASL_SSL");
-// config.put("sasl.mechanism", "PLAIN");
-// config.put("sasl.jaas.config",
-// "org.apache.kafka.common.security.plain.PlainLoginModule required " +
-// "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
-
- // SSL configuration
-// config.put("ssl.endpoint.identification.algorithm", "https");
-
- return config;
- }
-
- /**
- * Creates consumer configuration.
- */
- private Properties createConsumerConfig(String bootstrapServers, String apiKey, String apiSecret) {
- Properties config = new Properties();
- config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
- config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
- config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
- // SASL/PLAIN authentication
- config.put("security.protocol", "SASL_SSL");
- config.put("sasl.mechanism", "PLAIN");
- config.put("sasl.jaas.config",
- "org.apache.kafka.common.security.plain.PlainLoginModule required " +
- "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
-
- // Schema Registry for deserialization
- config.put("schema.registry.url", schemaRegistryUrl);
- config.put("specific.avro.reader", false);
-
- // SSL configuration
- config.put("ssl.endpoint.identification.algorithm", "https");
-
- return config;
- }
-
- /**
- * Sends a message to a Kafka topic.
- */
- public void send(String topic, String key, String jsonPayload, Map headers) {
- try {
- org.apache.avro.Schema schema = getSchemaForTopic(topic);
- GenericRecord record = jsonToAvro(jsonPayload, schema);
-
- ProducerRecord producerRecord =
- new ProducerRecord<>(topic, key, record);
-
- // Add headers
- if (headers != null) {
- Headers kafkaHeaders = producerRecord.headers();
- headers.forEach((k, v) ->
- kafkaHeaders.add(k, v.getBytes(StandardCharsets.UTF_8)));
- }
-
- // Send and wait for confirmation
- getProducer().send(producerRecord, (metadata, exception) -> {
- if (exception != null) {
- LOG.error("Failed to send message", exception);
- } else {
- LOG.debug("Message sent to topic {} partition {} offset {}",
- metadata.topic(), metadata.partition(), metadata.offset());
- }
- }).get(10, java.util.concurrent.TimeUnit.SECONDS);
-
- } catch (ExecutionException e) {
- throw new MessagingConnectionException(
- "Failed to send message to topic " + topic, e.getCause());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new MessagingConnectionException(
- "Interrupted while sending message to topic " + topic, e);
- } catch (MessagingSchemaException e) {
- throw e;
- } catch (Exception e) {
- throw new MessagingSchemaException(
- "Failed to convert JSON to Avro for topic " + topic, e);
- }
- }
-
- /**
- * Receives a message from a Kafka topic matching the filter.
- */
- public List receive(String topic,
- Predicate filter,
- Duration timeout) {
- KafkaConsumer consumer = null;
- try {
- consumer = new KafkaConsumer<>(consumerConfig);
-
- // Get partitions for the topic
- List partitions = getPartitionsForTopic(consumer, topic);
- if (partitions.isEmpty()) {
- throw new MessagingDestinationException(
- "Topic '" + topic + "' does not exist or has no partitions");
- }
-
- // Assign partitions and seek to end
- consumer.assign(partitions);
-// consumer.seekToBeginning(partitions);
- consumer.seekToBeginning(partitions);
-
- // Poll loop with exponential backoff
- long startTime = System.currentTimeMillis();
- Duration pollInterval = Duration.ofMillis(100);
- Duration maxPollInterval = Duration.ofSeconds(1);
-
- while (Duration.ofMillis(System.currentTimeMillis() - startTime).compareTo(timeout) < 0) {
- ConsumerRecords records = consumer.poll(pollInterval);
-
- for (ConsumerRecord record : records) {
- ReceivedMessage message = convertToReceivedMessage(record, topic);
- if (filter.test(message)) {
- LOG.debug("Found matching message on topic {} partition {} offset {}",
- record.topic(), record.partition(), record.offset());
- return Collections.singletonList(message);
- }
- }
-
- // Exponential backoff
- pollInterval = Duration.ofMillis(
- Math.min(pollInterval.toMillis() * 2, maxPollInterval.toMillis()));
- }
-
- throw new MessagingTimeoutException(
- "No message matching filter found on topic '" + topic + "' within " + timeout);
-
- } finally {
- if (consumer != null) {
- consumer.close();
- }
- }
- }
-
- /**
- * Gets partitions for a topic.
- */
- private List getPartitionsForTopic(KafkaConsumer, ?> consumer, String topic) {
- List partitions = new ArrayList<>();
- List partitionInfos = consumer.partitionsFor(topic);
- if (partitionInfos != null) {
- for (org.apache.kafka.common.PartitionInfo partitionInfo : partitionInfos) {
- partitions.add(new TopicPartition(topic, partitionInfo.partition()));
- }
- }
- return partitions;
- }
-
- /**
- * Saves current offsets for a topic.
- */
- public Map saveOffsets(String topic) {
- return new HashMap<>();
- }
-
- /**
- * Closes the connector and releases resources.
- */
- @Override
- public void close() {
- if (producer != null) {
- producer.close();
- }
- }
-
- /**
- * Gets or creates the producer (singleton, thread-safe).
- */
- private KafkaProducer getProducer() {
- if (producer == null) {
- synchronized (this) {
- if (producer == null) {
- producer = new KafkaProducer<>(producerConfig);
- }
- }
- }
- return producer;
- }
-
- /**
- * Retrieves schema from Schema Registry based on topic name.
- */
- private org.apache.avro.Schema getSchemaForTopic(String topic) {
- String subject = topic + "-value";
- try {
- io.confluent.kafka.schemaregistry.client.SchemaMetadata metadata =
- schemaRegistryClient.getLatestSchemaMetadata(subject);
- return new org.apache.avro.Schema.Parser().parse(metadata.getSchema());
- } catch (Exception e) {
- if (e.getMessage() != null && e.getMessage().contains("404")) {
- throw new MessagingSchemaException(
- "Schema not found for subject '" + subject + "' in Schema Registry. " +
- "Make sure the topic exists and schema is registered.");
- }
- throw new MessagingSchemaException(
- "Failed to retrieve schema for topic '" + topic + "'", e);
- }
- }
-
- /**
- * Converts JSON string to Avro GenericRecord.
- */
- private GenericRecord jsonToAvro(String json, org.apache.avro.Schema schema) {
- try {
- GenericRecord genericRecord = JsonToAvroConverter.processJson(json, schema);
- return genericRecord;
- } catch (Exception e) {
- throw new MessagingSchemaException("Failed to convert JSON to Avro: " + e.getMessage(), e);
- }
- }
-
- /**
- * Converts Kafka ConsumerRecord to ReceivedMessage.
- */
- private ReceivedMessage convertToReceivedMessage(ConsumerRecord record, String topic) {
- try {
- String jsonBody = avroToJson(record.value());
-
- Map headers = new HashMap<>();
- Headers kafkaHeaders = record.headers();
- for (org.apache.kafka.common.header.Header header : kafkaHeaders) {
- if (header.value() != null) {
- headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
- }
- }
-
- return ReceivedMessage.builder()
- .body(jsonBody)
- .contentType(MessageContentType.JSON)
- .headers(headers)
- .timestamp(record.timestamp())
- .source(topic)
- .key(record.key())
- .build();
-
- } catch (Exception e) {
- LOG.error("Failed to convert Avro record to ReceivedMessage", e);
- throw new RuntimeException("Failed to convert message", e);
- }
- }
-
- /**
- * Converts Avro GenericRecord to JSON string.
- */
- private String avroToJson(GenericRecord record) {
- try {
- return record.toString();
- } catch (Exception e) {
- throw new RuntimeException("Failed to convert Avro to JSON: " + e.getMessage(), e);
- }
- }
-}
+package cz.moneta.test.harness.connectors.messaging;
+
+import cz.moneta.test.harness.connectors.messaging.kafkautils.CustomKafkaAvroDeserializer;
+import cz.moneta.test.harness.connectors.messaging.kafkautils.JsonToAvroConverter;
+import cz.moneta.test.harness.support.messaging.exception.MessagingConnectionException;
+import cz.moneta.test.harness.support.messaging.exception.MessagingDestinationException;
+import cz.moneta.test.harness.support.messaging.exception.MessagingSchemaException;
+import cz.moneta.test.harness.support.messaging.exception.MessagingTimeoutException;
+import cz.moneta.test.harness.support.messaging.kafka.MessageContentType;
+import cz.moneta.test.harness.support.messaging.kafka.ReceivedMessage;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
+
+/**
+ * Kafka connector for sending and receiving messages.
+ * Supports Avro serialization with Confluent Schema Registry.
+ *
+ * Uses manual partition assignment (no consumer group) for test isolation.
+ * Each receive operation creates a new consumer to avoid offset sharing.
+ */
+public class KafkaConnector implements cz.moneta.test.harness.connectors.Connector {
+
+ private static final Logger LOG = LogManager.getLogger(KafkaConnector.class);
+
+ private final Properties producerConfig;
+ private final Properties consumerConfig;
+ private final String schemaRegistryUrl;
+ private final CachedSchemaRegistryClient schemaRegistryClient;
+ private KafkaProducer producer;
+
+ /**
+ * Creates a new KafkaConnector.
+ *
+ * @param bootstrapServers Kafka bootstrap servers
+ * @param apiKey Kafka API key
+ * @param apiSecret Kafka API secret
+ * @param schemaRegistryUrl Schema Registry URL
+ * @param schemaRegistryApiKey Schema Registry API key
+ * @param schemaRegistryApiSecret Schema Registry API secret
+ */
+ public KafkaConnector(String bootstrapServers,
+ String apiKey,
+ String apiSecret,
+ String schemaRegistryUrl,
+ String schemaRegistryApiKey,
+ String schemaRegistryApiSecret) {
+ this.schemaRegistryUrl = schemaRegistryUrl;
+
+ HashMap schemaRegistryProps = new HashMap<>();
+ schemaRegistryProps.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
+ schemaRegistryProps.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, schemaRegistryApiKey + ":" + schemaRegistryApiSecret);
+ this.schemaRegistryClient = new CachedSchemaRegistryClient(
+ Collections.singletonList(schemaRegistryUrl), 100, schemaRegistryProps);
+
+ this.producerConfig = createProducerConfig(bootstrapServers, apiKey, apiSecret, schemaRegistryApiKey, schemaRegistryApiSecret);
+ this.consumerConfig = createConsumerConfig(bootstrapServers, apiKey, apiSecret, schemaRegistryApiKey, schemaRegistryApiSecret);
+ }
+
+ /**
+ * Creates producer configuration.
+ */
+ private Properties createProducerConfig(String bootstrapServers, String apiKey, String apiSecret,
+ String schemaRegistryApiKey, String schemaRegistryApiSecret) {
+ Properties config = new Properties();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
+ config.put(ProducerConfig.ACKS_CONFIG, "all");
+ config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+ config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+
+ config.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
+ config.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true);
+ config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+ config.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
+ config.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, schemaRegistryApiKey + ":" + schemaRegistryApiSecret);
+
+// SASL/PLAIN authentication
+ config.put("security.protocol", "SASL_SSL");
+ config.put("sasl.mechanism", "PLAIN");
+ config.put("sasl.jaas.config",
+ "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
+
+// SSL configuration
+ config.put("ssl.endpoint.identification.algorithm", "https");
+
+ return config;
+ }
+
+ /**
+ * Creates consumer configuration.
+ */
+ private Properties createConsumerConfig(String bootstrapServers, String apiKey, String apiSecret,
+ String schemaRegistryApiKey, String schemaRegistryApiSecret) {
+ Properties config = new Properties();
+ config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomKafkaAvroDeserializer.class);
+ config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+ config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+
+ // SASL/PLAIN authentication
+ config.put("security.protocol", "SASL_SSL");
+ config.put("sasl.mechanism", "PLAIN");
+ config.put("sasl.jaas.config",
+ "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
+
+ // Schema Registry for deserialization
+
+ config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+ config.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
+ config.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true);
+ config.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
+ config.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, schemaRegistryApiKey + ":" + schemaRegistryApiSecret);
+ config.put("specific.avro.reader", false);
+
+ // SSL configuration
+ config.put("ssl.endpoint.identification.algorithm", "https");
+
+ return config;
+ }
+
+ /**
+ * Sends a message to a Kafka topic.
+ */
+ public void send(String topic, String key, String jsonPayload, Map headers) {
+ try {
+ org.apache.avro.Schema schema = getSchemaForTopic(topic);
+ GenericRecord record = jsonToAvro(jsonPayload, schema);
+
+ ProducerRecord producerRecord =
+ new ProducerRecord<>(topic, key, record);
+
+ // Add headers
+ if (headers != null) {
+ Headers kafkaHeaders = producerRecord.headers();
+ headers.forEach((k, v) ->
+ kafkaHeaders.add(k, v.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ // Send and wait for confirmation
+ getProducer().send(producerRecord, (metadata, exception) -> {
+ if (exception != null) {
+ LOG.error("Failed to send message", exception);
+ } else {
+ LOG.debug("Message sent to topic {} partition {} offset {}",
+ metadata.topic(), metadata.partition(), metadata.offset());
+ }
+ }).get(10, java.util.concurrent.TimeUnit.SECONDS);
+
+ } catch (ExecutionException e) {
+ throw new MessagingConnectionException(
+ "Failed to send message to topic " + topic, e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new MessagingConnectionException(
+ "Interrupted while sending message to topic " + topic, e);
+ } catch (MessagingSchemaException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new MessagingSchemaException(
+ "Failed to convert JSON to Avro for topic " + topic, e);
+ }
+ }
+
+ /**
+ * Receives a message from a Kafka topic matching the filter.
+ */
+ public List receive(String topic,
+ Predicate filter,
+ Duration timeout, boolean startFromBeginning) {
+ KafkaConsumer consumer = null;
+ try {
+ consumer = new KafkaConsumer<>(consumerConfig);
+
+ // Get partitions for the topic
+ List partitions = getPartitionsForTopic(consumer, topic);
+ if (partitions.isEmpty()) {
+ throw new MessagingDestinationException(
+ "Topic '" + topic + "' does not exist or has no partitions");
+ }
+
+ consumer.assign(partitions);
+ if (startFromBeginning) {
+ consumer.seekToBeginning(partitions);
+ } else {
+ consumer.seekToEnd(partitions);
+ }
+
+ // Poll loop with exponential backoff
+ long startTime = System.currentTimeMillis();
+ Duration pollInterval = Duration.ofMillis(100);
+ Duration maxPollInterval = Duration.ofSeconds(1);
+
+ while (Duration.ofMillis(System.currentTimeMillis() - startTime).compareTo(timeout) < 0) {
+ ConsumerRecords records = consumer.poll(pollInterval);
+
+ for (ConsumerRecord record : records) {
+ ReceivedMessage message = convertToReceivedMessage(record, topic);
+ if (filter.test(message)) {
+ LOG.debug("Found matching message on topic {} partition {} offset {}",
+ record.topic(), record.partition(), record.offset());
+ return Collections.singletonList(message);
+ }
+ }
+
+ // Exponential backoff
+ pollInterval = Duration.ofMillis(
+ Math.min(pollInterval.toMillis() * 2, maxPollInterval.toMillis()));
+ }
+
+ throw new MessagingTimeoutException(
+ "No message matching filter found on topic '" + topic + "' within " + timeout);
+
+ } finally {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+ }
+
+ /**
+ * Gets partitions for a topic.
+ */
+ private List getPartitionsForTopic(KafkaConsumer, ?> consumer, String topic) {
+ List partitions = new ArrayList<>();
+ List partitionInfos = consumer.partitionsFor(topic);
+ if (partitionInfos != null) {
+ for (org.apache.kafka.common.PartitionInfo partitionInfo : partitionInfos) {
+ partitions.add(new TopicPartition(topic, partitionInfo.partition()));
+ }
+ }
+ return partitions;
+ }
+
+ /**
+ * Saves current offsets for a topic.
+ */
+ public Map saveOffsets(String topic) {
+ return new HashMap<>();
+ }
+
+ /**
+ * Closes the connector and releases resources.
+ */
+ @Override
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
+ /**
+ * Gets or creates the producer (singleton, thread-safe).
+ */
+ private KafkaProducer getProducer() {
+ if (producer == null) {
+ synchronized (this) {
+ if (producer == null) {
+ producer = new KafkaProducer<>(producerConfig);
+ }
+ }
+ }
+ return producer;
+ }
+
+ /**
+ * Retrieves schema from Schema Registry based on topic name.
+ */
+ private org.apache.avro.Schema getSchemaForTopic(String topic) {
+ String subject = topic + "-value";
+ try {
+ io.confluent.kafka.schemaregistry.client.SchemaMetadata metadata =
+ schemaRegistryClient.getLatestSchemaMetadata(subject);
+ return new org.apache.avro.Schema.Parser().parse(metadata.getSchema());
+ } catch (Exception e) {
+ if (e.getMessage() != null && e.getMessage().contains("404")) {
+ throw new MessagingSchemaException(
+ "Schema not found for subject '" + subject + "' in Schema Registry. " +
+ "Make sure the topic exists and schema is registered.");
+ }
+ throw new MessagingSchemaException(
+ "Failed to retrieve schema for topic '" + topic + "'", e);
+ }
+ }
+
+ /**
+ * Converts JSON string to Avro GenericRecord.
+ */
+ private GenericRecord jsonToAvro(String json, org.apache.avro.Schema schema) {
+ try {
+ GenericRecord genericRecord = JsonToAvroConverter.processJson(json, schema);
+ return genericRecord;
+ } catch (Exception e) {
+ throw new MessagingSchemaException("Failed to convert JSON to Avro: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Converts Kafka ConsumerRecord to ReceivedMessage.
+ */
+ private ReceivedMessage convertToReceivedMessage(ConsumerRecord record, String topic) {
+ try {
+ String jsonBody;
+ if (null != record.value()) {
+ jsonBody = avroToJson(record.value());
+ } else {
+ jsonBody = "";
+ }
+ Map headers = new HashMap<>();
+ Headers kafkaHeaders = record.headers();
+ for (org.apache.kafka.common.header.Header header : kafkaHeaders) {
+ if (header.value() != null) {
+ headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
+ }
+ }
+
+ return ReceivedMessage.builder()
+ .body(jsonBody)
+ .contentType(MessageContentType.JSON)
+ .headers(headers)
+ .timestamp(record.timestamp())
+ .source(topic)
+ .key(record.key())
+ .build();
+
+ } catch (Exception e) {
+ LOG.error("Failed to convert Avro record to ReceivedMessage", e);
+ throw new RuntimeException("Failed to convert message", e);
+ }
+ }
+
+ /**
+ * Converts Avro GenericRecord to JSON string.
+ */
+ private String avroToJson(GenericRecord record) {
+ try {
+
+ return record.toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to convert Avro to JSON: " + e.getMessage(), e);
+ }
+ }
+}
diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/kafkautils/CustomKafkaAvroDeserializer.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/kafkautils/CustomKafkaAvroDeserializer.java
new file mode 100644
index 0000000..19262e8
--- /dev/null
+++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/kafkautils/CustomKafkaAvroDeserializer.java
@@ -0,0 +1,54 @@
+package cz.moneta.test.harness.connectors.messaging.kafkautils;
+
+import cz.moneta.test.harness.connectors.messaging.KafkaConnector;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+ private static final Logger LOG = LogManager.getLogger(KafkaConnector.class);
+
+ @Override
+ public Object deserialize(String topic, byte[] bytes) {
+ try {
+ return super.deserialize(topic, bytes);
+ } catch (SerializationException e) {
+ LOG.error("Message deserialization by avro schema failed", e);
+ return null;
+ }
+ }
+
+ @Override
+ public Object deserialize(String topic, Headers headers, byte[] bytes) {
+ try {
+ return super.deserialize(topic, headers, bytes);
+ } catch (SerializationException e) {
+ LOG.error("Message deserialization by avro schema failed", e);
+ return null;
+ }
+ }
+
+ @Override
+ public Object deserialize(String topic, byte[] bytes, Schema readerSchema) {
+ try {
+ return super.deserialize(topic, bytes, readerSchema);
+ } catch (SerializationException e) {
+ LOG.error("Message deserialization by avro schema failed", e);
+ return null;
+ }
+ }
+
+ @Override
+ public Object deserialize(String topic, Headers headers, byte[] bytes, Schema readerSchema) {
+ try {
+ return super.deserialize(topic, headers, bytes, readerSchema);
+ } catch (SerializationException e) {
+ LOG.error("Message deserialization by avro schema failed", e);
+ return null;
+ }
+ }
+}
diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/JsonToAvroConverter.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/kafkautils/JsonToAvroConverter.java
similarity index 90%
rename from test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/JsonToAvroConverter.java
rename to test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/kafkautils/JsonToAvroConverter.java
index 1f9f093..f18b7a7 100644
--- a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/JsonToAvroConverter.java
+++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/kafkautils/JsonToAvroConverter.java
@@ -1,115 +1,109 @@
-package cz.moneta.test.harness.connectors.messaging;
-
-import com.google.gson.*;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.JsonDecoder;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class JsonToAvroConverter {
-
- protected static GenericRecord processJson(String json, Schema schema) throws IllegalArgumentException, JsonSchemaException {
- GenericRecord result = (GenericRecord) jsonElementToAvro(JsonParser.parseString(json), schema);
- return result;
- }
-
- private static Object jsonElementToAvro(JsonElement element, Schema schema) throws JsonSchemaException {
- boolean schemaIsNullable = isNullable(schema);
- if (schemaIsNullable) {
- schema = typeFromNullable(schema);
- }
- if (element == null || element.isJsonNull()) {
- if (!schemaIsNullable) {
- throw new JsonSchemaException("The element is not nullable in Avro schema.");
- }
- return null;
- } else if (element.isJsonObject()) {
- if (schema.getType() != Schema.Type.RECORD) {
- throw new JsonSchemaException(
- String.format("The element `%s` doesn't match Avro type RECORD", element));
- }
- return jsonObjectToAvro(element.getAsJsonObject(), schema);
- } else if (element.isJsonArray()) {
- if (schema.getType() != Schema.Type.ARRAY) {
- throw new JsonSchemaException(
- String.format("The element `%s` doesn't match Avro type ARRAY", element));
- }
- JsonArray jsonArray = element.getAsJsonArray();
- List