added IBM MQ connector

This commit is contained in:
Radek Davidek 2026-03-16 18:59:21 +01:00
parent 6cf0aab157
commit ef4562ab74
15 changed files with 2775 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@ -29,6 +29,8 @@
<commons-beanutils.version>1.9.3</commons-beanutils.version>
<commons-configuration.version>1.6</commons-configuration.version>
<cxf.version>4.0.3</cxf.version>
<ibm.mq.version>9.4.5.0</ibm.mq.version>
<javax.jms.version>2.0.1</javax.jms.version>
</properties>
<dependencies>
@ -285,6 +287,18 @@
<version>${cxf.version}</version>
</dependency>
<!-- Used in IbmMqConnector -->
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>${javax.jms.version}</version>
</dependency>
<!-- Used in Wso2ConnectorServlet -->
<dependency>
<groupId>javax.servlet</groupId>

View File

@ -0,0 +1,447 @@
package cz.moneta.test.harness.connectors.messaging;
import cz.moneta.test.harness.connectors.Connector;
import cz.moneta.test.harness.messaging.MessageContentType;
import cz.moneta.test.harness.messaging.MqMessageFormat;
import cz.moneta.test.harness.messaging.ReceivedMessage;
import cz.moneta.test.harness.messaging.exception.MessagingConnectionException;
import cz.moneta.test.harness.messaging.exception.MessagingDestinationException;
import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* IBM MQ connector using JMS client with Jakarta JMS API.
* Supports multi-instance Queue Manager, SSL/TLS, and multiple message formats.
* <p>
* Supported formats:
* - JSON: JMS TextMessage with plain JSON string (default)
* - XML: JMS TextMessage with XML string
* - UTF-8 (CCSID 1208): JMS BytesMessage with UTF-8 encoding
* - EBCDIC (CCSID 870): JMS BytesMessage with EBCDIC IBM-870 encoding
*/
public class IbmMqConnector implements Connector {
private static final Logger LOG = LogManager.getLogger(IbmMqConnector.class);
private static final Charset EBCDIC_870 = Charset.forName("IBM870");
private static final Charset UTF_8 = StandardCharsets.UTF_8;
private static final long DEFAULT_POLL_INTERVAL_MS = 100;
private static final long DEFAULT_MAX_POLL_INTERVAL_MS = 1000;
private final MQConnectionFactory connectionFactory;
private JMSContext jmsContext;
private final String queueManager;
/**
* Constructor with multi-instance Queue Manager support.
*
* @param connectionNameList Connection name list in format "host1(port1),host2(port2)"
* @param channel MQ channel name
* @param queueManager Queue Manager name
* @param user Username for authentication
* @param password Password for authentication
* @param keystorePath Path to SSL keystore (can be null for non-SSL)
* @param keystorePassword Password for SSL keystore
* @param sslCipherSuite SSL cipher suite to use (e.g., "TLS_RSA_WITH_AES_256_CBC_SHA256")
*/
public IbmMqConnector(String connectionNameList, String channel, String queueManager,
String user, String password,
String keystorePath, String keystorePassword, String sslCipherSuite) {
this.queueManager = queueManager;
try {
MQConnectionFactory cf = new MQConnectionFactory();
// Set connection name list for multi-instance QMGR
cf.setChannel(channel);
cf.setQueueManager(queueManager);
cf.setConnectionNameList(connectionNameList);
cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
// Set authentication
cf.setStringProperty(WMQConstants.USERID, user);
cf.setStringProperty(WMQConstants.PASSWORD, password);
// SSL configuration if keystore is provided
if (StringUtils.isNotBlank(keystorePath)) {
System.setProperty("javax.net.ssl.keyStore", keystorePath);
System.setProperty("javax.net.ssl.trustStore", keystorePath);
if (StringUtils.isNotBlank(keystorePassword)) {
System.setProperty("javax.net.ssl.keyStorePassword", keystorePassword);
System.setProperty("javax.net.ssl.trustStorePassword", keystorePassword);
}
if (StringUtils.isNotBlank(sslCipherSuite)) {
cf.setSSLCipherSuite(sslCipherSuite);
}
}
this.connectionFactory = cf;
// Initialize JMS context
connect();
} catch (Exception e) {
throw new MessagingConnectionException(
"Failed to create IBM MQ connection to " + queueManager, e);
}
}
/**
* Connect to IBM MQ.
*/
private void connect() {
try {
this.jmsContext = connectionFactory.createContext();
this.jmsContext.start();
LOG.info("Connected to IBM MQ: {}", queueManager);
} catch (Exception e) {
throw new MessagingConnectionException(
"Failed to connect to IBM MQ: " + queueManager + " - " + e.getMessage(), e);
}
}
/**
* Send a JSON or XML message as TextMessage.
*/
private void sendTextMessage(String queueName, String payload, Map<String, String> properties) {
javax.jms.Queue queue = getQueue(queueName);
TextMessage message = jmsContext.createTextMessage(payload);
// Set JMS properties
if (properties != null) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
try {
message.setStringProperty(entry.getKey(), entry.getValue());
} catch (JMSException e) {
LOG.warn("Failed to set property: {}", entry.getKey(), e);
}
}
}
try {
jmsContext.createProducer().send(queue, message);
} catch (RuntimeException e) {
throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e);
}
LOG.debug("Sent JSON/XML message to queue: {}", queueName);
}
/**
* Send a message as BytesMessage with specific encoding and CCSID.
*/
private void sendBytesMessage(String queueName, String payload, Charset charset,
int ccsid, Map<String, String> properties) {
javax.jms.Queue queue = getQueue(queueName);
BytesMessage message = jmsContext.createBytesMessage();
// Convert payload to bytes using specified charset
byte[] bytes = payload.getBytes(charset);
try {
message.writeBytes(bytes);
message.setIntProperty("CCSID", ccsid);
} catch (JMSException e) {
throw new MessagingDestinationException("Failed to create bytes message", e);
}
// Set JMS properties
if (properties != null) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
try {
message.setStringProperty(entry.getKey(), entry.getValue());
} catch (JMSException e) {
LOG.warn("Failed to set property: {}", entry.getKey(), e);
}
}
}
try {
jmsContext.createProducer().send(queue, message);
} catch (RuntimeException e) {
throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e);
}
LOG.debug("Sent {} message to queue: {}", charset, queueName);
}
/**
* Send a message to a queue with specified format.
*
* @param queueName Queue name
* @param payload Message payload
* @param format Message format (JSON, XML, EBCDIC_870, UTF8_1208)
* @param properties JMS properties to set
*/
public void send(String queueName, String payload, MqMessageFormat format,
Map<String, String> properties) {
switch (format) {
case JSON, XML -> sendTextMessage(queueName, payload, properties);
case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
}
}
/**
* Receive a message from a queue with timeout.
*
* @param queueName Queue name
* @param messageSelector JMS message selector (optional)
* @param format Expected message format
* @param timeout Timeout duration
* @return Received message
*/
public ReceivedMessage receive(String queueName, String messageSelector,
MqMessageFormat format, java.time.Duration timeout) {
long timeoutMs = timeout.toMillis();
javax.jms.Queue queue = getQueue(queueName);
MessageConsumer consumer = (MessageConsumer) (messageSelector == null || messageSelector.isBlank()
? jmsContext.createConsumer(queue)
: jmsContext.createConsumer(queue, messageSelector));
AtomicBoolean messageFound = new AtomicBoolean(false);
ReceivedMessage received = null;
long pollInterval = DEFAULT_POLL_INTERVAL_MS;
long remainingTime = timeoutMs;
try {
while (remainingTime > 0 && !messageFound.get()) {
Message message = consumer.receive(remainingTime);
if (message != null) {
received = decodeMessage(message, queueName, format);
messageFound.set(true);
} else {
// Exponential backoff
pollInterval = Math.min(pollInterval * 2, DEFAULT_MAX_POLL_INTERVAL_MS);
remainingTime -= pollInterval;
}
}
if (received == null) {
throw new MessagingTimeoutException(
"No message matching filter found on queue '" + queueName +
"' within " + timeout.toMillis() + "ms");
}
return received;
} catch (MessagingTimeoutException e) {
throw e;
} catch (Exception e) {
throw new MessagingDestinationException("Failed to receive message from queue: " + queueName, e);
} finally {
try {
consumer.close();
} catch (JMSException e) {
LOG.warn("Failed to close consumer", e);
}
}
}
/**
* Browse a queue (non-destructive read).
*
* @param queueName Queue name
* @param messageSelector JMS message selector (optional)
* @param format Expected message format
* @param maxMessages Maximum number of messages to browse
* @return List of received messages
*/
public List<ReceivedMessage> browse(String queueName, String messageSelector,
MqMessageFormat format, int maxMessages) {
List<ReceivedMessage> messages = new ArrayList<>();
javax.jms.Queue queue = getQueue(queueName);
MessageConsumer consumer = (MessageConsumer) (messageSelector == null || messageSelector.isBlank()
? jmsContext.createConsumer(queue)
: jmsContext.createConsumer(queue, messageSelector));
int count = 0;
try {
while (count < maxMessages) {
Message message = consumer.receiveNoWait();
if (message == null) {
break;
}
ReceivedMessage received = decodeMessage(message, queueName, format);
messages.add(received);
count++;
}
return messages;
} catch (Exception e) {
throw new MessagingDestinationException("Failed to browse queue: " + queueName, e);
} finally {
try {
consumer.close();
} catch (JMSException e) {
LOG.warn("Failed to close consumer", e);
}
}
}
/**
* Decode a JMS message to ReceivedMessage.
*/
private ReceivedMessage decodeMessage(Message jmsMessage, String queueName, MqMessageFormat format) {
long timestamp;
try {
timestamp = jmsMessage.getJMSTimestamp();
} catch (JMSException e) {
timestamp = System.currentTimeMillis();
}
if (timestamp == 0) {
timestamp = System.currentTimeMillis();
}
String body;
MessageContentType contentType;
Map<String, String> headers = new HashMap<>();
// Extract JMS properties as headers
extractJmsProperties(jmsMessage, headers);
if (jmsMessage instanceof TextMessage textMessage) {
try {
body = textMessage.getText();
} catch (JMSException e) {
throw new RuntimeException("Failed to read text message body", e);
}
contentType = switch (format) {
case XML -> MessageContentType.XML;
default -> MessageContentType.JSON;
};
} else if (jmsMessage instanceof BytesMessage bytesMessage) {
int ccsid;
try {
ccsid = bytesMessage.getIntProperty("CCSID");
} catch (JMSException e) {
ccsid = 1208; // default UTF-8
}
body = decodeBytesMessage(bytesMessage, ccsid);
contentType = MessageContentType.RAW_TEXT;
} else {
try {
throw new IllegalArgumentException("Unsupported message type: " + jmsMessage.getJMSType());
} catch (JMSException e) {
throw new IllegalArgumentException("Unsupported message type", e);
}
}
return new ReceivedMessage(body, contentType, headers, timestamp, queueName, null);
}
/**
* Decode BytesMessage body based on CCSID.
*/
private String decodeBytesMessage(BytesMessage bytesMessage, int ccsid) {
try {
long bodyLength;
try {
bodyLength = bytesMessage.getBodyLength();
} catch (JMSException e) {
throw new RuntimeException("Failed to get message body length", e);
}
byte[] data = new byte[(int) bodyLength];
bytesMessage.readBytes(data);
Charset charset = switch (ccsid) {
case 870 -> EBCDIC_870;
case 1208 -> UTF_8;
default -> UTF_8;
};
return new String(data, charset);
} catch (JMSException e) {
throw new RuntimeException("Failed to read BytesMessage body", e);
}
}
/**
* Extract JMS properties as headers.
*/
@SuppressWarnings("unchecked")
private void extractJmsProperties(Message message, Map<String, String> headers) {
try {
// Common JMS headers
headers.put("JMSMessageID", message.getJMSMessageID());
try {
headers.put("JMSType", message.getJMSType() != null ? message.getJMSType() : "");
} catch (JMSException e) {
headers.put("JMSType", "");
}
try {
headers.put("JMSDestination", message.getJMSDestination() != null ?
message.getJMSDestination().toString() : "");
} catch (JMSException e) {
headers.put("JMSDestination", "");
}
try {
headers.put("JMSDeliveryMode", String.valueOf(message.getJMSDeliveryMode()));
} catch (JMSException e) {
headers.put("JMSDeliveryMode", "");
}
try {
headers.put("JMSPriority", String.valueOf(message.getJMSPriority()));
} catch (JMSException e) {
headers.put("JMSPriority", "");
}
try {
headers.put("JMSTimestamp", String.valueOf(message.getJMSTimestamp()));
} catch (JMSException e) {
headers.put("JMSTimestamp", "");
}
// Extract custom properties
Enumeration<String> propertyNames = (Enumeration<String>) message.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String propName = propertyNames.nextElement();
Object propValue = message.getObjectProperty(propName);
if (propValue != null) {
headers.put(propName, propValue.toString());
}
}
} catch (JMSException e) {
LOG.warn("Failed to extract JMS properties", e);
}
}
/**
* Get Queue object from queue name.
*/
private javax.jms.Queue getQueue(String queueName) {
return jmsContext.createQueue(queueName);
}
@Override
public void close() {
if (jmsContext != null) {
try {
jmsContext.close();
LOG.info("Closed connection to IBM MQ: {}", queueManager);
} catch (Exception e) {
LOG.error("Failed to close IBM MQ connection", e);
}
}
}
}

