From ef4562ab74f51bc8f8f2e460b66276fd4d457a16 Mon Sep 17 00:00:00 2001 From: Radek Davidek Date: Mon, 16 Mar 2026 18:59:21 +0100 Subject: [PATCH] added IBM MQ connector --- kafka_ibm_mq_support_c8518eaa.plan 2.md | 1341 +++++++++++++++++ test-harness/pom.xml | 14 + .../connectors/messaging/IbmMqConnector.java | 447 ++++++ .../endpoints/imq/ImqFirstVisionEndpoint.java | 210 +++ .../endpoints/imq/ImqFirstVisionQueue.java | 54 + .../test/harness/messaging/JsonPathValue.java | 83 + .../harness/messaging/MessageContentType.java | 21 + .../harness/messaging/MessageResponse.java | 111 ++ .../harness/messaging/MqMessageFormat.java | 31 + .../harness/messaging/ReceivedMessage.java | 386 +++++ .../MessagingConnectionException.java | 16 + .../MessagingDestinationException.java | 15 + .../exception/MessagingException.java | 15 + .../exception/MessagingSchemaException.java | 16 + .../exception/MessagingTimeoutException.java | 15 + 15 files changed, 2775 insertions(+) create mode 100644 kafka_ibm_mq_support_c8518eaa.plan 2.md create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/endpoints/imq/ImqFirstVisionEndpoint.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/endpoints/imq/ImqFirstVisionQueue.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/JsonPathValue.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/MessageContentType.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/MessageResponse.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/MqMessageFormat.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/ReceivedMessage.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingConnectionException.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingDestinationException.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingException.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingSchemaException.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingTimeoutException.java diff --git a/kafka_ibm_mq_support_c8518eaa.plan 2.md b/kafka_ibm_mq_support_c8518eaa.plan 2.md new file mode 100644 index 0000000..47e7dd4 --- /dev/null +++ b/kafka_ibm_mq_support_c8518eaa.plan 2.md @@ -0,0 +1,1341 @@ +--- +name: Kafka IBM MQ Support +overview: "Rozšíření testovacího frameworku Harness o podporu messaging systémů (Kafka na Confluent Cloud a IBM MQ) s kompletní abstrakcí, aby tester nemusel znát detaily jednotlivých protokolů. Změny probíhají ve dvou projektech: test-harness-master (framework) a test-master (DSL + testy)." +todos: + - id: maven-deps + content: Přidat Maven závislosti pro kafka-clients, kafka-avro-serializer, com.ibm.mq.allclient a jakarta.jms-api do pom.xml + status: pending + - id: model-classes + content: "Vytvořit model třídy: ReceivedMessage, MessageContentType, MqMessageFormat, MessageResponse interface" + status: pending + - id: exception-classes + content: "Vytvořit hierarchii výjimek: MessagingException (abstract), MessagingConnectionException, MessagingSchemaException, MessagingDestinationException, MessagingTimeoutException" + status: pending + - id: kafka-connector + content: Implementovat KafkaConnector s SASL/PLAIN auth, Avro+SchemaRegistry, producer pooling, consumer assign/seek (bez consumer group), poll cyklus s Predicate filtrem + status: pending + - id: ibmmq-connector + content: Implementovat IbmMqConnector s JMS klientem, user+password, SSL keystore, connection pooling, podpora JSON/XML/EBCDIC_870/UTF8_1208 + status: pending + - id: kafka-endpoint + content: Vytvořit KafkaEndpoint - čte endpoints.kafka.* konfiguraci, credentials z Vaultu, lazy init KafkaConnectoru + status: pending + - id: ibmmq-endpoint + content: Vytvořit ImqFirstVisionEndpoint - čte endpoints.imq-first-vision.* konfiguraci, user + heslo z Vaultu + status: pending + - id: kafka-request + content: Implementovat KafkaRequest fluent builder - withKey, withPayload, withPayloadFromTemplate (nice), withTraceparent/requestID/activityID/sourceCodebookId, send, receiveWhere, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo + status: pending + - id: ibmmq-request + content: Implementovat ImqRequest fluent builder - toQueue, withPayload, withPayloadFromTemplate (nice), asXml/asEbcdic/asUtf8, send, receiveWhere, browse, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo + status: pending + - id: kafka-dsl + content: Vytvořit Kafka DSL třídu a přidat withKafka() do Harness.java + status: pending + - id: ibmmq-dsl + content: Vytvořit ImqFirstVision DSL třídu a přidat withImqFirstVision() do Harness.java + status: pending + - id: config + content: Přidat endpoints.kafka.* a endpoints.imq-first-vision.* konfiguraci do envs/ souborů + vault paths + status: pending +isProject: false +--- + +# Rozšíření testovacího frameworku o Kafka a IBM MQ + +## Současná architektura + +Framework je postaven na vrstvené architektuře: + +```mermaid +graph TD + Test["Test (@TestCase)"] --> DSL["DSL vrstva (Harness.withXxx())"] + DSL --> EP["Endpoint (implementuje Endpoint)"] + EP --> CON["Connector (implementuje Connector)"] + CON --> SYS["Cílový systém"] +``` + + + +- **Connector** ([connectors/Connector.java](test-harness-master/src/main/java/cz/moneta/test/harness/connectors/Connector.java)): nízkoúrovňová komunikace s cílovým systémem +- **Endpoint** ([endpoints/Endpoint.java](test-harness-master/src/main/java/cz/moneta/test/harness/endpoints/Endpoint.java)): obaluje Connector, čte konfiguraci ze `StoreAccessor`, singleton per test class +- **DSL Builder** (v test-master, např. [Wso2.java](test-master/src/main/java/cz/moneta/test/dsl/wso2/Wso2.java)): fluent API pro testy +- **Harness** ([Harness.java](test-master/src/main/java/cz/moneta/test/dsl/Harness.java)): vstupní bod přes `withXxx()` metody + +## Navrhovaná architektura + +Kafka a IBM MQ mají oddělené vstupní body v Harness DSL -- analogicky k `withWso2()`, `withUfo()` apod. + +```mermaid +graph TD + subgraph kafkaPath [Kafka] + TestK["harness.withKafka()"] --> KafkaDSL["Kafka DSL"] + KafkaDSL --> KafkaReq["KafkaRequest builder"] + KafkaReq --> KafkaEP["KafkaEndpoint"] + KafkaEP --> KafkaCon["KafkaConnector"] + KafkaCon --> ConflCloud["Confluent Cloud"] + end + subgraph imqPath [IBM MQ] + TestI["harness.withImqFirstVision()"] --> ImqDSL["ImqFirstVision DSL"] + ImqDSL --> ImqReq["ImqRequest builder"] + ImqReq --> ImqEP["ImqFirstVisionEndpoint"] + ImqEP --> ImqCon["IbmMqConnector"] + ImqCon --> ImqSrv["IBM MQ Server"] + end + KafkaCon -.->|"API key/secret"| Vault["HashiCorp Vault"] + ImqCon -.->|"user + password"| Vault +``` + + + +## Třídní diagram + +```mermaid +classDiagram + class Connector { + <> + +close() + } + class Endpoint { + <> + +canAccess() bool + +close() + } + + class KafkaConnector { + -KafkaProducer producer + -CachedSchemaRegistryClient schemaRegistry + +send(topic, key, jsonPayload, headers) + +receive(topic, filter, timeout) List~ReceivedMessage~ + +close() + } + + class IbmMqConnector { + -JmsConnectionFactory connectionFactory + -JMSContext jmsContext + +send(queue, payload, format, properties) + +receive(queue, selector, format, timeout) ReceivedMessage + +browse(queue, selector, format, maxMessages) List~ReceivedMessage~ + +close() + } + + class KafkaEndpoint { + -KafkaConnector connector + -StoreAccessor store + } + + class ImqFirstVisionEndpoint { + -IbmMqConnector connector + -StoreAccessor store + } + + class ReceivedMessage { + +String body + +MessageContentType contentType + +Map headers + +long timestamp + +extract(expression) String + +extractJson(path) JsonNode + +extractXml(xpath) String + } + + class MqMessageFormat { + <> + JSON + XML + EBCDIC_870 + UTF8_1208 + } + + Connector <|.. KafkaConnector + Connector <|.. IbmMqConnector + Endpoint <|.. KafkaEndpoint + Endpoint <|.. ImqFirstVisionEndpoint + KafkaEndpoint --> KafkaConnector + ImqFirstVisionEndpoint --> IbmMqConnector +``` + + + +## Detailní návrh tříd + +### 1. Connectors (test-harness-master) + +#### KafkaConnector + +Umístění: `cz.moneta.test.harness.connectors.messaging.KafkaConnector` + +- Spravuje `KafkaProducer` (lazy init, singleton, thread-safe) a `KafkaConsumer` (per-call, kvůli izolaci) +- SASL/PLAIN autentizace: API key + secret z Vaultu +- SSL context s truststore (stejný pattern jako `BaseRestConnector`) +- **Formát zpráv: Avro** -- používá Confluent Schema Registry: + - **Volba schématu je řízena názvem topicu**: Schema Registry subject = `{topic-name}-value` (Confluent TopicNameStrategy, výchozí). Tester nemusí schéma specifikovat -- connector ho získá automaticky z Registry podle topicu. + - Producer: `KafkaAvroSerializer` serializuje `GenericRecord` / `SpecificRecord` + - Consumer: `KafkaAvroDeserializer` deserializuje na `GenericRecord` + - Automatická konverze Avro `GenericRecord` -> JSON string v `ReceivedMessage` (transparentní pro testera) + - Pro odesílání: tester poskytne JSON string, connector stáhne schéma z Registry podle topicu a vytvoří `GenericRecord` +- **Vstup pro odeslání zprávy**: název topicu (= logická destinace), klíč (String), tělo zprávy (JSON String), headery (Map) +- **Podporované Kafka headery** -- framework poskytuje typované builder metody pro běžné tracing/correlation headery: + - `traceparent` -- W3C Trace Context propagace (např. `00-traceId-spanId-01`) + - `requestID` -- identifikátor požadavku + - `activityID` -- identifikátor aktivity + - `sourceCodebookId` -- identifikátor zdrojového číselníku + - Libovolné další headery přes generický `.withHeader(key, value)` +- **Izolace testů**: Consumer bez consumer group - používá `consumer.assign(partitions)` + `consumer.seekToEnd()` místo `subscribe()`. Díky tomu: + - Žádný consumer group = žádný rebalancing mezi testy + - Každý test si čte od aktuální pozice nezávisle + - Testy neovlivňují offsety ostatních testů +- **Poll cyklus**: Metoda `receive(topic, Predicate, Duration timeout)`: + 1. Vytvoří nový Consumer + 2. `assign()` na všechny partitions daného topicu + 3. `seekToEnd()` (nebo `seek` na uloženou pozici) + 4. Polling smyčka s exponenciálním backoff (100ms -> 500ms -> 1s) + 5. Každý record: Avro GenericRecord -> JSON -> test proti Predicate + 6. Vrátí první matching zprávu, nebo vyhodí `MessagingTimeoutException` + +```java +public class KafkaConnector implements Connector { + private final Properties baseConfig; + private KafkaProducer producer; + private final String schemaRegistryUrl; + private final CachedSchemaRegistryClient schemaRegistryClient; + + public KafkaConnector(String bootstrapServers, String apiKey, + String apiSecret, String schemaRegistryUrl, + String schemaRegistryApiKey, + String schemaRegistryApiSecret) { ... } + + /** + * Odeslání zprávy na topic. + * Schéma se získá automaticky z Schema Registry podle topicu + * (subject = "{topic}-value", TopicNameStrategy). + * + * @param topic název Kafka topicu (= logická destinace) + * @param key klíč zprávy (String) + * @param jsonPayload tělo zprávy jako JSON String + * @param headers Kafka headery (traceparent, requestID, activityID, + * sourceCodebookId, ...) + */ + public void send(String topic, String key, String jsonPayload, + Map headers) { + Schema schema = getSchemaForTopic(topic); + GenericRecord record = jsonToAvro(jsonPayload, schema); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, key, record); + headers.forEach((k, v) -> + producerRecord.headers().add(k, v.getBytes(StandardCharsets.UTF_8))); + producer.send(producerRecord).get(); + } + + // Příjem: Avro GenericRecord se konvertuje na JSON v ReceivedMessage + public List receive(String topic, + Predicate filter, + Duration timeout) { ... } + + public Map saveOffsets(String topic) { ... } + + // Stáhne latest schéma z Registry podle topicu (subject = {topic}-value) + private Schema getSchemaForTopic(String topic) { + String subject = topic + "-value"; + SchemaMetadata meta = schemaRegistryClient.getLatestSchemaMetadata(subject); + return new Schema.Parser().parse(meta.getSchema()); + } + + private String avroToJson(GenericRecord record) { ... } + private GenericRecord jsonToAvro(String json, Schema schema) { ... } +} +``` + +#### IbmMqConnector + +Umístění: `cz.moneta.test.harness.connectors.messaging.IbmMqConnector` + +- Používá `com.ibm.mq.allclient` JMS klient +- `JmsConnectionFactory` pro připojení s user + password +- **Multi-instance Queue Manager**: Podpora connection name list ve formátu `host1(port1),host2(port2)` (např. `mq9multitst5x(1414),mq9multitst6x(1414)`). Nastavuje se přes `MQConnectionFactory.setConnectionNameList()` místo samostatných `setHostName()` / `setPort()`. IBM MQ klient automaticky provádí failover na další instanci při nedostupnosti. +- SSL/TLS s keystore a truststore +- **Podporované formáty zpráv:** + - **JSON** (výchozí): `JMS TextMessage` s plain JSON string + - **XML**: `JMS TextMessage` s XML string; příjem konvertuje XML na JSON v `ReceivedMessage` (přes Jackson `XmlMapper`) pro jednotné API + - **UTF-8 / CCSID 1208** (výchozí pro binární): `JMS BytesMessage` s payload kódovaným v UTF-8 (IBM CCSID 1208). MQMD CCSID = 1208. Vhodné pro systémy vyžadující explicitní UTF-8 BytesMessage. + - **EBCDIC / CCSID 870**: `JMS BytesMessage` s payload kódovaným v EBCDIC code page 870 (český/slovenský mainframe kódování). MQMD CCSID = 870. Při odesílání se Java String konvertuje na `byte[]` přes `Charset.forName("IBM870")`. Při příjmu se `byte[]` dekóduje zpět na Java String. +- **Logika formátu:** + - Formát se volí explicitně ve fluent API (`.asXml()`, `.asEbcdic()`, `.asUtf8()`); výchozí je JSON + - Při příjmu: connector detekuje typ JMS zprávy (`TextMessage` vs `BytesMessage`) a dekóduje odpovídajícím způsobem podle nastaveného formátu +- **Connection pooling**: Drží `JMSContext` per endpoint instance, recykluje sessions +- **Izolace testů**: Používá `QueueBrowser` pro nedestruktivní čtení (browse), nebo `createConsumer` s message selektorem +- `send()`, `receive()`, `browse()` metody + +```java +public enum MqMessageFormat { + JSON, // TextMessage, plain JSON (UTF-8 default) + XML, // TextMessage, XML string (UTF-8 default) + EBCDIC_870, // BytesMessage, EBCDIC IBM-870 (česky/slovensky) + UTF8_1208 // BytesMessage, UTF-8 IBM CCSID 1208 (výchozí pro binární) +} + +public class IbmMqConnector implements Connector { + private static final Charset EBCDIC_870 = Charset.forName("IBM870"); + private static final Charset UTF_8 = StandardCharsets.UTF_8; + + private final JmsConnectionFactory connectionFactory; + private JMSContext jmsContext; + + // connectionNameList: "host1(port1),host2(port2)" pro multi-instance QMGR + public IbmMqConnector(String connectionNameList, String channel, + String queueManager, String user, String password, + String keystorePath, String keystorePassword) { + JmsConnectionFactory cf = ...; + cf.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, connectionNameList); + cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel); + cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager); + cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT); + // ... + } + + // Odeslání zprávy s daným formátem + public void send(String queueName, String payload, + MqMessageFormat format, + Map properties) { + switch (format) { + case JSON, XML -> sendTextMessage(queueName, payload, properties); + case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties); + case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties); + } + } + + // TextMessage pro JSON a XML (JVM default UTF-8) + private void sendTextMessage(String queueName, String payload, + Map properties) { ... } + + // BytesMessage s konverzí String -> byte[] v daném kódování + MQMD CCSID + private void sendBytesMessage(String queueName, String payload, + Charset charset, int ccsid, + Map properties) { + byte[] bytes = payload.getBytes(charset); + // Vytvoří BytesMessage, nastaví MQMD CCSID, zapíše bytes + } + + // Příjem: auto-detekce TextMessage vs BytesMessage + public ReceivedMessage receive(String queueName, String messageSelector, + MqMessageFormat expectedFormat, + Duration timeout) { ... } + + /** + * Browse (ne-destruktivní čtení) -- zprávy zůstávají ve frontě. + * Vrací seznam zpráv odpovídajících messageSelector, bez jejich odstranění. + * Použití: inspekce obsahu fronty před/po testu, ověření počtu zpráv. + */ + public List browse(String queueName, String messageSelector, + MqMessageFormat expectedFormat, + int maxMessages) { ... } + + // Dekódování přijaté zprávy podle formátu + private String decodeMessage(Message jmsMessage, + MqMessageFormat format) { + if (jmsMessage instanceof TextMessage txt) { + return txt.getText(); // JSON nebo XML string (UTF-8) + } else if (jmsMessage instanceof BytesMessage bytesMsg) { + byte[] data = new byte[(int) bytesMsg.getBodyLength()]; + bytesMsg.readBytes(data); + Charset charset = switch (format) { + case EBCDIC_870 -> EBCDIC_870; + case UTF8_1208 -> UTF_8; + default -> UTF_8; + }; + return new String(data, charset); + } + } +} +``` + +### 2. Model třídy (test-harness-master) + +#### ReceivedMessage + +Umístění: `cz.moneta.test.harness.support.messaging.ReceivedMessage` + +Body je vždy String normalizovaný do čitelné podoby bez ohledu na zdroj a wire formát: + +- Z Kafka: Avro `GenericRecord` je automaticky konvertován na JSON v `KafkaConnector` +- Z IBM MQ (JSON): JSON string z `JMS TextMessage` je předán přímo +- Z IBM MQ (XML): XML string z `JMS TextMessage` je předán jako XML; `extract()` podporuje jak JSON dotpath, tak XPath +- Z IBM MQ (EBCDIC): `byte[]` z `JMS BytesMessage` je dekódován z IBM-870 na Java String + +```java +public enum MessageContentType { + JSON, XML, RAW_TEXT +} + +public class ReceivedMessage { + private final String body; // dekódovaný textový obsah + private final MessageContentType contentType; // JSON, XML, nebo RAW_TEXT + private final Map headers; // Kafka headers nebo JMS properties + private final long timestamp; + private final String source; // topic nebo queue name + private final String key; // Kafka message key (null pro IBM MQ) + + // JSON navigace (dot/bracket notace: "items[0].sku") + public JsonNode extractJson(String path) { ... } + + // XPath navigace (pro XML zprávy: "/response/balance") + public String extractXml(String xpathExpression) { ... } + + // Univerzální extract - auto-detekce podle contentType + public String extract(String expression) { + return switch (contentType) { + case JSON -> extractJson(expression).asText(); + case XML -> extractXml(expression); + case RAW_TEXT -> body; + }; + } + + public String getKey() { ... } // Kafka klíč, null pro IBM MQ + public String getHeader(String name) { ... } // Kafka header nebo JMS property + public String getBody() { ... } // surový textový obsah + public long getTimestamp() { ... } + public MessageContentType getContentType() { ... } + + /** + * Deserializace těla zprávy do Java objektu. + * Pro JSON: Jackson ObjectMapper.readValue(body, type) + * Pro XML: JAXB Unmarshaller nebo Jackson XmlMapper + */ + public T mapTo(Class type) { ... } +} +``` + +### 3. Endpoints (test-harness-master) + +#### KafkaEndpoint + +Umístění: `cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint` + +- Vzor: analogie s `Wso2GatewayEndpoint` -- konstruktor přijímá `StoreAccessor`, čte `endpoints.kafka.*` konfiguraci +- Čte konfiguraci z `StoreAccessor`: + - `endpoints.kafka.bootstrap-servers` + - `endpoints.kafka.security-protocol` + - `endpoints.kafka.sasl-mechanism` + - `endpoints.kafka.schema-registry-url` + - `endpoints.kafka.value-serializer` +- Credentials z Vaultu (path: `vault.kafka.secrets.path`): + - Kafka API key + secret + - Schema Registry API key + secret +- Lazy init KafkaConnectoru + +```java +public class KafkaEndpoint implements Endpoint { + private final KafkaConnector connector; + private final StoreAccessor store; + + public KafkaEndpoint(StoreAccessor store) { + this.store = store; + String bootstrapServers = Optional.ofNullable(store.getConfig("endpoints.kafka.bootstrap-servers")) + .orElseThrow(() -> new IllegalStateException( + "You need to configure endpoints.kafka.bootstrap-servers")); + String schemaRegistryUrl = store.getConfig("endpoints.kafka.schema-registry-url"); + // API key/secret z Vaultu + // ... + this.connector = new KafkaConnector(bootstrapServers, apiKey, apiSecret, + schemaRegistryUrl, srApiKey, srApiSecret); + } + + public void send(String topic, String key, String jsonPayload, + Map headers) { + connector.send(topic, key, jsonPayload, headers); + } + + public List receive(String topic, + Predicate filter, + Duration timeout) { + return connector.receive(topic, filter, timeout); + } + + @Override + public void close() { connector.close(); } +} +``` + +#### ImqFirstVisionEndpoint + +Umístění: `cz.moneta.test.harness.endpoints.imq.ImqFirstVisionEndpoint` + +- Vzor: analogie s `Wso2GatewayEndpoint` +- Čte konfiguraci z `StoreAccessor`: + - `endpoints.imq-first-vision.connection-name-list` -- formát `host1(port1),host2(port2)` pro multi-instance QMGR + - `endpoints.imq-first-vision.channel` + - `endpoints.imq-first-vision.queue-manager` + - `endpoints.imq-first-vision.ssl-cipher-suite` +- User i heslo z Vaultu (path: `vault.imq-first-vision.secrets.path`) +- Lazy init IbmMqConnectoru + +```java +public class ImqFirstVisionEndpoint implements Endpoint { + private final IbmMqConnector connector; + private final StoreAccessor store; + + public ImqFirstVisionEndpoint(StoreAccessor store) { + this.store = store; + // connection-name-list: "mq9multitst5x(1414),mq9multitst6x(1414)" + String connectionNameList = Optional.ofNullable( + store.getConfig("endpoints.imq-first-vision.connection-name-list")) + .orElseThrow(() -> new IllegalStateException( + "You need to configure endpoints.imq-first-vision.connection-name-list")); + String channel = store.getConfig("endpoints.imq-first-vision.channel"); + String queueManager = store.getConfig("endpoints.imq-first-vision.queue-manager"); + // user + password z Vaultu + String user = // z Vaultu (vault.imq-first-vision.secrets.path -> "user") + String password = // z Vaultu (vault.imq-first-vision.secrets.path -> "password") + // ... + this.connector = new IbmMqConnector(connectionNameList, channel, + queueManager, user, password, keystorePath, keystorePassword); + } + + public void send(String queueName, String payload, + MqMessageFormat format, Map properties) { + connector.send(queueName, payload, format, properties); + } + + public ReceivedMessage receive(String queueName, String messageSelector, + MqMessageFormat format, Duration timeout) { + return connector.receive(queueName, messageSelector, format, timeout); + } + + public List browse(String queueName, String messageSelector, + MqMessageFormat format, int maxMessages) { + return connector.browse(queueName, messageSelector, format, maxMessages); + } + + // Resolve logického názvu fronty z konfigurace + public String resolveQueue(String logicalName) { + return Optional.ofNullable( + store.getConfig("endpoints.imq-first-vision." + logicalName + ".queue")) + .orElseThrow(() -> new IllegalStateException( + "Queue '" + logicalName + "' is not configured in " + + "endpoints.imq-first-vision." + logicalName + ".queue")); + } + + @Override + public void close() { connector.close(); } +} +``` + +### 4. DSL Builder (test-master) + +#### Kafka DSL + +Umístění: `cz.moneta.test.dsl.kafka.Kafka` + +Vstupní bod přes `Harness`: + +```java +public Kafka withKafka() { + return new Kafka(this); +} +``` + +DSL třída uvnitř získá endpoint přes existující mechanismus: + +```java +public class Kafka { + private final Harness harness; + + public Kafka(Harness harness) { this.harness = harness; } + + public KafkaRequest.TopicPhase prepareRequest() { + KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class); + return KafkaRequest.builder(endpoint); + } +} +``` + +#### ImqFirstVision DSL + +Umístění: `cz.moneta.test.dsl.imq.ImqFirstVision` + +Vstupní bod přes `Harness`: + +```java +public ImqFirstVision withImqFirstVision() { + return new ImqFirstVision(this); +} +``` + +DSL třída uvnitř získá endpoint přes existující mechanismus: + +```java +public class ImqFirstVision { + private final Harness harness; + + public ImqFirstVision(Harness harness) { this.harness = harness; } + + public ImqRequest.QueuePhase prepareRequest() { + ImqFirstVisionEndpoint endpoint = harness.getEndpoint(ImqFirstVisionEndpoint.class); + return ImqRequest.builder(endpoint); + } +} +``` + +#### Registrace endpointů v BaseStoreAccessor + +`KafkaEndpoint` a `ImqFirstVisionEndpoint` **nevyžadují** explicitní registraci. Existující mechanismus v `BaseStoreAccessor.getEndpoint(Class)` je automaticky instanciuje přes reflexi (konstruktor `(StoreAccessor)`), cache-uje a spravuje lifecycle (zavírá přes `close()`). Podmínkou je, aby nové endpointy měly veřejný konstruktor `public XxxEndpoint(StoreAccessor store)` a implementovaly `Endpoint` interface. + +#### ImqFirstVisionQueue enum + +Umístění: `cz.moneta.test.harness.endpoints.imq.ImqFirstVisionQueue` + +Enum definuje logické názvy front. Fyzický název fronty se resolvuje z konfigurace (`endpoints.imq-first-vision.{logicalName}.queue`). Tester používá enum konstantu -- nemusí znát fyzický název fronty ani konfigurační klíč. + +```java +public enum ImqFirstVisionQueue { + PAYMENT_NOTIFICATIONS("payment-notifications"), + PAYMENT_REQUEST("payment-request"), + MF_REQUESTS("mf-requests"), + MF_RESPONSES("mf-responses"), + MF_EBCDIC("mf-ebcdic"), + MF_UTF8("mf-utf8"); + + private final String configKey; + + ImqFirstVisionQueue(String configKey) { + this.configKey = configKey; + } + + public String getConfigKey() { return configKey; } +} +``` + +Resoluce v `ImqFirstVisionEndpoint.resolveQueue()`: + +- `ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS` -> čte `endpoints.imq-first-vision.payment-notifications.queue` -> vrátí `"MVSW2TST3.DELIVERY.NOTIFICATION"` + +#### KafkaRequest fluent builder + +Umístění: `cz.moneta.test.harness.support.messaging.KafkaRequest` + +#### ImqRequest fluent builder + +Umístění: `cz.moneta.test.harness.support.messaging.ImqRequest` + +#### Definice fluent API pro realizaci + +Následující rozhraní a metody definují kompletní fluent API. DSL třídy (`Kafka`, `ImqFirstVision`) delegují na request buildery (`KafkaRequest`, `ImqRequest`). + +**Fáze builderu:** Každá metoda vrací rozhraní, které umožňuje další volání až do terminálu (`send()` nebo `MessageResponse`). + +**Legenda k implementaci:** +- **[EXISTUJE]** -- obdobná logika již je v frameworku (RawRestRequest), lze znovu využít nebo adaptovat +- **[NOVÉ]** -- je potřeba implementovat + +##### KafkaRequest – fázová rozhraní + +```java +// Fáze 1: Po withKafka() -- výběr směru (odeslání vs. příjem) -- [NOVÉ] +public interface KafkaSendPhase { + KafkaPayloadPhase toTopic(String topic); +} +public interface KafkaReceivePhase { + KafkaReceiveFilterPhase fromTopic(String topic); +} + +// Fáze 2a: Odeslání - payload a headery +// withPayload, withPayloadFromFile, addField, appendToArray, withHeader [EXISTUJE] v RawRestRequest +// withKey, withTraceparent/RequestID/ActivityID/SourceCodebookId, send [NOVÉ] +public interface KafkaPayloadPhase { + KafkaPayloadPhase withKey(String key); // [NOVÉ] + KafkaPayloadPhase withPayload(String json); // [EXISTUJE] + KafkaPayloadPhase withPayloadFromFile(String path); // [EXISTUJE] + KafkaPayloadPhase withPayloadFromTemplate(String path, Map variables); // [EXISTUJE] nice to have + KafkaPayloadPhase addField(String fieldName, Object value); + KafkaPayloadPhase addField(String path, String fieldName, Object value); + KafkaPayloadPhase appendToArray(String path, Object value); + KafkaPayloadPhase withTraceparent(String value); // [NOVÉ] + KafkaPayloadPhase withRequestID(String value); // [NOVÉ] + KafkaPayloadPhase withActivityID(String value); // [NOVÉ] + KafkaPayloadPhase withSourceCodebookId(String value); // [NOVÉ] + KafkaPayloadPhase withHeader(String key, String value); // [EXISTUJE] + void send(); // [NOVÉ] +} + +// Fáze 2b: Příjem - filtr a timeout -- [NOVÉ] +public interface KafkaReceiveFilterPhase { + KafkaAwaitingPhase receiveWhere(Predicate filter); +} +public interface KafkaAwaitingPhase { + MessageResponse withTimeout(long duration, TimeUnit unit); +} + +// MessageResponse -- obdobné RawRestRequest.Response +// andAssertFieldValue, andAssertPresent, andAssertNotPresent, extract, mapTo [EXISTUJE] +// andAssertHeaderValue, andAssertBodyContains, andAssertWithAssertJ [NOVÉ] +public interface MessageResponse { + MessageResponse andAssertFieldValue(String path, String value); // [EXISTUJE] + MessageResponse andAssertPresent(String path); // [EXISTUJE] + MessageResponse andAssertNotPresent(String path); // [EXISTUJE] + MessageResponse andAssertHeaderValue(String headerName, String value); // [NOVÉ] + MessageResponse andAssertBodyContains(String substring); // [NOVÉ] primárně IBM MQ + ObjectAssert andAssertWithAssertJ(); // [NOVÉ] nice to have + JsonPathValue extract(String path); // [EXISTUJE] + T mapTo(Class type); // [EXISTUJE] jako post(Class) + ReceivedMessage getMessage(); + String getBody(); + String getHeader(String name); +} +``` + +##### ImqRequest – fázová rozhraní + +```java +// Fáze 1: Po withImqFirstVision() -- [NOVÉ] +public interface ImqSendPhase { + ImqPayloadPhase toQueue(ImqFirstVisionQueue queue); +} +public interface ImqReceivePhase { + ImqReceiveFilterPhase fromQueue(ImqFirstVisionQueue queue); +} + +// Fáze 2a: Odeslání - asXml/asEbcdic/asUtf8 [NOVÉ], payload/addField/appendToArray [EXISTUJE] +public interface ImqPayloadPhase { + ImqPayloadPhase asXml(); // [NOVÉ] + ImqPayloadPhase asEbcdic(); // [NOVÉ] + ImqPayloadPhase asUtf8(); // [NOVÉ] + ImqPayloadPhase withPayload(String payload); + ImqPayloadPhase withPayloadFromFile(String path); + ImqPayloadPhase withPayloadFromTemplate(String path, Map variables); + ImqPayloadPhase addField(String fieldName, Object value); + ImqPayloadPhase addField(String path, String fieldName, Object value); + ImqPayloadPhase appendToArray(String path, Object value); + void send(); +} + +// Fáze 2b: Příjem - withSelector, receiveWhere, browse [NOVÉ] +public interface ImqReceiveFilterPhase { + ImqReceiveFilterPhase withSelector(String jmsMessageSelector); // [NOVÉ] + ImqAwaitingPhase receiveWhere(Predicate filter); + List browse(int maxMessages); // [NOVÉ] +} +public interface ImqAwaitingPhase { + MessageResponse withTimeout(long duration, TimeUnit unit); +} +``` + +##### Přehledná tabulka metod + +| Kontext | Metoda | Návrat | Poznámka | +|---------|--------|--------|----------| +| Harness | `withKafka()` | Kafka | [NOVÉ] vstupní bod | +| Harness | `withImqFirstVision()` | ImqFirstVision | [NOVÉ] vstupní bod | +| Kafka | `toTopic(String)` | KafkaPayloadPhase | [NOVÉ] odeslání | +| Kafka | `fromTopic(String)` | KafkaReceiveFilterPhase | [NOVÉ] příjem | +| KafkaPayloadPhase | `withKey(String)` | KafkaPayloadPhase | [NOVÉ] Kafka specifické | +| KafkaPayloadPhase | `withPayload(String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayload | +| KafkaPayloadPhase | `withPayloadFromFile(String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromFile | +| KafkaPayloadPhase | `withPayloadFromTemplate(String, Map)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromTemplate (Template API) -- nice to have | +| KafkaPayloadPhase | `addField(...)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.addField, JSON pouze | +| KafkaPayloadPhase | `appendToArray(...)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.appendToArray, JSON pouze | +| KafkaPayloadPhase | `withTraceparent/RequestID/ActivityID/SourceCodebookId(String)` | KafkaPayloadPhase | [NOVÉ] Kafka headery | +| KafkaPayloadPhase | `withHeader(String, String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withHeader | +| KafkaPayloadPhase | `send()` | void | [NOVÉ] terminál (messaging specifické) | +| KafkaReceiveFilterPhase | `receiveWhere(Predicate)` | KafkaAwaitingPhase | [NOVÉ] | +| ImqFirstVision | `toQueue(ImqFirstVisionQueue)` | ImqPayloadPhase | [NOVÉ] | +| ImqFirstVision | `fromQueue(ImqFirstVisionQueue)` | ImqReceiveFilterPhase | [NOVÉ] | +| ImqPayloadPhase | `asXml()`, `asEbcdic()`, `asUtf8()` | ImqPayloadPhase | [NOVÉ] IBM MQ formát | +| ImqPayloadPhase | `withPayload`, `withPayloadFromFile`, ... | ImqPayloadPhase | [EXISTUJE] jako Kafka | +| ImqReceiveFilterPhase | `withSelector(String)` | ImqReceiveFilterPhase | [NOVÉ] JMS selector | +| KafkaAwaitingPhase | `withTimeout(long, TimeUnit)` | MessageResponse | [NOVÉ] | +| ImqReceiveFilterPhase | `receiveWhere(Predicate)` | ImqAwaitingPhase | [NOVÉ] | +| ImqAwaitingPhase | `withTimeout(long, TimeUnit)` | MessageResponse | [NOVÉ] | +| ImqReceiveFilterPhase | `browse(int maxMessages)` | List<ReceivedMessage> | [NOVÉ] terminál, nedestruktivní | +| MessageResponse | `andAssertFieldValue(path, value)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertFieldValue | +| MessageResponse | `andAssertPresent(path)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertPresent | +| MessageResponse | `andAssertNotPresent(path)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertNotPresent | +| MessageResponse | `andAssertHeaderValue(name, value)` | MessageResponse | [NOVÉ] Kafka header / JMS property | +| MessageResponse | `andAssertBodyContains(substring)` | MessageResponse | [NOVÉ] EBCDIC/UTF-8 | +| MessageResponse | `andAssertWithAssertJ()` | ObjectAssert | [NOVÉ] nice to have | +| MessageResponse | `extract(path)` | JsonPathValue | [EXISTUJE] RawRestRequest.Response.extract, .asText() | +| MessageResponse | `mapTo(Class)` | T | [EXISTUJE] RawRestRequest.post(Class) -- deserializace | +| MessageResponse | `getBody()`, `getHeader(name)` | String | [EXISTUJE] Response přístup k datům | + +**Poznámka k řetězci receiveWhere:** U Kafky `receiveWhere(filter)` vrací objekt s `withTimeout()` – timeout je povinný před assert. U IBM MQ může být timeout defaultní nebo explicitní dle implementace. + +Vzor podobný `RawRestRequest` - fluent builder s fázovými interfaces: + +```java +// ============================================= +// === harness.withKafka() -- Kafka (Avro) ===== +// ============================================= +// Vstup: topic (schéma se resolví automaticky z Registry), klíč, tělo (JSON), headery + +// Odeslání zprávy na Kafka topic se standardními headery +harness.withKafka() + .toTopic("order-events") + .withKey("order-123") + .withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}") + .withTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + .withRequestID("req-001") + .withActivityID("act-555") + .withSourceCodebookId("CB-01") + .send(); + +// Libovolné vlastní headery +harness.withKafka() + .toTopic("order-events") + .withKey("order-456") + .withPayload("{\"orderId\": \"456\", \"status\": \"SHIPPED\"}") + .withHeader("customHeader", "customValue") + .send(); + +// Odeslání z JSON souboru +harness.withKafka() + .toTopic("order-events") + .withKey("order-789") + .withPayloadFromFile("messaging/order_event.json") + .withRequestID("req-002") + .send(); + +// Příjem zprávy z Kafka s filtrem (Avro -> JSON konverze automaticky) +harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("123")) + .withTimeout(30, TimeUnit.SECONDS) + .andAssertFieldValue("status", "CREATED"); + +// ============================================= +// === harness.withImqFirstVision() -- IBM MQ == +// ============================================= + +// --- JSON (výchozí formát) --- +// Tester používá enum ImqFirstVisionQueue -- fyzický název fronty se resolvuje z konfigurace + +// Odeslání JSON zprávy do IBM MQ fronty +harness.withImqFirstVision() + .toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .withPayload("{\"paymentId\": \"PAY-456\", \"result\": \"OK\"}") + .send(); + +// Příjem JSON zprávy z IBM MQ fronty +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456")) + .withTimeout(10, TimeUnit.SECONDS) + .andAssertFieldValue("result", "OK"); + +// --- XML --- + +// Odeslání XML zprávy +harness.withImqFirstVision() + .toQueue(ImqFirstVisionQueue.MF_REQUESTS) + .asXml() + .withPayload("12345BALANCE") + .send(); + +// Odeslání XML ze souboru +harness.withImqFirstVision() + .toQueue(ImqFirstVisionQueue.MF_REQUESTS) + .asXml() + .withPayloadFromFile("messaging/mf_request.xml") + .send(); + +// Příjem XML zprávy s XPath filtrací +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.MF_RESPONSES) + .asXml() + .receiveWhere(msg -> msg.extract("/response/accountId").equals("12345")) + .withTimeout(15, TimeUnit.SECONDS) + .andAssertFieldValue("/response/balance", "50000"); + +// --- UTF-8 / CCSID 1208 (BytesMessage) --- + +// Odeslání zprávy v UTF-8 jako BytesMessage (CCSID 1208) +harness.withImqFirstVision() + .toQueue(ImqFirstVisionQueue.MF_UTF8) + .asUtf8() + .withPayload("DATA|12345|ÚČET|CZK") + .send(); + +// Příjem UTF-8 BytesMessage +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.MF_UTF8) + .asUtf8() + .receiveWhere(msg -> msg.getBody().contains("12345")) + .withTimeout(15, TimeUnit.SECONDS); + +// --- EBCDIC / CCSID 870 --- + +// Odeslání zprávy v EBCDIC kódování (mainframe, CZ/SK znaky) +harness.withImqFirstVision() + .toQueue(ImqFirstVisionQueue.MF_EBCDIC) + .asEbcdic() + .withPayload("PŘIKAZ|12345|ZŮSTATEK|CZK") + .send(); + +// Příjem EBCDIC zprávy (automaticky dekódováno z IBM-870 na String) +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.MF_EBCDIC) + .asEbcdic() + .receiveWhere(msg -> msg.getBody().contains("12345")) + .withTimeout(15, TimeUnit.SECONDS); +``` + +#### Inkrementální stavba payload pomocí addField / appendToArray + +Stejný vzor jako v existující třídě `RawRestRequest.Builder` (viz `addField()`, `appendToArray()` na řádcích 253-342 v [RawRestRequest.java](test-harness-master/src/main/java/cz/moneta/test/harness/support/rest/RawRestRequest.java)). Funguje pro JSON payloady (Kafka i IBM MQ JSON formát). Používá Jackson `ObjectMapper` pro manipulaci s JSON stromem. + +```java +// === Kafka: Sestavení payload přes addField === + +// Začátek s prázdným JSON objektem +harness.withKafka() + .toTopic("order-events") + .withKey("order-123") + .withPayload("{}") + .addField("orderId", "123") + .addField("status", "CREATED") + .addField("items", Arrays.asList( + Map.of("sku", "A001", "qty", 2), + Map.of("sku", "B002", "qty", 1))) + .addField("items[1]", "note", "fragile") + .withRequestID("req-001") + .send(); +// Výsledný payload: {"orderId":"123","status":"CREATED", +// "items":[{"sku":"A001","qty":2},{"sku":"B002","qty":1,"note":"fragile"}]} + +// Přidání do existujícího payloadu ze souboru +harness.withKafka() + .toTopic("order-events") + .withKey("order-456") + .withPayloadFromFile("messaging/order_event.json") + .addField("metadata", new Object()) + .addField("metadata", "source", "test-harness") + .addField("metadata", "timestamp", System.currentTimeMillis()) + .send(); + +// Přidání prvku do existujícího pole +harness.withKafka() + .toTopic("order-events") + .withKey("order-789") + .withPayloadFromFile("messaging/order_base.json") + .appendToArray("items", Map.of("sku", "C003", "qty", 5)) + .send(); + +// === IBM MQ: Sestavení JSON payload přes addField === + +harness.withImqFirstVision() + .toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .withPayload("{}") + .addField("paymentId", "PAY-456") + .addField("amount", 15000) + .addField("currency", "CZK") + .addField("beneficiary", new Object()) + .addField("beneficiary", "name", "Jan Novák") + .addField("beneficiary", "accountNumber", "1234567890/0100") + .send(); +``` + +Podporované metody na builder fázi po `withPayload()` / `withPayloadFromFile()`: + +- `addField(String fieldName, Object value)` -- přidá pole do kořene JSON +- `addField(String path, String fieldName, Object value)` -- přidá pole do vnořeného uzlu; `path` používá tečkovou notaci a hranatou závorku pro indexy polí (např. `"items[2].details"`) +- `appendToArray(String path, Object value)` -- přidá prvek na konec existujícího JSON pole +- `withPayloadFromTemplate(String templatePath, Map variables)` -- (nice to have) načte šablonu ze souboru a nahradí placeholdery (`${varName}`) hodnotami z mapy; hodnoty mohou pocházet z `harness.store()`. Pro složitější payloady bez nutnosti programově sestavovat JSON. + +Poznámka: `addField` a `appendToArray` fungují pouze pro JSON formát. Pro XML a EBCDIC/UTF8 formáty v IBM MQ se payload sestavuje jako celý String. + +#### Varianty výběru zprávy v receiveWhere + +##### Kafka -- receiveWhere + +Kafka `receiveWhere` přijímá `Predicate` -- libovolný Java predikát na deserializovanou zprávu (Avro -> JSON). Zprávy se prohledávají sekvenčně v polling smyčce. Vrátí se **první** zpráva, která vyhovuje predikátu. + +```java +// --- Filtrace podle obsahu JSON pole --- +harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("123")) + .withTimeout(30, TimeUnit.SECONDS); + +// --- Filtrace podle více polí (AND) --- +harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> + msg.extract("orderId").equals("123") + && msg.extract("status").equals("CREATED")) + .withTimeout(30, TimeUnit.SECONDS); + +// --- Filtrace podle Kafka headeru --- +harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> "req-001".equals(msg.getHeader("requestID"))) + .withTimeout(30, TimeUnit.SECONDS); + +// --- Filtrace podle klíče zprávy --- +harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> "order-123".equals(msg.getKey())) + .withTimeout(30, TimeUnit.SECONDS); + +// --- Filtrace kombinací headeru a obsahu --- +harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> + "act-555".equals(msg.getHeader("activityID")) + && msg.extract("status").equals("SHIPPED")) + .withTimeout(30, TimeUnit.SECONDS); + +// --- Příjem první dostupné zprávy (bez filtru) --- +harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> true) + .withTimeout(10, TimeUnit.SECONDS); + +// --- Příjem s vlastním timeoutem --- +harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("123")) + .withTimeout(60, TimeUnit.SECONDS); // delší timeout pro pomalé systémy +``` + +ReceivedMessage metody dostupné v predikátu (Kafka): + +- `msg.extract("dotPath")` -- hodnota JSON pole (dot/bracket notace) +- `msg.getHeader("headerName")` -- hodnota Kafka headeru +- `msg.getKey()` -- klíč zprávy +- `msg.getBody()` -- celé tělo jako JSON String +- `msg.getTimestamp()` -- timestamp zprávy + +##### IBM MQ -- receiveWhere + +IBM MQ podporuje dvě úrovně filtrace: + +1. **JMS Message Selector** (server-side) -- efektivní filtr na JMS properties/headery, vyhodnocuje se na straně MQ serveru +2. **Predicate na obsahu** (client-side) -- filtr na tělo zprávy po deserializaci, vyhodnocuje se v klientu + +```java +// --- Filtrace podle obsahu JSON pole --- +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456")) + .withTimeout(10, TimeUnit.SECONDS); + +// --- Filtrace podle JMS CorrelationID (server-side selector) --- +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .withSelector("JMSCorrelationID = 'corr-789'") + .receiveWhere(msg -> true) + .withTimeout(10, TimeUnit.SECONDS); + +// --- Kombinace JMS selectoru a content filtru --- +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .withSelector("JMSType = 'payment'") + .receiveWhere(msg -> msg.extract("amount").equals("15000")) + .withTimeout(10, TimeUnit.SECONDS); + +// --- XML zprávy s XPath filtrací --- +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.MF_RESPONSES) + .asXml() + .receiveWhere(msg -> msg.extract("/response/accountId").equals("12345")) + .withTimeout(15, TimeUnit.SECONDS); + +// --- EBCDIC / UTF-8 zprávy -- filtrace přes raw body --- +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.MF_EBCDIC) + .asEbcdic() + .receiveWhere(msg -> msg.getBody().contains("12345")) + .withTimeout(15, TimeUnit.SECONDS); + +// --- Příjem první dostupné zprávy --- +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .receiveWhere(msg -> true) + .withTimeout(10, TimeUnit.SECONDS); + +// --- Browse (ne-destruktivní čtení, zprávy zůstávají ve frontě) --- +List peeked = harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .browse(10); // max 10 zpráv z fronty +// s JMS selektorem: .withSelector("JMSType = 'payment'").browse(10) +``` + +ReceivedMessage metody dostupné v predikátu (IBM MQ): + +- `msg.extract("expression")` -- hodnota pole (JSON dot-path pro JSON, XPath pro XML) +- `msg.getBody()` -- celé tělo zprávy jako String +- `msg.getHeader("jmsProperty")` -- hodnota JMS property + +Klíčový rozdíl: `.withSelector()` je server-side filtr (SQL-92 subset, filtruje JMS properties) -- použijte ho pro efektivní filtraci podle korelačních ID. `.receiveWhere()` je client-side filtr na obsahu zprávy. + +#### Chybové stavy -- výjimky a assert metody + +##### Výjimky (propagovány okamžitě, test failuje) + +Tyto chyby signalizují infrastrukturní problém nebo chybu konfigurace -- test nemůže pokračovat: + +```java +// --- Chyba připojení (Kafka i IBM MQ) --- +// KafkaConnectionException / JMSException obalené do RuntimeException +// Příčiny: špatný bootstrap-servers, connection-name-list, credentials, síťový problém +try { + harness.withKafka() + .toTopic("order-events") + .withKey("order-123") + .withPayload("{\"orderId\": \"123\"}") + .send(); +} catch (MessagingConnectionException e) { + // "Failed to connect to Kafka cluster at pkc-xxxxx:9092: Authentication failed" + // "Failed to connect to IBM MQ: mq9multitst5x(1414),mq9multitst6x(1414) - MQRC 2035 (NOT_AUTHORIZED)" +} + +// --- Chyba schématu (Kafka) --- +// Schema Registry vrátí 404 nebo payload neodpovídá schématu +try { + harness.withKafka() + .toTopic("nonexistent-topic") + .withKey("key") + .withPayload("{\"unknownField\": \"value\"}") + .send(); +} catch (MessagingSchemaException e) { + // "Schema not found for subject 'nonexistent-topic-value' in Schema Registry" + // "Payload does not conform to Avro schema for topic 'order-events': field 'orderId' is required" +} + +// --- Queue neexistuje (IBM MQ) --- +// Výjimka nastane i při chybné konfiguraci logického názvu v properties +try { + harness.withImqFirstVision() + .toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) // resolvuje se z konfigurace + .withPayload("{}") + .send(); +} catch (MessagingDestinationException e) { + // "Queue 'MVSW2TST3.DELIVERY.NOTIFICATION' not found: MQRC 2085 (UNKNOWN_OBJECT_NAME)" +} + +// --- Timeout při příjmu (Kafka i IBM MQ) --- +// Žádná zpráva nevyhověla predikátu v daném čase +try { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("nonexistent")) + .withTimeout(5, TimeUnit.SECONDS); +} catch (MessagingTimeoutException e) { + // "No message matching filter found on topic 'order-events' within 5 seconds" + // "No message matching filter found on queue 'PAYMENT.NOTIFICATIONS' within 10 seconds" +} + +// --- Chyba konfigurace (endpoint není nakonfigurován) --- +try { + harness.withKafka() + .toTopic("order-events") + .withKey("key") + .withPayload("{}") + .send(); +} catch (IllegalStateException e) { + // "You need to configure endpoints.kafka.bootstrap-servers" + // Stejný vzor jako u Wso2GatewayEndpoint +} +``` + +Hierarchie výjimek: + +``` +MessagingException (abstract, extends RuntimeException) +├── MessagingConnectionException -- selhání připojení, autentizace +├── MessagingSchemaException -- Avro schema mismatch, schema not found +├── MessagingDestinationException -- topic/queue neexistuje, přístupová práva +└── MessagingTimeoutException -- receive timeout, žádná matching zpráva +``` + +##### Assert metody (fluent, na přijaté zprávě) + +Tyto metody se volají na výsledku `receiveWhere` -- zpráva byla nalezena a nyní se ověřuje její obsah. Assert selhání = `AssertionError` (standardní JUnit pattern). + +```java +// --- JSON assert metody (Kafka i IBM MQ JSON) --- + +harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("123")) + .withTimeout(30, TimeUnit.SECONDS) + // Assert na hodnotu pole (dot/bracket notace) + .andAssertFieldValue("status", "CREATED") + .andAssertFieldValue("items[0].sku", "A001") + // Assert na přítomnost/nepřítomnost pole + .andAssertPresent("items[0].qty") + .andAssertNotPresent("deletedField") + // Assert na Kafka header + .andAssertHeaderValue("requestID", "req-001") + // Extrakce hodnoty pro další použití v testu + .extract("internalId").asText(); // -> String pro uložení do harness.store() + +// --- XML assert metody (IBM MQ XML) --- + +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.MF_RESPONSES) + .asXml() + .receiveWhere(msg -> msg.extract("/response/accountId").equals("12345")) + .withTimeout(15, TimeUnit.SECONDS) + // XPath assert + .andAssertFieldValue("/response/balance", "50000") + .andAssertFieldValue("/response/currency", "CZK") + .andAssertPresent("/response/timestamp") + .andAssertNotPresent("/response/error"); + +// --- Raw body assert (EBCDIC/UTF-8) --- + +harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.MF_EBCDIC) + .asEbcdic() + .receiveWhere(msg -> msg.getBody().contains("12345")) + .withTimeout(15, TimeUnit.SECONDS) + .andAssertBodyContains("ZŮSTATEK"); + +// --- Deserializace do Java objektu (mapTo) --- + +PaymentNotification notif = harness.withImqFirstVision() + .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) + .receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456")) + .withTimeout(10, TimeUnit.SECONDS) + .mapTo(PaymentNotification.class); +``` + +Kompletní tabulka assert metod na `MessageResponse`: + +- `andAssertFieldValue(String path, String value)` -- JSON dot-path nebo XPath +- `andAssertPresent(String path)` -- ověří existenci pole +- `andAssertNotPresent(String path)` -- ověří neexistenci pole +- `andAssertHeaderValue(String headerName, String value)` -- Kafka header nebo JMS property +- `andAssertBodyContains(String substring)` -- raw body contains (pro EBCDIC/UTF-8/raw text) +- `andAssertWithAssertJ()` -- *(nice to have)* vrací AssertJ `AbstractObjectAssert` pro komplexní fluent assertions . +- `extract(String path)` -- extrahuje hodnotu pro další použití +- `mapTo(Class type)` -- deserializace těla zprávy do Java objektu (Jackson/JAXB) +- `getBody()` -- raw body String +- `getHeader(String name)` -- raw header/property value + +### 5. Konfigurace + +Konfigurační klíče dodržují konvenci existujících endpointů v Harness: prefix `endpoints.{system-name}.{property}` (viz `endpoints.wso2.gw.url`, `endpoints.udebs.url` atd.). + +#### Properties soubory (envs/tst1): + +```properties +# Kafka Confluent Cloud (Avro + Schema Registry) +endpoints.kafka.bootstrap-servers=pkc-xxxxx.eu-central-1.aws.confluent.cloud:9092 +endpoints.kafka.security-protocol=SASL_SSL +endpoints.kafka.sasl-mechanism=PLAIN +endpoints.kafka.schema-registry-url=https://psrc-xxxxx.eu-central-1.aws.confluent.cloud +endpoints.kafka.value-serializer=avro +# API key + secret a Schema Registry credentials se načítají z Vaultu + +# IBM MQ - First Vision (multi-instance QMGR) +endpoints.imq-first-vision.connection-name-list=mq9multitst5x(1414),mq9multitst6x(1414) +endpoints.imq-first-vision.channel=CLIENT.CHANNEL +endpoints.imq-first-vision.queue-manager=MVSW2TST3 +endpoints.imq-first-vision.ssl-cipher-suite=TLS_RSA_WITH_AES_256_CBC_SHA256 +# user + heslo se načítají z Vaultu + +# Logické destinace (musí odpovídat enum ImqFirstVisionQueue) +endpoints.imq-first-vision.payment-notifications.queue=MVSW2TST3.DELIVERY.NOTIFICATION +endpoints.imq-first-vision.payment-request.queue=MVSW2TST3.DELIVERY.REQUEST +endpoints.imq-first-vision.mf-requests.queue=MVSW2TST3.MF.REQUESTS +endpoints.imq-first-vision.mf-responses.queue=MVSW2TST3.MF.RESPONSES +endpoints.imq-first-vision.mf-ebcdic.queue=MVSW2TST3.MF.EBCDIC +endpoints.imq-first-vision.mf-utf8.queue=MVSW2TST3.MF.UTF8 +``` + +#### Vault paths: + +```properties +vault.kafka.secrets.path=/kv/autotesty/tst1/kafka +vault.imq-first-vision.secrets.path=/kv/autotesty/tst1/imq-first-vision +``` + +### 6. Maven závislosti + +V [test-harness-master/pom.xml](test-harness-master/pom.xml) přidat: + +- `org.apache.kafka:kafka-clients` (Kafka producer/consumer) +- `io.confluent:kafka-avro-serializer` (Avro + Schema Registry) +- `com.ibm.mq:com.ibm.mq.allclient` (IBM MQ JMS klient) +- `jakarta.jms:jakarta.jms-api` (JMS API) +- `org.assertj:assertj-core` (pro `andAssertWithAssertJ()`, nice to have ) + +### 7. Řešení klíčových problémů + +#### Poll cyklus a hledání zprávy v Kafce + +```mermaid +flowchart TD + Start["receive(topic, filter, timeout)"] --> CreateConsumer["Vytvořit KafkaConsumer bez group.id"] + CreateConsumer --> Assign["assign() na všechny partitions"] + Assign --> SeekEnd["seekToEnd() pro všechny partitions"] + SeekEnd --> SavePos["Uložit aktuální pozice"] + SavePos --> Poll["poll(pollInterval)"] + Poll --> Check{"Jsou nějaké záznamy?"} + Check -->|Ano| Filter{"Odpovídá filter?"} + Check -->|Ne| Timeout{"Vypršel timeout?"} + Filter -->|Ano| Return["Vrátit ReceivedMessage"] + Filter -->|Ne| Timeout + Timeout -->|Ne| Backoff["Backoff wait"] + Backoff --> Poll + Timeout -->|Ano| Throw["Throw MessagingTimeoutException"] +``` + + + +Klíčové body: + +- `seekToEnd()` se volá **před** triggerem akce v testu (tester typicky: 1. připraví listener, 2. provede akci, 3. čeká na zprávu) +- Alternativně: seek na pozici uloženou **před** testem +- Polling interval: 100ms, exponential backoff do max 1s +- Default timeout: 30s (konfigurovatelný) + +#### Izolace testů (consumer groups) + +**Kafka:** + +- Consumer **bez `group.id`** s manuálním `assign()` + `seek()` +- Každý příjem zprávy vytvoří nový Consumer, přečte data a zavře ho +- Žádné sdílení offsetů mezi testy ani vlákny +- Alternativa pro komplexní scénáře: unikátní `group.id` per test (UUID) + +**IBM MQ:** + +- Pro nedestruktivní čtení: `QueueBrowser` (browse bez odstranění zprávy) +- JMS message selector (`JMSCorrelationID = 'xxx'`) pro cílený příjem +- Každý test si filtruje zprávy podle korelačních údajů + +### 8. Souhrnný seznam souborů k vytvoření/úpravě + +#### Nové soubory v test-harness-master: + +- `connectors/messaging/KafkaConnector.java` +- `connectors/messaging/IbmMqConnector.java` +- `endpoints/kafka/KafkaEndpoint.java` +- `endpoints/imq/ImqFirstVisionEndpoint.java` +- `endpoints/imq/ImqFirstVisionQueue.java` (enum: logické názvy front) +- `support/messaging/ReceivedMessage.java` +- `support/messaging/MessageContentType.java` (enum: JSON, XML, RAW_TEXT) +- `support/messaging/MqMessageFormat.java` (enum: JSON, XML, EBCDIC_870, UTF8_1208) +- `support/messaging/MessageResponse.java` (interface: assert, extract, mapTo -- sdílený pro Kafka i IBM MQ) +- `support/messaging/KafkaRequest.java` (fluent builder pro Kafka) +- `support/messaging/ImqRequest.java` (fluent builder pro IBM MQ) +- `messaging/exception/MessagingException.java` (abstract, extends RuntimeException) +- `messaging/exception/MessagingConnectionException.java` +- `messaging/exception/MessagingSchemaException.java` +- `messaging/exception/MessagingDestinationException.java` +- `messaging/exception/MessagingTimeoutException.java` + +#### Nové soubory v test-master: + +- `dsl/kafka/Kafka.java` (DSL třída pro `withKafka()`) +- `dsl/imq/ImqFirstVision.java` (DSL třída pro `withImqFirstVision()`) + +#### Úpravy existujících souborů: + +- `test-harness-master/pom.xml` - přidat závislosti Kafka + IBM MQ +- `test-master/pom.xml` - případné závislosti +- `test-master/.../Harness.java` - přidat `withKafka()` a `withImqFirstVision()` metody +- `test-master/src/test/resources/envs/tst1` - přidat `endpoints.kafka.`* a `endpoints.imq-first-vision.`* +- `test-master/src/test/resources/envs/ppe` - přidat `endpoints.kafka.`* a `endpoints.imq-first-vision.`* + diff --git a/test-harness/pom.xml b/test-harness/pom.xml index 8703504..886b8c2 100644 --- a/test-harness/pom.xml +++ b/test-harness/pom.xml @@ -29,6 +29,8 @@ 1.9.3 1.6 4.0.3 + 9.4.5.0 + 2.0.1 @@ -285,6 +287,18 @@ ${cxf.version} + + + com.ibm.mq + com.ibm.mq.allclient + ${ibm.mq.version} + + + javax.jms + javax.jms-api + ${javax.jms.version} + + javax.servlet diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java new file mode 100644 index 0000000..7684210 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/IbmMqConnector.java @@ -0,0 +1,447 @@ +package cz.moneta.test.harness.connectors.messaging; + +import cz.moneta.test.harness.connectors.Connector; +import cz.moneta.test.harness.messaging.MessageContentType; +import cz.moneta.test.harness.messaging.MqMessageFormat; +import cz.moneta.test.harness.messaging.ReceivedMessage; +import cz.moneta.test.harness.messaging.exception.MessagingConnectionException; +import cz.moneta.test.harness.messaging.exception.MessagingDestinationException; +import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException; +import com.ibm.mq.jms.MQConnectionFactory; +import com.ibm.msg.client.wmq.WMQConstants; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.*; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * IBM MQ connector using JMS client with Jakarta JMS API. + * Supports multi-instance Queue Manager, SSL/TLS, and multiple message formats. + *