View File

@ -0,0 +1,210 @@
package cz.moneta.test.harness.endpoints.imq;
import cz.moneta.test.harness.connectors.messaging.IbmMqConnector;
import cz.moneta.test.harness.context.StoreAccessor;
import cz.moneta.test.harness.endpoints.Endpoint;
import cz.moneta.test.harness.messaging.MqMessageFormat;
import cz.moneta.test.harness.messaging.ReceivedMessage;
import cz.moneta.test.harness.connectors.VaultConnector;
import cz.moneta.test.harness.support.auth.Credentials;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
/**
* IBM MQ First Vision endpoint.
* Provides high-level access to IBM MQ queues with configuration from StoreAccessor.
* <p>
* Credentials are loaded from HashiCorp Vault.
*/
public class ImqFirstVisionEndpoint implements Endpoint {
private static final Logger LOG = LogManager.getLogger(ImqFirstVisionEndpoint.class);
private final IbmMqConnector connector;
private final StoreAccessor store;
// Configuration keys
private static final String CONNECTION_NAME_LIST_KEY = "endpoints.imq-first-vision.connection-name-list";
private static final String CHANNEL_KEY = "endpoints.imq-first-vision.channel";
private static final String QUEUE_MANAGER_KEY = "endpoints.imq-first-vision.queue-manager";
private static final String SSL_CIPHER_SUITE_KEY = "endpoints.imq-first-vision.ssl-cipher-suite";
private static final String VAULT_PATH_KEY = "vault.imq-first-vision.secrets.path";
/**
* Constructor that reads configuration from StoreAccessor.
*/
public ImqFirstVisionEndpoint(StoreAccessor store) {
this.store = store;
// Read configuration
String connectionNameList = getConfig(CONNECTION_NAME_LIST_KEY);
String channel = getConfig(CHANNEL_KEY);
String queueManager = getConfig(QUEUE_MANAGER_KEY);
String sslCipherSuite = getConfig(SSL_CIPHER_SUITE_KEY);
// Load credentials from Vault
String vaultPath = getVaultPath();
Credentials credentials = loadCredentialsFromVault(vaultPath);
// SSL configuration (optional)
String keystorePath = null;
String keystorePassword = null;
try {
this.connector = new IbmMqConnector(
connectionNameList,
channel,
queueManager,
credentials.getUsername(),
credentials.getPassword(),
keystorePath,
keystorePassword,
sslCipherSuite
);
LOG.info("Initialized IBM MQ First Vision endpoint for queue manager: {}", queueManager);
} catch (Exception e) {
throw new IllegalStateException("Failed to initialize IBM MQ endpoint", e);
}
}
/**
* Get a configuration value from StoreAccessor.
*/
private String getConfig(String key) {
return Optional.ofNullable(store.getConfig(key))
.orElseThrow(() -> new IllegalStateException(
"You need to configure " + key));
}
/**
* Get vault path from configuration.
*/
private String getVaultPath() {
return Optional.ofNullable(store.getConfig(VAULT_PATH_KEY))
.orElseThrow(() -> new IllegalStateException(
"You need to configure " + VAULT_PATH_KEY));
}
/**
* Load credentials from HashiCorp Vault.
*/
private Credentials loadCredentialsFromVault(String vaultPath) {
try {
// Get vault URL from configuration
String vaultUrl = getConfig("vault.url");
String vaultUser = getConfig("vault.user");
String vaultPassword = getConfig("vault.password");
VaultConnector vaultConnector = new VaultConnector(vaultUrl, vaultUser, vaultPassword);
Optional<Credentials> credentials = vaultConnector.getUsernameAndPassword(vaultPath);
return credentials.orElseThrow(() -> new IllegalStateException(
"Credentials not found in Vault at path: " + vaultPath));
} catch (Exception e) {
throw new IllegalStateException("Failed to load credentials from Vault", e);
}
}
/**
* Send a message to a queue.
*
* @param queueName Physical queue name or logical name (from ImqFirstVisionQueue)
* @param payload Message payload
* @param format Message format
* @param properties JMS properties
*/
public void send(String queueName, String payload, MqMessageFormat format,
java.util.Map<String, String> properties) {
connector.send(queueName, payload, format, properties);
}
/**
* Send a message to a queue using logical queue name.
*/
public void send(ImqFirstVisionQueue queue, String payload, MqMessageFormat format,
java.util.Map<String, String> properties) {
String physicalQueueName = resolveQueue(queue);
connector.send(physicalQueueName, payload, format, properties);
}
/**
* Receive a message from a queue.
*
* @param queueName Physical queue name or logical name
* @param messageSelector JMS message selector (optional)
* @param format Expected message format
* @param timeout Timeout duration
* @return Received message
*/
public ReceivedMessage receive(String queueName, String messageSelector,
MqMessageFormat format, Duration timeout) {
return connector.receive(queueName, messageSelector, format, timeout);
}
/**
* Receive a message from a queue using logical queue name.
*/
public ReceivedMessage receive(ImqFirstVisionQueue queue, String messageSelector,
MqMessageFormat format, Duration timeout) {
String physicalQueueName = resolveQueue(queue);
return connector.receive(physicalQueueName, messageSelector, format, timeout);
}
/**
* Browse a queue (non-destructive read).
*
* @param queueName Physical queue name or logical name
* @param messageSelector JMS message selector (optional)
* @param format Expected message format
* @param maxMessages Maximum number of messages
* @return List of received messages
*/
public List<ReceivedMessage> browse(String queueName, String messageSelector,
MqMessageFormat format, int maxMessages) {
return connector.browse(queueName, messageSelector, format, maxMessages);
}
/**
* Browse a queue using logical queue name.
*/
public List<ReceivedMessage> browse(ImqFirstVisionQueue queue, String messageSelector,
MqMessageFormat format, int maxMessages) {
String physicalQueueName = resolveQueue(queue);
return connector.browse(physicalQueueName, messageSelector, format, maxMessages);
}
/**
* Resolve logical queue name to physical queue name.
*
* @param logicalName Logical queue name or ImqFirstVisionQueue enum
* @return Physical queue name
*/
public String resolveQueue(String logicalName) {
String configKey = "endpoints.imq-first-vision." + logicalName + ".queue";
return Optional.ofNullable(store.getConfig(configKey))
.orElseThrow(() -> new IllegalStateException(
"Queue '" + logicalName + "' is not configured in " + configKey));
}
/**
* Resolve ImqFirstVisionQueue enum to physical queue name.
*/
public String resolveQueue(ImqFirstVisionQueue queue) {
return resolveQueue(queue.getConfigKey());
}
@Override
public void close() {
if (connector != null) {
connector.close();
}
}
}