+ * Supported formats: + * - JSON: JMS TextMessage with plain JSON string (default) + * - XML: JMS TextMessage with XML string + * - UTF-8 (CCSID 1208): JMS BytesMessage with UTF-8 encoding + * - EBCDIC (CCSID 870): JMS BytesMessage with EBCDIC IBM-870 encoding + */ +public class IbmMqConnector implements Connector { + + private static final Logger LOG = LogManager.getLogger(IbmMqConnector.class); + + private static final Charset EBCDIC_870 = Charset.forName("IBM870"); + private static final Charset UTF_8 = StandardCharsets.UTF_8; + + private static final long DEFAULT_POLL_INTERVAL_MS = 100; + private static final long DEFAULT_MAX_POLL_INTERVAL_MS = 1000; + + private final MQConnectionFactory connectionFactory; + private JMSContext jmsContext; + private final String queueManager; + + /** + * Constructor with multi-instance Queue Manager support. + * + * @param connectionNameList Connection name list in format "host1(port1),host2(port2)" + * @param channel MQ channel name + * @param queueManager Queue Manager name + * @param user Username for authentication + * @param password Password for authentication + * @param keystorePath Path to SSL keystore (can be null for non-SSL) + * @param keystorePassword Password for SSL keystore + * @param sslCipherSuite SSL cipher suite to use (e.g., "TLS_RSA_WITH_AES_256_CBC_SHA256") + */ + public IbmMqConnector(String connectionNameList, String channel, String queueManager, + String user, String password, + String keystorePath, String keystorePassword, String sslCipherSuite) { + this.queueManager = queueManager; + + try { + MQConnectionFactory cf = new MQConnectionFactory(); + + // Set connection name list for multi-instance QMGR + cf.setChannel(channel); + cf.setQueueManager(queueManager); + cf.setConnectionNameList(connectionNameList); + cf.setTransportType(WMQConstants.WMQ_CM_CLIENT); + + // Set authentication + cf.setStringProperty(WMQConstants.USERID, user); + cf.setStringProperty(WMQConstants.PASSWORD, password); + + // SSL configuration if keystore is provided + if (StringUtils.isNotBlank(keystorePath)) { + System.setProperty("javax.net.ssl.keyStore", keystorePath); + System.setProperty("javax.net.ssl.trustStore", keystorePath); + if (StringUtils.isNotBlank(keystorePassword)) { + System.setProperty("javax.net.ssl.keyStorePassword", keystorePassword); + System.setProperty("javax.net.ssl.trustStorePassword", keystorePassword); + } + if (StringUtils.isNotBlank(sslCipherSuite)) { + cf.setSSLCipherSuite(sslCipherSuite); + } + } + + this.connectionFactory = cf; + + // Initialize JMS context + connect(); + + } catch (Exception e) { + throw new MessagingConnectionException( + "Failed to create IBM MQ connection to " + queueManager, e); + } + } + + /** + * Connect to IBM MQ. + */ + private void connect() { + try { + this.jmsContext = connectionFactory.createContext(); + this.jmsContext.start(); + LOG.info("Connected to IBM MQ: {}", queueManager); + } catch (Exception e) { + throw new MessagingConnectionException( + "Failed to connect to IBM MQ: " + queueManager + " - " + e.getMessage(), e); + } + } + + /** + * Send a JSON or XML message as TextMessage. + */ + private void sendTextMessage(String queueName, String payload, Map properties) { + javax.jms.Queue queue = getQueue(queueName); + + TextMessage message = jmsContext.createTextMessage(payload); + + // Set JMS properties + if (properties != null) { + for (Map.Entry entry : properties.entrySet()) { + try { + message.setStringProperty(entry.getKey(), entry.getValue()); + } catch (JMSException e) { + LOG.warn("Failed to set property: {}", entry.getKey(), e); + } + } + } + + try { + jmsContext.createProducer().send(queue, message); + } catch (RuntimeException e) { + throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e); + } + LOG.debug("Sent JSON/XML message to queue: {}", queueName); + } + + /** + * Send a message as BytesMessage with specific encoding and CCSID. + */ + private void sendBytesMessage(String queueName, String payload, Charset charset, + int ccsid, Map properties) { + javax.jms.Queue queue = getQueue(queueName); + + BytesMessage message = jmsContext.createBytesMessage(); + + // Convert payload to bytes using specified charset + byte[] bytes = payload.getBytes(charset); + try { + message.writeBytes(bytes); + message.setIntProperty("CCSID", ccsid); + } catch (JMSException e) { + throw new MessagingDestinationException("Failed to create bytes message", e); + } + + // Set JMS properties + if (properties != null) { + for (Map.Entry entry : properties.entrySet()) { + try { + message.setStringProperty(entry.getKey(), entry.getValue()); + } catch (JMSException e) { + LOG.warn("Failed to set property: {}", entry.getKey(), e); + } + } + } + + try { + jmsContext.createProducer().send(queue, message); + } catch (RuntimeException e) { + throw new MessagingDestinationException("Failed to send message to queue: " + queueName, e); + } + LOG.debug("Sent {} message to queue: {}", charset, queueName); + } + + /** + * Send a message to a queue with specified format. + * + * @param queueName Queue name + * @param payload Message payload + * @param format Message format (JSON, XML, EBCDIC_870, UTF8_1208) + * @param properties JMS properties to set + */ + public void send(String queueName, String payload, MqMessageFormat format, + Map properties) { + switch (format) { + case JSON, XML -> sendTextMessage(queueName, payload, properties); + case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties); + case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties); + } + } + + /** + * Receive a message from a queue with timeout. + * + * @param queueName Queue name + * @param messageSelector JMS message selector (optional) + * @param format Expected message format + * @param timeout Timeout duration + * @return Received message + */ + public ReceivedMessage receive(String queueName, String messageSelector, + MqMessageFormat format, java.time.Duration timeout) { + long timeoutMs = timeout.toMillis(); + + javax.jms.Queue queue = getQueue(queueName); + MessageConsumer consumer = (MessageConsumer) (messageSelector == null || messageSelector.isBlank() + ? jmsContext.createConsumer(queue) + : jmsContext.createConsumer(queue, messageSelector)); + + AtomicBoolean messageFound = new AtomicBoolean(false); + ReceivedMessage received = null; + + long pollInterval = DEFAULT_POLL_INTERVAL_MS; + long remainingTime = timeoutMs; + + try { + while (remainingTime > 0 && !messageFound.get()) { + Message message = consumer.receive(remainingTime); + + if (message != null) { + received = decodeMessage(message, queueName, format); + messageFound.set(true); + } else { + // Exponential backoff + pollInterval = Math.min(pollInterval * 2, DEFAULT_MAX_POLL_INTERVAL_MS); + remainingTime -= pollInterval; + } + } + + if (received == null) { + throw new MessagingTimeoutException( + "No message matching filter found on queue '" + queueName + + "' within " + timeout.toMillis() + "ms"); + } + + return received; + + } catch (MessagingTimeoutException e) { + throw e; + } catch (Exception e) { + throw new MessagingDestinationException("Failed to receive message from queue: " + queueName, e); + } finally { + try { + consumer.close(); + } catch (JMSException e) { + LOG.warn("Failed to close consumer", e); + } + } + } + + /** + * Browse a queue (non-destructive read). + * + * @param queueName Queue name + * @param messageSelector JMS message selector (optional) + * @param format Expected message format + * @param maxMessages Maximum number of messages to browse + * @return List of received messages + */ + public List browse(String queueName, String messageSelector, + MqMessageFormat format, int maxMessages) { + List messages = new ArrayList<>(); + + javax.jms.Queue queue = getQueue(queueName); + MessageConsumer consumer = (MessageConsumer) (messageSelector == null || messageSelector.isBlank() + ? jmsContext.createConsumer(queue) + : jmsContext.createConsumer(queue, messageSelector)); + + int count = 0; + try { + while (count < maxMessages) { + Message message = consumer.receiveNoWait(); + + if (message == null) { + break; + } + + ReceivedMessage received = decodeMessage(message, queueName, format); + messages.add(received); + count++; + } + + return messages; + + } catch (Exception e) { + throw new MessagingDestinationException("Failed to browse queue: " + queueName, e); + } finally { + try { + consumer.close(); + } catch (JMSException e) { + LOG.warn("Failed to close consumer", e); + } + } + } + + /** + * Decode a JMS message to ReceivedMessage. + */ + private ReceivedMessage decodeMessage(Message jmsMessage, String queueName, MqMessageFormat format) { + long timestamp; + try { + timestamp = jmsMessage.getJMSTimestamp(); + } catch (JMSException e) { + timestamp = System.currentTimeMillis(); + } + if (timestamp == 0) { + timestamp = System.currentTimeMillis(); + } + + String body; + MessageContentType contentType; + Map headers = new HashMap<>(); + + // Extract JMS properties as headers + extractJmsProperties(jmsMessage, headers); + + if (jmsMessage instanceof TextMessage textMessage) { + try { + body = textMessage.getText(); + } catch (JMSException e) { + throw new RuntimeException("Failed to read text message body", e); + } + contentType = switch (format) { + case XML -> MessageContentType.XML; + default -> MessageContentType.JSON; + }; + } else if (jmsMessage instanceof BytesMessage bytesMessage) { + int ccsid; + try { + ccsid = bytesMessage.getIntProperty("CCSID"); + } catch (JMSException e) { + ccsid = 1208; // default UTF-8 + } + body = decodeBytesMessage(bytesMessage, ccsid); + contentType = MessageContentType.RAW_TEXT; + } else { + try { + throw new IllegalArgumentException("Unsupported message type: " + jmsMessage.getJMSType()); + } catch (JMSException e) { + throw new IllegalArgumentException("Unsupported message type", e); + } + } + + return new ReceivedMessage(body, contentType, headers, timestamp, queueName, null); + } + + /** + * Decode BytesMessage body based on CCSID. + */ + private String decodeBytesMessage(BytesMessage bytesMessage, int ccsid) { + try { + long bodyLength; + try { + bodyLength = bytesMessage.getBodyLength(); + } catch (JMSException e) { + throw new RuntimeException("Failed to get message body length", e); + } + byte[] data = new byte[(int) bodyLength]; + bytesMessage.readBytes(data); + + Charset charset = switch (ccsid) { + case 870 -> EBCDIC_870; + case 1208 -> UTF_8; + default -> UTF_8; + }; + + return new String(data, charset); + } catch (JMSException e) { + throw new RuntimeException("Failed to read BytesMessage body", e); + } + } + + /** + * Extract JMS properties as headers. + */ + @SuppressWarnings("unchecked") + private void extractJmsProperties(Message message, Map headers) { + try { + // Common JMS headers + headers.put("JMSMessageID", message.getJMSMessageID()); + try { + headers.put("JMSType", message.getJMSType() != null ? message.getJMSType() : ""); + } catch (JMSException e) { + headers.put("JMSType", ""); + } + try { + headers.put("JMSDestination", message.getJMSDestination() != null ? + message.getJMSDestination().toString() : ""); + } catch (JMSException e) { + headers.put("JMSDestination", ""); + } + try { + headers.put("JMSDeliveryMode", String.valueOf(message.getJMSDeliveryMode())); + } catch (JMSException e) { + headers.put("JMSDeliveryMode", ""); + } + try { + headers.put("JMSPriority", String.valueOf(message.getJMSPriority())); + } catch (JMSException e) { + headers.put("JMSPriority", ""); + } + try { + headers.put("JMSTimestamp", String.valueOf(message.getJMSTimestamp())); + } catch (JMSException e) { + headers.put("JMSTimestamp", ""); + } + + // Extract custom properties + Enumeration propertyNames = (Enumeration) message.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + String propName = propertyNames.nextElement(); + Object propValue = message.getObjectProperty(propName); + if (propValue != null) { + headers.put(propName, propValue.toString()); + } + } + } catch (JMSException e) { + LOG.warn("Failed to extract JMS properties", e); + } + } + + /** + * Get Queue object from queue name. + */ + private javax.jms.Queue getQueue(String queueName) { + return jmsContext.createQueue(queueName); + } + + @Override + public void close() { + if (jmsContext != null) { + try { + jmsContext.close(); + LOG.info("Closed connection to IBM MQ: {}", queueManager); + } catch (Exception e) { + LOG.error("Failed to close IBM MQ connection", e); + } + } + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/endpoints/imq/ImqFirstVisionEndpoint.java b/test-harness/src/main/java/cz/moneta/test/harness/endpoints/imq/ImqFirstVisionEndpoint.java new file mode 100644 index 0000000..350d663 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/endpoints/imq/ImqFirstVisionEndpoint.java @@ -0,0 +1,210 @@ +package cz.moneta.test.harness.endpoints.imq; + +import cz.moneta.test.harness.connectors.messaging.IbmMqConnector; +import cz.moneta.test.harness.context.StoreAccessor; +import cz.moneta.test.harness.endpoints.Endpoint; +import cz.moneta.test.harness.messaging.MqMessageFormat; +import cz.moneta.test.harness.messaging.ReceivedMessage; +import cz.moneta.test.harness.connectors.VaultConnector; +import cz.moneta.test.harness.support.auth.Credentials; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; + +/** + * IBM MQ First Vision endpoint. + * Provides high-level access to IBM MQ queues with configuration from StoreAccessor. + *

+ * Credentials are loaded from HashiCorp Vault. + */ +public class ImqFirstVisionEndpoint implements Endpoint { + + private static final Logger LOG = LogManager.getLogger(ImqFirstVisionEndpoint.class); + + private final IbmMqConnector connector; + private final StoreAccessor store; + + // Configuration keys + private static final String CONNECTION_NAME_LIST_KEY = "endpoints.imq-first-vision.connection-name-list"; + private static final String CHANNEL_KEY = "endpoints.imq-first-vision.channel"; + private static final String QUEUE_MANAGER_KEY = "endpoints.imq-first-vision.queue-manager"; + private static final String SSL_CIPHER_SUITE_KEY = "endpoints.imq-first-vision.ssl-cipher-suite"; + private static final String VAULT_PATH_KEY = "vault.imq-first-vision.secrets.path"; + + /** + * Constructor that reads configuration from StoreAccessor. + */ + public ImqFirstVisionEndpoint(StoreAccessor store) { + this.store = store; + + // Read configuration + String connectionNameList = getConfig(CONNECTION_NAME_LIST_KEY); + String channel = getConfig(CHANNEL_KEY); + String queueManager = getConfig(QUEUE_MANAGER_KEY); + String sslCipherSuite = getConfig(SSL_CIPHER_SUITE_KEY); + + // Load credentials from Vault + String vaultPath = getVaultPath(); + Credentials credentials = loadCredentialsFromVault(vaultPath); + + // SSL configuration (optional) + String keystorePath = null; + String keystorePassword = null; + + try { + this.connector = new IbmMqConnector( + connectionNameList, + channel, + queueManager, + credentials.getUsername(), + credentials.getPassword(), + keystorePath, + keystorePassword, + sslCipherSuite + ); + + LOG.info("Initialized IBM MQ First Vision endpoint for queue manager: {}", queueManager); + + } catch (Exception e) { + throw new IllegalStateException("Failed to initialize IBM MQ endpoint", e); + } + } + + /** + * Get a configuration value from StoreAccessor. + */ + private String getConfig(String key) { + return Optional.ofNullable(store.getConfig(key)) + .orElseThrow(() -> new IllegalStateException( + "You need to configure " + key)); + } + + /** + * Get vault path from configuration. + */ + private String getVaultPath() { + return Optional.ofNullable(store.getConfig(VAULT_PATH_KEY)) + .orElseThrow(() -> new IllegalStateException( + "You need to configure " + VAULT_PATH_KEY)); + } + + /** + * Load credentials from HashiCorp Vault. + */ + private Credentials loadCredentialsFromVault(String vaultPath) { + try { + // Get vault URL from configuration + String vaultUrl = getConfig("vault.url"); + String vaultUser = getConfig("vault.user"); + String vaultPassword = getConfig("vault.password"); + + VaultConnector vaultConnector = new VaultConnector(vaultUrl, vaultUser, vaultPassword); + + Optional credentials = vaultConnector.getUsernameAndPassword(vaultPath); + + return credentials.orElseThrow(() -> new IllegalStateException( + "Credentials not found in Vault at path: " + vaultPath)); + + } catch (Exception e) { + throw new IllegalStateException("Failed to load credentials from Vault", e); + } + } + + /** + * Send a message to a queue. + * + * @param queueName Physical queue name or logical name (from ImqFirstVisionQueue) + * @param payload Message payload + * @param format Message format + * @param properties JMS properties + */ + public void send(String queueName, String payload, MqMessageFormat format, + java.util.Map properties) { + connector.send(queueName, payload, format, properties); + } + + /** + * Send a message to a queue using logical queue name. + */ + public void send(ImqFirstVisionQueue queue, String payload, MqMessageFormat format, + java.util.Map properties) { + String physicalQueueName = resolveQueue(queue); + connector.send(physicalQueueName, payload, format, properties); + } + + /** + * Receive a message from a queue. + * + * @param queueName Physical queue name or logical name + * @param messageSelector JMS message selector (optional) + * @param format Expected message format + * @param timeout Timeout duration + * @return Received message + */ + public ReceivedMessage receive(String queueName, String messageSelector, + MqMessageFormat format, Duration timeout) { + return connector.receive(queueName, messageSelector, format, timeout); + } + + /** + * Receive a message from a queue using logical queue name. + */ + public ReceivedMessage receive(ImqFirstVisionQueue queue, String messageSelector, + MqMessageFormat format, Duration timeout) { + String physicalQueueName = resolveQueue(queue); + return connector.receive(physicalQueueName, messageSelector, format, timeout); + } + + /** + * Browse a queue (non-destructive read). + * + * @param queueName Physical queue name or logical name + * @param messageSelector JMS message selector (optional) + * @param format Expected message format + * @param maxMessages Maximum number of messages + * @return List of received messages + */ + public List browse(String queueName, String messageSelector, + MqMessageFormat format, int maxMessages) { + return connector.browse(queueName, messageSelector, format, maxMessages); + } + + /** + * Browse a queue using logical queue name. + */ + public List browse(ImqFirstVisionQueue queue, String messageSelector, + MqMessageFormat format, int maxMessages) { + String physicalQueueName = resolveQueue(queue); + return connector.browse(physicalQueueName, messageSelector, format, maxMessages); + } + + /** + * Resolve logical queue name to physical queue name. + * + * @param logicalName Logical queue name or ImqFirstVisionQueue enum + * @return Physical queue name + */ + public String resolveQueue(String logicalName) { + String configKey = "endpoints.imq-first-vision." + logicalName + ".queue"; + return Optional.ofNullable(store.getConfig(configKey)) + .orElseThrow(() -> new IllegalStateException( + "Queue '" + logicalName + "' is not configured in " + configKey)); + } + + /** + * Resolve ImqFirstVisionQueue enum to physical queue name. + */ + public String resolveQueue(ImqFirstVisionQueue queue) { + return resolveQueue(queue.getConfigKey()); + } + + @Override + public void close() { + if (connector != null) { + connector.close(); + } + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/endpoints/imq/ImqFirstVisionQueue.java b/test-harness/src/main/java/cz/moneta/test/harness/endpoints/imq/ImqFirstVisionQueue.java new file mode 100644 index 0000000..1b90eb5 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/endpoints/imq/ImqFirstVisionQueue.java @@ -0,0 +1,54 @@ +package cz.moneta.test.harness.endpoints.imq; + +/** + * Logical queue names for IBM MQ First Vision. + * Physical queue names are resolved from configuration. + */ +public enum ImqFirstVisionQueue { + /** + * Payment notifications queue. + */ + PAYMENT_NOTIFICATIONS("payment-notifications"), + + /** + * Payment request queue. + */ + PAYMENT_REQUEST("payment-request"), + + /** + * MF (Money Flow) requests queue. + */ + MF_REQUESTS("mf-requests"), + + /** + * MF (Money Flow) responses queue. + */ + MF_RESPONSES("mf-responses"), + + /** + * MF (Money Flow) EBCDIC queue. + */ + MF_EBCDIC("mf-ebcdic"), + + /** + * MF (Money Flow) UTF-8 queue. + */ + MF_UTF8("mf-utf8"); + + private static final String BASE_CONFIG_KEY = "endpoints.imq-first-vision."; + private static final String QUEUE_SUFFIX = ".queue"; + + private final String configKey; + + ImqFirstVisionQueue(String configKey) { + this.configKey = BASE_CONFIG_KEY + configKey + QUEUE_SUFFIX; + } + + /** + * Get the configuration key for this queue. + * Used to resolve physical queue name from configuration. + */ + public String getConfigKey() { + return configKey; + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/JsonPathValue.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/JsonPathValue.java new file mode 100644 index 0000000..c743d45 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/JsonPathValue.java @@ -0,0 +1,83 @@ +package cz.moneta.test.harness.messaging; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; + +/** + * Wrapper for extracted JSON path values. + * Provides fluent methods for value extraction and conversion. + */ +public class JsonPathValue { + + private final JsonNode node; + private final String rawValue; + + public JsonPathValue(JsonNode node) { + this.node = node; + this.rawValue = node != null ? node.asText() : null; + } + + public JsonPathValue(String rawValue) { + this.node = null; + this.rawValue = rawValue; + } + + /** + * Get the value as a string. + */ + public String asText() { + if (node != null && !(node instanceof NullNode)) { + return node.asText(); + } + return rawValue; + } + + /** + * Get the value as an integer. + */ + public int asInt() { + if (node != null && !(node instanceof NullNode)) { + return node.asInt(); + } + return Integer.parseInt(rawValue); + } + + /** + * Get the value as a long. + */ + public long asLong() { + if (node != null && !(node instanceof NullNode)) { + return node.asLong(); + } + return Long.parseLong(rawValue); + } + + /** + * Get the value as a boolean. + */ + public boolean asBoolean() { + if (node != null && !(node instanceof NullNode)) { + return node.asBoolean(); + } + return Boolean.parseBoolean(rawValue); + } + + /** + * Check if the value is null or missing. + */ + public boolean isNull() { + return node == null || node instanceof NullNode || rawValue == null; + } + + /** + * Get the underlying JsonNode. + */ + public JsonNode getNode() { + return node; + } + + @Override + public String toString() { + return asText(); + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/MessageContentType.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/MessageContentType.java new file mode 100644 index 0000000..693affc --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/MessageContentType.java @@ -0,0 +1,21 @@ +package cz.moneta.test.harness.messaging; + +/** + * Content type of a received message. + */ +public enum MessageContentType { + /** + * JSON content - body is a JSON string. + */ + JSON, + + /** + * XML content - body is an XML string. + */ + XML, + + /** + * Raw text content - body is plain text (e.g., EBCDIC decoded, UTF-8). + */ + RAW_TEXT +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/MessageResponse.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/MessageResponse.java new file mode 100644 index 0000000..54ca26d --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/MessageResponse.java @@ -0,0 +1,111 @@ +package cz.moneta.test.harness.messaging; + +import org.assertj.core.api.AbstractObjectAssert; + +/** + * Response interface for received messages. + * Provides assertion methods for verifying message content. + * Shared interface for both Kafka and IBM MQ message responses. + */ +public interface MessageResponse { + + /** + * Assert that a field in the message body has the expected value. + * For JSON: uses JSON path (dot/bracket notation). + * For XML: uses XPath expression. + * + * @param path JSON path or XPath expression + * @param value expected value as string + * @return this instance for fluent assertions + * @throws AssertionError if assertion fails + */ + MessageResponse andAssertFieldValue(String path, String value); + + /** + * Assert that a field exists in the message body. + * + * @param path JSON path or XPath expression + * @return this instance for fluent assertions + * @throws AssertionError if assertion fails + */ + MessageResponse andAssertPresent(String path); + + /** + * Assert that a field does not exist in the message body. + * + * @param path JSON path or XPath expression + * @return this instance for fluent assertions + * @throws AssertionError if assertion fails + */ + MessageResponse andAssertNotPresent(String path); + + /** + * Assert that a header (Kafka header or JMS property) has the expected value. + * + * @param headerName name of the header/property + * @param value expected value + * @return this instance for fluent assertions + * @throws AssertionError if assertion fails + */ + MessageResponse andAssertHeaderValue(String headerName, String value); + + /** + * Assert that the message body contains a substring. + * Primarily used for EBCDIC/UTF-8 raw text assertions. + * + * @param substring expected substring + * @return this instance for fluent assertions + * @throws AssertionError if assertion fails + */ + MessageResponse andAssertBodyContains(String substring); + + /** + * Get AssertJ fluent assertion for complex object assertions. + * + * @return AssertJ AbstractObjectAssert for fluent assertions + */ + AbstractObjectAssert andAssertWithAssertJ(); + + /** + * Extract a value from the message body. + * For JSON: uses JSON path (dot/bracket notation). + * For XML: uses XPath expression. + * + * @param path JSON path or XPath expression + * @return JsonPathValue wrapper for the extracted value + */ + JsonPathValue extract(String path); + + /** + * Deserialize the message body to a Java object. + * For JSON: uses Jackson ObjectMapper. + * For XML: uses JAXB or Jackson XmlMapper. + * + * @param type target type + * @param target type + * @return deserialized object + */ + T mapTo(Class type); + + /** + * Get the raw message body. + * + * @return message body as string + */ + String getBody(); + + /** + * Get a header value (Kafka header or JMS property). + * + * @param name header/property name + * @return header value or null if not present + */ + String getHeader(String name); + + /** + * Get the underlying received message. + * + * @return ReceivedMessage instance + */ + ReceivedMessage getMessage(); +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/MqMessageFormat.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/MqMessageFormat.java new file mode 100644 index 0000000..dc0856f --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/MqMessageFormat.java @@ -0,0 +1,31 @@ +package cz.moneta.test.harness.messaging; + +/** + * Message format for IBM MQ. + * Defines how messages are encoded and transmitted. + */ +public enum MqMessageFormat { + /** + * JSON format - JMS TextMessage with plain JSON string. + * Default format for IBM MQ. + */ + JSON, + + /** + * XML format - JMS TextMessage with XML string. + * XML is decoded and can be queried using XPath. + */ + XML, + + /** + * EBCDIC format - JMS BytesMessage with EBCDIC IBM-870 encoding. + * Used for mainframe systems (Czech/Slovak characters). + */ + EBCDIC_870, + + /** + * UTF-8 format - JMS BytesMessage with UTF-8 (CCSID 1208) encoding. + * Used for binary data with explicit UTF-8 encoding. + */ + UTF8_1208 +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/ReceivedMessage.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/ReceivedMessage.java new file mode 100644 index 0000000..f731b84 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/ReceivedMessage.java @@ -0,0 +1,386 @@ +package cz.moneta.test.harness.messaging; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathFactory; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Represents a received message from a messaging system. + * Body is always normalized to a String regardless of source and wire format. + *

+ * For Kafka: Avro GenericRecord is automatically converted to JSON. + * For IBM MQ (JSON): JSON string from JMS TextMessage. + * For IBM MQ (XML): XML string from JMS TextMessage. + * For IBM MQ (EBCDIC): byte[] from JMS BytesMessage decoded from IBM-870. + */ +public class ReceivedMessage { + + private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + private static final Pattern JSON_PATH_PATTERN = Pattern.compile("^([\\w.]+)\\[([\\d]+)\\](.*)$"); + + private final String body; + private final MessageContentType contentType; + private final Map headers; + private final long timestamp; + private final String source; + private final String key; + + public ReceivedMessage(String body, MessageContentType contentType, Map headers, + long timestamp, String source, String key) { + this.body = body; + this.contentType = contentType; + this.headers = headers != null ? Collections.unmodifiableMap(new HashMap<>(headers)) : new HashMap<>(); + this.timestamp = timestamp; + this.source = source; + this.key = key; + } + + /** + * Extract a JSON value using JSON path (dot/bracket notation). + * Supports paths like "items[0].sku" or "nested.field". + * + * @param path JSON path + * @return JsonNode for the extracted value + */ + public JsonNode extractJson(String path) { + if (body == null || StringUtils.isEmpty(path)) { + return null; + } + + try { + JsonNode root = JSON_MAPPER.readTree(body); + return evaluateJsonPath(root, path); + } catch (Exception e) { + throw new RuntimeException("Failed to extract JSON path: " + path, e); + } + } + + /** + * Extract a value using XPath (for XML messages). + * + * @param xpathExpression XPath expression + * @return extracted value as string + */ + public String extractXml(String xpathExpression) { + if (body == null || StringUtils.isEmpty(xpathExpression)) { + return null; + } + + try { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setFeature("http://xml.org/sax/features/external-general-entities", false); + factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false); + DocumentBuilder builder = factory.newDocumentBuilder(); + javax.xml.parsers.DocumentBuilder finalBuilder = builder; + + var document = finalBuilder.parse(new java.io.ByteArrayInputStream(body.getBytes())); + document.getDocumentElement().normalize(); + + XPath xpath = XPathFactory.newInstance().newXPath(); + Object result = xpath.evaluate(xpathExpression, document, XPathConstants.NODE); + + if (result instanceof org.w3c.dom.Node domNode) { + return domNode.getTextContent(); + } + + return null; + } catch (Exception e) { + throw new RuntimeException("Failed to evaluate XPath: " + xpathExpression, e); + } + } + + /** + * Universal extract method - auto-detects content type and uses appropriate extraction. + * + * @param expression JSON path or XPath expression + * @return extracted value as string + */ + public String extract(String expression) { + return switch (contentType) { + case JSON -> extractJson(expression).asText(); + case XML -> extractXml(expression); + case RAW_TEXT -> body; + }; + } + + /** + * Evaluate JSON path on a JSON node. + * Supports dot notation and bracket notation for arrays. + */ + private JsonNode evaluateJsonPath(JsonNode node, String path) { + if (StringUtils.isEmpty(path)) { + return node; + } + + String[] parts = tokenizePath(path); + JsonNode current = node; + + for (String part : parts) { + if (current == null) { + return null; + } + + if (part.isEmpty()) { + continue; + } + + if (part.contains("[")) { + // Array access + Matcher matcher = JSON_PATH_PATTERN.matcher(part); + if (matcher.matches()) { + String arrayName = matcher.group(1); + int index = Integer.parseInt(matcher.group(2)); + String remaining = matcher.group(3); + + if (current.isArray()) { + current = index < current.size() ? current.get(index) : null; + } else if (current.isObject()) { + JsonNode arrayNode = current.get(arrayName); + if (arrayNode != null && arrayNode.isArray()) { + current = index < arrayNode.size() ? arrayNode.get(index) : null; + } else { + current = null; + } + } + + // Continue with remaining path + if (StringUtils.isNotBlank(remaining)) { + current = evaluateJsonPath(current, remaining); + } + } else { + current = current.get(part); + } + } else if (part.contains(".")) { + // Navigate through object properties + String[] segments = part.split("\\."); + for (String segment : segments) { + if (StringUtils.isEmpty(segment)) { + continue; + } + current = current.get(segment); + } + } else { + current = current.get(part); + } + } + + return current; + } + + /** + * Tokenize JSON path into segments. + */ + private String[] tokenizePath(String path) { + if (StringUtils.isEmpty(path)) { + return new String[0]; + } + + java.util.List tokens = new java.util.ArrayList<>(); + StringBuilder current = new StringBuilder(); + boolean inBracket = false; + + for (int i = 0; i < path.length(); i++) { + char c = path.charAt(i); + if (c == '[') { + inBracket = true; + current.append(c); + } else if (c == ']') { + inBracket = false; + current.append(c); + } else if (c == '.' && !inBracket) { + if (current.length() > 0) { + tokens.add(current.toString()); + current.setLength(0); + } + } else { + current.append(c); + } + } + + if (current.length() > 0) { + tokens.add(current.toString()); + } + + return tokens.toArray(new String[0]); + } + + /** + * Get the message key (Kafka message key, null for IBM MQ). + */ + public String getKey() { + return key; + } + + /** + * Get a header value (Kafka header or JMS property). + */ + public String getHeader(String name) { + return headers.get(name); + } + + /** + * Get all headers. + */ + public Map getHeaders() { + return headers; + } + + /** + * Get the message body. + */ + public String getBody() { + return body; + } + + /** + * Get the message timestamp. + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Get the content type. + */ + public MessageContentType getContentType() { + return contentType; + } + + /** + * Get the source (topic or queue name). + */ + public String getSource() { + return source; + } + + /** + * Deserialize the message body to a Java object. + * + * @param type target type + * @param target type + * @return deserialized object + */ + public T mapTo(Class type) { + if (body == null) { + return null; + } + + try { + if (contentType == MessageContentType.XML) { + // XML deserialization using JAXB + return mapXmlTo(type); + } else { + // JSON deserialization using Jackson + return JSON_MAPPER.readValue(body, type); + } + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize message to " + type.getName(), e); + } + } + + /** + * Deserialize the message body to a Java object for XML. + * Uses JAXB-like parsing - simplified for basic XML structures. + */ + private T mapXmlTo(Class type) { + try { + // For XML, parse to a simple map structure + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = factory.newDocumentBuilder(); + javax.xml.parsers.DocumentBuilder finalBuilder = builder; + + var document = finalBuilder.parse(new java.io.ByteArrayInputStream(body.getBytes())); + document.getDocumentElement().normalize(); + + Map xmlMap = xmlToMap(document.getDocumentElement()); + return JSON_MAPPER.convertValue(xmlMap, type); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize XML message", e); + } + } + + /** + * Convert XML element to Map. + */ + private Map xmlToMap(org.w3c.dom.Element element) { + Map result = new HashMap<>(); + + for (int i = 0; i < element.getAttributes().getLength(); i++) { + org.w3c.dom.NamedNodeMap attributes = element.getAttributes(); + org.w3c.dom.Node attr = attributes.item(i); + result.put("@" + attr.getNodeName(), attr.getNodeValue()); + } + + // Add children + for (int i = 0; i < element.getChildNodes().getLength(); i++) { + org.w3c.dom.Node node = element.getChildNodes().item(i); + if (node.getNodeType() == org.w3c.dom.Node.ELEMENT_NODE) { + org.w3c.dom.Element childElement = (org.w3c.dom.Element) node; + String tagName = childElement.getTagName(); + + if (childElement.getChildNodes().getLength() == 0) { + // Leaf element + result.put(tagName, childElement.getTextContent()); + } else { + // Check if all children are elements (complex) or text (simple) + boolean hasElement = false; + for (int j = 0; j < childElement.getChildNodes().getLength(); j++) { + org.w3c.dom.Node childNode = childElement.getChildNodes().item(j); + if (childNode.getNodeType() == org.w3c.dom.Node.TEXT_NODE && + StringUtils.isNotBlank(childNode.getTextContent())) { + } else if (childNode.getNodeType() == org.w3c.dom.Node.ELEMENT_NODE) { + hasElement = true; + } + } + + if (hasElement) { + Map childMap = xmlToMap(childElement); + if (result.containsKey(tagName)) { + // Convert to list if multiple elements with same name + java.util.List list = new java.util.ArrayList<>(); + if (result.get(tagName) instanceof Map) { + list.add(result.get(tagName)); + } + list.add(childMap); + result.put(tagName, list); + } else { + result.put(tagName, childMap); + } + } else { + result.put(tagName, childElement.getTextContent()); + } + } + } + } + + // If element has only text content and no attributes or children, return text + if (element.getChildNodes().getLength() == 0) { + Map textMap = new HashMap<>(); + textMap.put("#text", element.getTextContent()); + return textMap; + } + + return result; + } + + @Override + public String toString() { + return "ReceivedMessage{" + + "contentType=" + contentType + + ", source='" + source + '\'' + + ", key='" + key + '\'' + + ", body='" + body + '\'' + + '}'; + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingConnectionException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingConnectionException.java new file mode 100644 index 0000000..85a1d3f --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingConnectionException.java @@ -0,0 +1,16 @@ +package cz.moneta.test.harness.messaging.exception; + +/** + * Exception thrown when connection to messaging system fails. + * Covers authentication failures, network issues, and connection problems. + */ +public class MessagingConnectionException extends MessagingException { + + public MessagingConnectionException(String message) { + super(message); + } + + public MessagingConnectionException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingDestinationException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingDestinationException.java new file mode 100644 index 0000000..5bd5ae5 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingDestinationException.java @@ -0,0 +1,15 @@ +package cz.moneta.test.harness.messaging.exception; + +/** + * Exception thrown when destination (queue, topic) is not found or inaccessible. + */ +public class MessagingDestinationException extends MessagingException { + + public MessagingDestinationException(String message) { + super(message); + } + + public MessagingDestinationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingException.java new file mode 100644 index 0000000..f923bda --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingException.java @@ -0,0 +1,15 @@ +package cz.moneta.test.harness.messaging.exception; + +/** + * Abstract base exception for messaging system errors (IBM MQ, Kafka). + */ +public abstract class MessagingException extends RuntimeException { + + protected MessagingException(String message) { + super(message); + } + + protected MessagingException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingSchemaException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingSchemaException.java new file mode 100644 index 0000000..5e0860a --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingSchemaException.java @@ -0,0 +1,16 @@ +package cz.moneta.test.harness.messaging.exception; + +/** + * Exception thrown when message schema validation fails. + * Currently primarily used for IBM MQ message format issues. + */ +public class MessagingSchemaException extends MessagingException { + + public MessagingSchemaException(String message) { + super(message); + } + + public MessagingSchemaException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingTimeoutException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingTimeoutException.java new file mode 100644 index 0000000..5d69880 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingTimeoutException.java @@ -0,0 +1,15 @@ +package cz.moneta.test.harness.messaging.exception; + +/** + * Exception thrown when waiting for a message times out. + */ +public class MessagingTimeoutException extends MessagingException { + + public MessagingTimeoutException(String message) { + super(message); + } + + public MessagingTimeoutException(String message, Throwable cause) { + super(message, cause); + } +}