View File

@ -0,0 +1,54 @@
package cz.moneta.test.harness.endpoints.imq;
/**
* Logical queue names for IBM MQ First Vision.
* Physical queue names are resolved from configuration.
*/
public enum ImqFirstVisionQueue {
/**
* Payment notifications queue.
*/
PAYMENT_NOTIFICATIONS("payment-notifications"),
/**
* Payment request queue.
*/
PAYMENT_REQUEST("payment-request"),
/**
* MF (Money Flow) requests queue.
*/
MF_REQUESTS("mf-requests"),
/**
* MF (Money Flow) responses queue.
*/
MF_RESPONSES("mf-responses"),
/**
* MF (Money Flow) EBCDIC queue.
*/
MF_EBCDIC("mf-ebcdic"),
/**
* MF (Money Flow) UTF-8 queue.
*/
MF_UTF8("mf-utf8");
private static final String BASE_CONFIG_KEY = "endpoints.imq-first-vision.";
private static final String QUEUE_SUFFIX = ".queue";
private final String configKey;
ImqFirstVisionQueue(String configKey) {
this.configKey = BASE_CONFIG_KEY + configKey + QUEUE_SUFFIX;
}
/**
* Get the configuration key for this queue.
* Used to resolve physical queue name from configuration.
*/
public String getConfigKey() {
return configKey;
}
}

View File

@ -0,0 +1,83 @@
package cz.moneta.test.harness.messaging;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
/**
* Wrapper for extracted JSON path values.
* Provides fluent methods for value extraction and conversion.
*/
public class JsonPathValue {
private final JsonNode node;
private final String rawValue;
public JsonPathValue(JsonNode node) {
this.node = node;
this.rawValue = node != null ? node.asText() : null;
}
public JsonPathValue(String rawValue) {
this.node = null;
this.rawValue = rawValue;
}
/**
* Get the value as a string.
*/
public String asText() {
if (node != null && !(node instanceof NullNode)) {
return node.asText();
}
return rawValue;
}
/**
* Get the value as an integer.
*/
public int asInt() {
if (node != null && !(node instanceof NullNode)) {
return node.asInt();
}
return Integer.parseInt(rawValue);
}
/**
* Get the value as a long.
*/
public long asLong() {
if (node != null && !(node instanceof NullNode)) {
return node.asLong();
}
return Long.parseLong(rawValue);
}
/**
* Get the value as a boolean.
*/
public boolean asBoolean() {
if (node != null && !(node instanceof NullNode)) {
return node.asBoolean();
}
return Boolean.parseBoolean(rawValue);
}
/**
* Check if the value is null or missing.
*/
public boolean isNull() {
return node == null || node instanceof NullNode || rawValue == null;
}
/**
* Get the underlying JsonNode.
*/
public JsonNode getNode() {
return node;
}
@Override
public String toString() {
return asText();
}
}

View File

@ -0,0 +1,21 @@
package cz.moneta.test.harness.messaging;
/**
* Content type of a received message.
*/
public enum MessageContentType {
/**
* JSON content - body is a JSON string.
*/
JSON,
/**
* XML content - body is an XML string.
*/
XML,
/**
* Raw text content - body is plain text (e.g., EBCDIC decoded, UTF-8).
*/
RAW_TEXT
}

View File

@ -0,0 +1,111 @@
package cz.moneta.test.harness.messaging;
import org.assertj.core.api.AbstractObjectAssert;
/**
* Response interface for received messages.
* Provides assertion methods for verifying message content.
* Shared interface for both Kafka and IBM MQ message responses.
*/
public interface MessageResponse {
/**
* Assert that a field in the message body has the expected value.
* For JSON: uses JSON path (dot/bracket notation).
* For XML: uses XPath expression.
*
* @param path JSON path or XPath expression
* @param value expected value as string
* @return this instance for fluent assertions
* @throws AssertionError if assertion fails
*/
MessageResponse andAssertFieldValue(String path, String value);
/**
* Assert that a field exists in the message body.
*
* @param path JSON path or XPath expression
* @return this instance for fluent assertions
* @throws AssertionError if assertion fails
*/
MessageResponse andAssertPresent(String path);
/**
* Assert that a field does not exist in the message body.
*
* @param path JSON path or XPath expression
* @return this instance for fluent assertions
* @throws AssertionError if assertion fails
*/
MessageResponse andAssertNotPresent(String path);
/**
* Assert that a header (Kafka header or JMS property) has the expected value.
*
* @param headerName name of the header/property
* @param value expected value
* @return this instance for fluent assertions
* @throws AssertionError if assertion fails
*/
MessageResponse andAssertHeaderValue(String headerName, String value);
/**
* Assert that the message body contains a substring.
* Primarily used for EBCDIC/UTF-8 raw text assertions.
*
* @param substring expected substring
* @return this instance for fluent assertions
* @throws AssertionError if assertion fails
*/
MessageResponse andAssertBodyContains(String substring);
/**
* Get AssertJ fluent assertion for complex object assertions.
*
* @return AssertJ AbstractObjectAssert for fluent assertions
*/
AbstractObjectAssert<?, ?> andAssertWithAssertJ();
/**
* Extract a value from the message body.
* For JSON: uses JSON path (dot/bracket notation).
* For XML: uses XPath expression.
*
* @param path JSON path or XPath expression
* @return JsonPathValue wrapper for the extracted value
*/
JsonPathValue extract(String path);
/**
* Deserialize the message body to a Java object.
* For JSON: uses Jackson ObjectMapper.
* For XML: uses JAXB or Jackson XmlMapper.
*
* @param type target type
* @param <T> target type
* @return deserialized object
*/
<T> T mapTo(Class<T> type);
/**
* Get the raw message body.
*
* @return message body as string
*/
String getBody();
/**
* Get a header value (Kafka header or JMS property).
*
* @param name header/property name
* @return header value or null if not present
*/
String getHeader(String name);
/**
* Get the underlying received message.
*
* @return ReceivedMessage instance
*/
ReceivedMessage getMessage();
}

View File

@ -0,0 +1,31 @@
package cz.moneta.test.harness.messaging;
/**
* Message format for IBM MQ.
* Defines how messages are encoded and transmitted.
*/
public enum MqMessageFormat {
/**
* JSON format - JMS TextMessage with plain JSON string.
* Default format for IBM MQ.
*/
JSON,
/**
* XML format - JMS TextMessage with XML string.
* XML is decoded and can be queried using XPath.
*/
XML,
/**
* EBCDIC format - JMS BytesMessage with EBCDIC IBM-870 encoding.
* Used for mainframe systems (Czech/Slovak characters).
*/
EBCDIC_870,
/**
* UTF-8 format - JMS BytesMessage with UTF-8 (CCSID 1208) encoding.
* Used for binary data with explicit UTF-8 encoding.
*/
UTF8_1208
}

View File

@ -0,0 +1,386 @@
package cz.moneta.test.harness.messaging;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Represents a received message from a messaging system.
* Body is always normalized to a String regardless of source and wire format.
* <p>
* For Kafka: Avro GenericRecord is automatically converted to JSON.
* For IBM MQ (JSON): JSON string from JMS TextMessage.
* For IBM MQ (XML): XML string from JMS TextMessage.
* For IBM MQ (EBCDIC): byte[] from JMS BytesMessage decoded from IBM-870.
*/
public class ReceivedMessage {
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
private static final Pattern JSON_PATH_PATTERN = Pattern.compile("^([\\w.]+)\\[([\\d]+)\\](.*)$");
private final String body;
private final MessageContentType contentType;
private final Map<String, String> headers;
private final long timestamp;
private final String source;
private final String key;
public ReceivedMessage(String body, MessageContentType contentType, Map<String, String> headers,
long timestamp, String source, String key) {
this.body = body;
this.contentType = contentType;
this.headers = headers != null ? Collections.unmodifiableMap(new HashMap<>(headers)) : new HashMap<>();
this.timestamp = timestamp;
this.source = source;
this.key = key;
}
/**
* Extract a JSON value using JSON path (dot/bracket notation).
* Supports paths like "items[0].sku" or "nested.field".
*
* @param path JSON path
* @return JsonNode for the extracted value
*/
public JsonNode extractJson(String path) {
if (body == null || StringUtils.isEmpty(path)) {
return null;
}
try {
JsonNode root = JSON_MAPPER.readTree(body);
return evaluateJsonPath(root, path);
} catch (Exception e) {
throw new RuntimeException("Failed to extract JSON path: " + path, e);
}
}
/**
* Extract a value using XPath (for XML messages).
*
* @param xpathExpression XPath expression
* @return extracted value as string
*/
public String extractXml(String xpathExpression) {
if (body == null || StringUtils.isEmpty(xpathExpression)) {
return null;
}
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setFeature("http://xml.org/sax/features/external-general-entities", false);
factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
DocumentBuilder builder = factory.newDocumentBuilder();
javax.xml.parsers.DocumentBuilder finalBuilder = builder;
var document = finalBuilder.parse(new java.io.ByteArrayInputStream(body.getBytes()));
document.getDocumentElement().normalize();
XPath xpath = XPathFactory.newInstance().newXPath();
Object result = xpath.evaluate(xpathExpression, document, XPathConstants.NODE);
if (result instanceof org.w3c.dom.Node domNode) {
return domNode.getTextContent();
}
return null;
} catch (Exception e) {
throw new RuntimeException("Failed to evaluate XPath: " + xpathExpression, e);
}
}
/**
* Universal extract method - auto-detects content type and uses appropriate extraction.
*
* @param expression JSON path or XPath expression
* @return extracted value as string
*/
public String extract(String expression) {
return switch (contentType) {
case JSON -> extractJson(expression).asText();
case XML -> extractXml(expression);
case RAW_TEXT -> body;
};
}
/**
* Evaluate JSON path on a JSON node.
* Supports dot notation and bracket notation for arrays.
*/
private JsonNode evaluateJsonPath(JsonNode node, String path) {
if (StringUtils.isEmpty(path)) {
return node;
}
String[] parts = tokenizePath(path);
JsonNode current = node;
for (String part : parts) {
if (current == null) {
return null;
}
if (part.isEmpty()) {
continue;
}
if (part.contains("[")) {
// Array access
Matcher matcher = JSON_PATH_PATTERN.matcher(part);
if (matcher.matches()) {
String arrayName = matcher.group(1);
int index = Integer.parseInt(matcher.group(2));
String remaining = matcher.group(3);
if (current.isArray()) {
current = index < current.size() ? current.get(index) : null;
} else if (current.isObject()) {
JsonNode arrayNode = current.get(arrayName);
if (arrayNode != null && arrayNode.isArray()) {
current = index < arrayNode.size() ? arrayNode.get(index) : null;
} else {
current = null;
}
}
// Continue with remaining path
if (StringUtils.isNotBlank(remaining)) {
current = evaluateJsonPath(current, remaining);
}
} else {
current = current.get(part);
}
} else if (part.contains(".")) {
// Navigate through object properties
String[] segments = part.split("\\.");
for (String segment : segments) {
if (StringUtils.isEmpty(segment)) {
continue;
}
current = current.get(segment);
}
} else {
current = current.get(part);
}
}
return current;
}
/**
* Tokenize JSON path into segments.
*/
private String[] tokenizePath(String path) {
if (StringUtils.isEmpty(path)) {
return new String[0];
}
java.util.List<String> tokens = new java.util.ArrayList<>();
StringBuilder current = new StringBuilder();
boolean inBracket = false;
for (int i = 0; i < path.length(); i++) {
char c = path.charAt(i);
if (c == '[') {
inBracket = true;
current.append(c);
} else if (c == ']') {
inBracket = false;
current.append(c);
} else if (c == '.' && !inBracket) {
if (current.length() > 0) {
tokens.add(current.toString());
current.setLength(0);
}
} else {
current.append(c);
}
}
if (current.length() > 0) {
tokens.add(current.toString());
}
return tokens.toArray(new String[0]);
}
/**
* Get the message key (Kafka message key, null for IBM MQ).
*/
public String getKey() {
return key;
}
/**
* Get a header value (Kafka header or JMS property).
*/
public String getHeader(String name) {
return headers.get(name);
}
/**
* Get all headers.
*/
public Map<String, String> getHeaders() {
return headers;
}
/**
* Get the message body.
*/
public String getBody() {
return body;
}
/**
* Get the message timestamp.
*/
public long getTimestamp() {
return timestamp;
}
/**
* Get the content type.
*/
public MessageContentType getContentType() {
return contentType;
}
/**
* Get the source (topic or queue name).
*/
public String getSource() {
return source;
}
/**
* Deserialize the message body to a Java object.
*
* @param type target type
* @param <T> target type
* @return deserialized object
*/
public <T> T mapTo(Class<T> type) {
if (body == null) {
return null;
}
try {
if (contentType == MessageContentType.XML) {
// XML deserialization using JAXB
return mapXmlTo(type);
} else {
// JSON deserialization using Jackson
return JSON_MAPPER.readValue(body, type);
}
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize message to " + type.getName(), e);
}
}
/**
* Deserialize the message body to a Java object for XML.
* Uses JAXB-like parsing - simplified for basic XML structures.
*/
private <T> T mapXmlTo(Class<T> type) {
try {
// For XML, parse to a simple map structure
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
javax.xml.parsers.DocumentBuilder finalBuilder = builder;
var document = finalBuilder.parse(new java.io.ByteArrayInputStream(body.getBytes()));
document.getDocumentElement().normalize();
Map<String, Object> xmlMap = xmlToMap(document.getDocumentElement());
return JSON_MAPPER.convertValue(xmlMap, type);
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize XML message", e);
}
}
/**
* Convert XML element to Map.
*/
private Map<String, Object> xmlToMap(org.w3c.dom.Element element) {
Map<String, Object> result = new HashMap<>();
for (int i = 0; i < element.getAttributes().getLength(); i++) {
org.w3c.dom.NamedNodeMap attributes = element.getAttributes();
org.w3c.dom.Node attr = attributes.item(i);
result.put("@" + attr.getNodeName(), attr.getNodeValue());
}
// Add children
for (int i = 0; i < element.getChildNodes().getLength(); i++) {
org.w3c.dom.Node node = element.getChildNodes().item(i);
if (node.getNodeType() == org.w3c.dom.Node.ELEMENT_NODE) {
org.w3c.dom.Element childElement = (org.w3c.dom.Element) node;
String tagName = childElement.getTagName();
if (childElement.getChildNodes().getLength() == 0) {
// Leaf element
result.put(tagName, childElement.getTextContent());
} else {
// Check if all children are elements (complex) or text (simple)
boolean hasElement = false;
for (int j = 0; j < childElement.getChildNodes().getLength(); j++) {
org.w3c.dom.Node childNode = childElement.getChildNodes().item(j);
if (childNode.getNodeType() == org.w3c.dom.Node.TEXT_NODE &&
StringUtils.isNotBlank(childNode.getTextContent())) {
} else if (childNode.getNodeType() == org.w3c.dom.Node.ELEMENT_NODE) {
hasElement = true;
}
}
if (hasElement) {
Map<String, Object> childMap = xmlToMap(childElement);
if (result.containsKey(tagName)) {
// Convert to list if multiple elements with same name
java.util.List<Object> list = new java.util.ArrayList<>();
if (result.get(tagName) instanceof Map) {
list.add(result.get(tagName));
}
list.add(childMap);
result.put(tagName, list);
} else {
result.put(tagName, childMap);
}
} else {
result.put(tagName, childElement.getTextContent());
}
}
}
}
// If element has only text content and no attributes or children, return text
if (element.getChildNodes().getLength() == 0) {
Map<String, Object> textMap = new HashMap<>();
textMap.put("#text", element.getTextContent());
return textMap;
}
return result;
}
@Override
public String toString() {
return "ReceivedMessage{" +
"contentType=" + contentType +
", source='" + source + '\'' +
", key='" + key + '\'' +
", body='" + body + '\'' +
'}';
}
}

View File

@ -0,0 +1,16 @@
package cz.moneta.test.harness.messaging.exception;
/**
* Exception thrown when connection to messaging system fails.
* Covers authentication failures, network issues, and connection problems.
*/
public class MessagingConnectionException extends MessagingException {
public MessagingConnectionException(String message) {
super(message);
}
public MessagingConnectionException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,15 @@
package cz.moneta.test.harness.messaging.exception;
/**
* Exception thrown when destination (queue, topic) is not found or inaccessible.
*/
public class MessagingDestinationException extends MessagingException {
public MessagingDestinationException(String message) {
super(message);
}
public MessagingDestinationException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,15 @@
package cz.moneta.test.harness.messaging.exception;
/**
* Abstract base exception for messaging system errors (IBM MQ, Kafka).
*/
public abstract class MessagingException extends RuntimeException {
protected MessagingException(String message) {
super(message);
}
protected MessagingException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,16 @@
package cz.moneta.test.harness.messaging.exception;
/**
* Exception thrown when message schema validation fails.
* Currently primarily used for IBM MQ message format issues.
*/
public class MessagingSchemaException extends MessagingException {
public MessagingSchemaException(String message) {
super(message);
}
public MessagingSchemaException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,15 @@
package cz.moneta.test.harness.messaging.exception;
/**
* Exception thrown when waiting for a message times out.
*/
public class MessagingTimeoutException extends MessagingException {
public MessagingTimeoutException(String message) {
super(message);
}
public MessagingTimeoutException(String message, Throwable cause) {
super(message, cause);
}
}