--- name: Kafka IBM MQ Support overview: "Rozšíření testovacího frameworku Harness o podporu messaging systémů (Kafka na Confluent Cloud a IBM MQ) s kompletní abstrakcí, aby tester nemusel znát detaily jednotlivých protokolů. Změny probíhají ve dvou projektech: test-harness-master (framework) a test-master (DSL + testy)." todos: - id: maven-deps content: Přidat Maven závislosti pro kafka-clients, kafka-avro-serializer, com.ibm.mq.allclient a jakarta.jms-api do pom.xml status: pending - id: model-classes content: "Vytvořit model třídy: ReceivedMessage, MessageContentType, MqMessageFormat, MessageResponse interface" status: pending - id: exception-classes content: "Vytvořit hierarchii výjimek: MessagingException (abstract), MessagingConnectionException, MessagingSchemaException, MessagingDestinationException, MessagingTimeoutException" status: pending - id: kafka-connector content: Implementovat KafkaConnector s SASL/PLAIN auth, Avro+SchemaRegistry, producer pooling, consumer assign/seek (bez consumer group), poll cyklus s Predicate filtrem status: pending - id: ibmmq-connector content: Implementovat IbmMqConnector s JMS klientem, user+password, SSL keystore, connection pooling, podpora JSON/XML/EBCDIC_870/UTF8_1208 status: pending - id: kafka-endpoint content: Vytvořit KafkaEndpoint - čte endpoints.kafka.* konfiguraci, credentials z Vaultu, lazy init KafkaConnectoru status: pending - id: ibmmq-endpoint content: Vytvořit ImqFirstVisionEndpoint - čte endpoints.imq-first-vision.* konfiguraci, user + heslo z Vaultu status: pending - id: kafka-request content: Implementovat KafkaRequest fluent builder - withKey, withPayload, withPayloadFromTemplate (nice), withTraceparent/requestID/activityID/sourceCodebookId, send, receiveWhere, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo status: pending - id: ibmmq-request content: Implementovat ImqRequest fluent builder - toQueue, withPayload, withPayloadFromTemplate (nice), asXml/asEbcdic/asUtf8, send, receiveWhere, browse, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo status: pending - id: kafka-dsl content: Vytvořit Kafka DSL třídu a přidat withKafka() do Harness.java status: pending - id: ibmmq-dsl content: Vytvořit ImqFirstVision DSL třídu a přidat withImqFirstVision() do Harness.java status: pending - id: config content: Přidat endpoints.kafka.* a endpoints.imq-first-vision.* konfiguraci do envs/ souborů + vault paths status: pending isProject: false --- # Rozšíření testovacího frameworku o Kafka a IBM MQ ## Současná architektura Framework je postaven na vrstvené architektuře: ```mermaid graph TD Test["Test (@TestCase)"] --> DSL["DSL vrstva (Harness.withXxx())"] DSL --> EP["Endpoint (implementuje Endpoint)"] EP --> CON["Connector (implementuje Connector)"] CON --> SYS["Cílový systém"] ``` - **Connector** ([connectors/Connector.java](test-harness-master/src/main/java/cz/moneta/test/harness/connectors/Connector.java)): nízkoúrovňová komunikace s cílovým systémem - **Endpoint** ([endpoints/Endpoint.java](test-harness-master/src/main/java/cz/moneta/test/harness/endpoints/Endpoint.java)): obaluje Connector, čte konfiguraci ze `StoreAccessor`, singleton per test class - **DSL Builder** (v test-master, např. [Wso2.java](test-master/src/main/java/cz/moneta/test/dsl/wso2/Wso2.java)): fluent API pro testy - **Harness** ([Harness.java](test-master/src/main/java/cz/moneta/test/dsl/Harness.java)): vstupní bod přes `withXxx()` metody ## Navrhovaná architektura Kafka a IBM MQ mají oddělené vstupní body v Harness DSL -- analogicky k `withWso2()`, `withUfo()` apod. ```mermaid graph TD subgraph kafkaPath [Kafka] TestK["harness.withKafka()"] --> KafkaDSL["Kafka DSL"] KafkaDSL --> KafkaReq["KafkaRequest builder"] KafkaReq --> KafkaEP["KafkaEndpoint"] KafkaEP --> KafkaCon["KafkaConnector"] KafkaCon --> ConflCloud["Confluent Cloud"] end subgraph imqPath [IBM MQ] TestI["harness.withImqFirstVision()"] --> ImqDSL["ImqFirstVision DSL"] ImqDSL --> ImqReq["ImqRequest builder"] ImqReq --> ImqEP["ImqFirstVisionEndpoint"] ImqEP --> ImqCon["IbmMqConnector"] ImqCon --> ImqSrv["IBM MQ Server"] end KafkaCon -.->|"API key/secret"| Vault["HashiCorp Vault"] ImqCon -.->|"user + password"| Vault ``` ## Třídní diagram ```mermaid classDiagram class Connector { <> +close() } class Endpoint { <> +canAccess() bool +close() } class KafkaConnector { -KafkaProducer producer -CachedSchemaRegistryClient schemaRegistry +send(topic, key, jsonPayload, headers) +receive(topic, filter, timeout) List~ReceivedMessage~ +close() } class IbmMqConnector { -JmsConnectionFactory connectionFactory -JMSContext jmsContext +send(queue, payload, format, properties) +receive(queue, selector, format, timeout) ReceivedMessage +browse(queue, selector, format, maxMessages) List~ReceivedMessage~ +close() } class KafkaEndpoint { -KafkaConnector connector -StoreAccessor store } class ImqFirstVisionEndpoint { -IbmMqConnector connector -StoreAccessor store } class ReceivedMessage { +String body +MessageContentType contentType +Map headers +long timestamp +extract(expression) String +extractJson(path) JsonNode +extractXml(xpath) String } class MqMessageFormat { <> JSON XML EBCDIC_870 UTF8_1208 } Connector <|.. KafkaConnector Connector <|.. IbmMqConnector Endpoint <|.. KafkaEndpoint Endpoint <|.. ImqFirstVisionEndpoint KafkaEndpoint --> KafkaConnector ImqFirstVisionEndpoint --> IbmMqConnector ``` ## Detailní návrh tříd ### 1. Connectors (test-harness-master) #### KafkaConnector Umístění: `cz.moneta.test.harness.connectors.messaging.KafkaConnector` - Spravuje `KafkaProducer` (lazy init, singleton, thread-safe) a `KafkaConsumer` (per-call, kvůli izolaci) - SASL/PLAIN autentizace: API key + secret z Vaultu - SSL context s truststore (stejný pattern jako `BaseRestConnector`) - **Formát zpráv: Avro** -- používá Confluent Schema Registry: - **Volba schématu je řízena názvem topicu**: Schema Registry subject = `{topic-name}-value` (Confluent TopicNameStrategy, výchozí). Tester nemusí schéma specifikovat -- connector ho získá automaticky z Registry podle topicu. - Producer: `KafkaAvroSerializer` serializuje `GenericRecord` / `SpecificRecord` - Consumer: `KafkaAvroDeserializer` deserializuje na `GenericRecord` - Automatická konverze Avro `GenericRecord` -> JSON string v `ReceivedMessage` (transparentní pro testera) - Pro odesílání: tester poskytne JSON string, connector stáhne schéma z Registry podle topicu a vytvoří `GenericRecord` - **Vstup pro odeslání zprávy**: název topicu (= logická destinace), klíč (String), tělo zprávy (JSON String), headery (Map) - **Podporované Kafka headery** -- framework poskytuje typované builder metody pro běžné tracing/correlation headery: - `traceparent` -- W3C Trace Context propagace (např. `00-traceId-spanId-01`) - `requestID` -- identifikátor požadavku - `activityID` -- identifikátor aktivity - `sourceCodebookId` -- identifikátor zdrojového číselníku - Libovolné další headery přes generický `.withHeader(key, value)` - **Izolace testů**: Consumer bez consumer group - používá `consumer.assign(partitions)` + `consumer.seekToEnd()` místo `subscribe()`. Díky tomu: - Žádný consumer group = žádný rebalancing mezi testy - Každý test si čte od aktuální pozice nezávisle - Testy neovlivňují offsety ostatních testů - **Poll cyklus**: Metoda `receive(topic, Predicate, Duration timeout)`: 1. Vytvoří nový Consumer 2. `assign()` na všechny partitions daného topicu 3. `seekToEnd()` (nebo `seek` na uloženou pozici) 4. Polling smyčka s exponenciálním backoff (100ms -> 500ms -> 1s) 5. Každý record: Avro GenericRecord -> JSON -> test proti Predicate 6. Vrátí první matching zprávu, nebo vyhodí `MessagingTimeoutException` ```java public class KafkaConnector implements Connector { private final Properties baseConfig; private KafkaProducer producer; private final String schemaRegistryUrl; private final CachedSchemaRegistryClient schemaRegistryClient; public KafkaConnector(String bootstrapServers, String apiKey, String apiSecret, String schemaRegistryUrl, String schemaRegistryApiKey, String schemaRegistryApiSecret) { ... } /** * Odeslání zprávy na topic. * Schéma se získá automaticky z Schema Registry podle topicu * (subject = "{topic}-value", TopicNameStrategy). * * @param topic název Kafka topicu (= logická destinace) * @param key klíč zprávy (String) * @param jsonPayload tělo zprávy jako JSON String * @param headers Kafka headery (traceparent, requestID, activityID, * sourceCodebookId, ...) */ public void send(String topic, String key, String jsonPayload, Map headers) { Schema schema = getSchemaForTopic(topic); GenericRecord record = jsonToAvro(jsonPayload, schema); ProducerRecord producerRecord = new ProducerRecord<>(topic, key, record); headers.forEach((k, v) -> producerRecord.headers().add(k, v.getBytes(StandardCharsets.UTF_8))); producer.send(producerRecord).get(); } // Příjem: Avro GenericRecord se konvertuje na JSON v ReceivedMessage public List receive(String topic, Predicate filter, Duration timeout) { ... } public Map saveOffsets(String topic) { ... } // Stáhne latest schéma z Registry podle topicu (subject = {topic}-value) private Schema getSchemaForTopic(String topic) { String subject = topic + "-value"; SchemaMetadata meta = schemaRegistryClient.getLatestSchemaMetadata(subject); return new Schema.Parser().parse(meta.getSchema()); } private String avroToJson(GenericRecord record) { ... } private GenericRecord jsonToAvro(String json, Schema schema) { ... } } ``` #### IbmMqConnector Umístění: `cz.moneta.test.harness.connectors.messaging.IbmMqConnector` - Používá `com.ibm.mq.allclient` JMS klient - `JmsConnectionFactory` pro připojení s user + password - **Multi-instance Queue Manager**: Podpora connection name list ve formátu `host1(port1),host2(port2)` (např. `mq9multitst5x(1414),mq9multitst6x(1414)`). Nastavuje se přes `MQConnectionFactory.setConnectionNameList()` místo samostatných `setHostName()` / `setPort()`. IBM MQ klient automaticky provádí failover na další instanci při nedostupnosti. - SSL/TLS s keystore a truststore - **Podporované formáty zpráv:** - **JSON** (výchozí): `JMS TextMessage` s plain JSON string - **XML**: `JMS TextMessage` s XML string; příjem konvertuje XML na JSON v `ReceivedMessage` (přes Jackson `XmlMapper`) pro jednotné API - **UTF-8 / CCSID 1208** (výchozí pro binární): `JMS BytesMessage` s payload kódovaným v UTF-8 (IBM CCSID 1208). MQMD CCSID = 1208. Vhodné pro systémy vyžadující explicitní UTF-8 BytesMessage. - **EBCDIC / CCSID 870**: `JMS BytesMessage` s payload kódovaným v EBCDIC code page 870 (český/slovenský mainframe kódování). MQMD CCSID = 870. Při odesílání se Java String konvertuje na `byte[]` přes `Charset.forName("IBM870")`. Při příjmu se `byte[]` dekóduje zpět na Java String. - **Logika formátu:** - Formát se volí explicitně ve fluent API (`.asXml()`, `.asEbcdic()`, `.asUtf8()`); výchozí je JSON - Při příjmu: connector detekuje typ JMS zprávy (`TextMessage` vs `BytesMessage`) a dekóduje odpovídajícím způsobem podle nastaveného formátu - **Connection pooling**: Drží `JMSContext` per endpoint instance, recykluje sessions - **Izolace testů**: Používá `QueueBrowser` pro nedestruktivní čtení (browse), nebo `createConsumer` s message selektorem - `send()`, `receive()`, `browse()` metody ```java public enum MqMessageFormat { JSON, // TextMessage, plain JSON (UTF-8 default) XML, // TextMessage, XML string (UTF-8 default) EBCDIC_870, // BytesMessage, EBCDIC IBM-870 (česky/slovensky) UTF8_1208 // BytesMessage, UTF-8 IBM CCSID 1208 (výchozí pro binární) } public class IbmMqConnector implements Connector { private static final Charset EBCDIC_870 = Charset.forName("IBM870"); private static final Charset UTF_8 = StandardCharsets.UTF_8; private final JmsConnectionFactory connectionFactory; private JMSContext jmsContext; // connectionNameList: "host1(port1),host2(port2)" pro multi-instance QMGR public IbmMqConnector(String connectionNameList, String channel, String queueManager, String user, String password, String keystorePath, String keystorePassword) { JmsConnectionFactory cf = ...; cf.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, connectionNameList); cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel); cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager); cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT); // ... } // Odeslání zprávy s daným formátem public void send(String queueName, String payload, MqMessageFormat format, Map properties) { switch (format) { case JSON, XML -> sendTextMessage(queueName, payload, properties); case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties); case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties); } } // TextMessage pro JSON a XML (JVM default UTF-8) private void sendTextMessage(String queueName, String payload, Map properties) { ... } // BytesMessage s konverzí String -> byte[] v daném kódování + MQMD CCSID private void sendBytesMessage(String queueName, String payload, Charset charset, int ccsid, Map properties) { byte[] bytes = payload.getBytes(charset); // Vytvoří BytesMessage, nastaví MQMD CCSID, zapíše bytes } // Příjem: auto-detekce TextMessage vs BytesMessage public ReceivedMessage receive(String queueName, String messageSelector, MqMessageFormat expectedFormat, Duration timeout) { ... } /** * Browse (ne-destruktivní čtení) -- zprávy zůstávají ve frontě. * Vrací seznam zpráv odpovídajících messageSelector, bez jejich odstranění. * Použití: inspekce obsahu fronty před/po testu, ověření počtu zpráv. */ public List browse(String queueName, String messageSelector, MqMessageFormat expectedFormat, int maxMessages) { ... } // Dekódování přijaté zprávy podle formátu private String decodeMessage(Message jmsMessage, MqMessageFormat format) { if (jmsMessage instanceof TextMessage txt) { return txt.getText(); // JSON nebo XML string (UTF-8) } else if (jmsMessage instanceof BytesMessage bytesMsg) { byte[] data = new byte[(int) bytesMsg.getBodyLength()]; bytesMsg.readBytes(data); Charset charset = switch (format) { case EBCDIC_870 -> EBCDIC_870; case UTF8_1208 -> UTF_8; default -> UTF_8; }; return new String(data, charset); } } } ``` ### 2. Model třídy (test-harness-master) #### ReceivedMessage Umístění: `cz.moneta.test.harness.support.messaging.ReceivedMessage` Body je vždy String normalizovaný do čitelné podoby bez ohledu na zdroj a wire formát: - Z Kafka: Avro `GenericRecord` je automaticky konvertován na JSON v `KafkaConnector` - Z IBM MQ (JSON): JSON string z `JMS TextMessage` je předán přímo - Z IBM MQ (XML): XML string z `JMS TextMessage` je předán jako XML; `extract()` podporuje jak JSON dotpath, tak XPath - Z IBM MQ (EBCDIC): `byte[]` z `JMS BytesMessage` je dekódován z IBM-870 na Java String ```java public enum MessageContentType { JSON, XML, RAW_TEXT } public class ReceivedMessage { private final String body; // dekódovaný textový obsah private final MessageContentType contentType; // JSON, XML, nebo RAW_TEXT private final Map headers; // Kafka headers nebo JMS properties private final long timestamp; private final String source; // topic nebo queue name private final String key; // Kafka message key (null pro IBM MQ) // JSON navigace (dot/bracket notace: "items[0].sku") public JsonNode extractJson(String path) { ... } // XPath navigace (pro XML zprávy: "/response/balance") public String extractXml(String xpathExpression) { ... } // Univerzální extract - auto-detekce podle contentType public String extract(String expression) { return switch (contentType) { case JSON -> extractJson(expression).asText(); case XML -> extractXml(expression); case RAW_TEXT -> body; }; } public String getKey() { ... } // Kafka klíč, null pro IBM MQ public String getHeader(String name) { ... } // Kafka header nebo JMS property public String getBody() { ... } // surový textový obsah public long getTimestamp() { ... } public MessageContentType getContentType() { ... } /** * Deserializace těla zprávy do Java objektu. * Pro JSON: Jackson ObjectMapper.readValue(body, type) * Pro XML: JAXB Unmarshaller nebo Jackson XmlMapper */ public T mapTo(Class type) { ... } } ``` ### 3. Endpoints (test-harness-master) #### KafkaEndpoint Umístění: `cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint` - Vzor: analogie s `Wso2GatewayEndpoint` -- konstruktor přijímá `StoreAccessor`, čte `endpoints.kafka.*` konfiguraci - Čte konfiguraci z `StoreAccessor`: - `endpoints.kafka.bootstrap-servers` - `endpoints.kafka.security-protocol` - `endpoints.kafka.sasl-mechanism` - `endpoints.kafka.schema-registry-url` - `endpoints.kafka.value-serializer` - Credentials z Vaultu (path: `vault.kafka.secrets.path`): - Kafka API key + secret - Schema Registry API key + secret - Lazy init KafkaConnectoru ```java public class KafkaEndpoint implements Endpoint { private final KafkaConnector connector; private final StoreAccessor store; public KafkaEndpoint(StoreAccessor store) { this.store = store; String bootstrapServers = Optional.ofNullable(store.getConfig("endpoints.kafka.bootstrap-servers")) .orElseThrow(() -> new IllegalStateException( "You need to configure endpoints.kafka.bootstrap-servers")); String schemaRegistryUrl = store.getConfig("endpoints.kafka.schema-registry-url"); // API key/secret z Vaultu // ... this.connector = new KafkaConnector(bootstrapServers, apiKey, apiSecret, schemaRegistryUrl, srApiKey, srApiSecret); } public void send(String topic, String key, String jsonPayload, Map headers) { connector.send(topic, key, jsonPayload, headers); } public List receive(String topic, Predicate filter, Duration timeout) { return connector.receive(topic, filter, timeout); } @Override public void close() { connector.close(); } } ``` #### ImqFirstVisionEndpoint Umístění: `cz.moneta.test.harness.endpoints.imq.ImqFirstVisionEndpoint` - Vzor: analogie s `Wso2GatewayEndpoint` - Čte konfiguraci z `StoreAccessor`: - `endpoints.imq-first-vision.connection-name-list` -- formát `host1(port1),host2(port2)` pro multi-instance QMGR - `endpoints.imq-first-vision.channel` - `endpoints.imq-first-vision.queue-manager` - `endpoints.imq-first-vision.ssl-cipher-suite` - User i heslo z Vaultu (path: `vault.imq-first-vision.secrets.path`) - Lazy init IbmMqConnectoru ```java public class ImqFirstVisionEndpoint implements Endpoint { private final IbmMqConnector connector; private final StoreAccessor store; public ImqFirstVisionEndpoint(StoreAccessor store) { this.store = store; // connection-name-list: "mq9multitst5x(1414),mq9multitst6x(1414)" String connectionNameList = Optional.ofNullable( store.getConfig("endpoints.imq-first-vision.connection-name-list")) .orElseThrow(() -> new IllegalStateException( "You need to configure endpoints.imq-first-vision.connection-name-list")); String channel = store.getConfig("endpoints.imq-first-vision.channel"); String queueManager = store.getConfig("endpoints.imq-first-vision.queue-manager"); // user + password z Vaultu String user = // z Vaultu (vault.imq-first-vision.secrets.path -> "user") String password = // z Vaultu (vault.imq-first-vision.secrets.path -> "password") // ... this.connector = new IbmMqConnector(connectionNameList, channel, queueManager, user, password, keystorePath, keystorePassword); } public void send(String queueName, String payload, MqMessageFormat format, Map properties) { connector.send(queueName, payload, format, properties); } public ReceivedMessage receive(String queueName, String messageSelector, MqMessageFormat format, Duration timeout) { return connector.receive(queueName, messageSelector, format, timeout); } public List browse(String queueName, String messageSelector, MqMessageFormat format, int maxMessages) { return connector.browse(queueName, messageSelector, format, maxMessages); } // Resolve logického názvu fronty z konfigurace public String resolveQueue(String logicalName) { return Optional.ofNullable( store.getConfig("endpoints.imq-first-vision." + logicalName + ".queue")) .orElseThrow(() -> new IllegalStateException( "Queue '" + logicalName + "' is not configured in " + "endpoints.imq-first-vision." + logicalName + ".queue")); } @Override public void close() { connector.close(); } } ``` ### 4. DSL Builder (test-master) #### Kafka DSL Umístění: `cz.moneta.test.dsl.kafka.Kafka` Vstupní bod přes `Harness`: ```java public Kafka withKafka() { return new Kafka(this); } ``` DSL třída uvnitř získá endpoint přes existující mechanismus: ```java public class Kafka { private final Harness harness; public Kafka(Harness harness) { this.harness = harness; } public KafkaRequest.TopicPhase prepareRequest() { KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class); return KafkaRequest.builder(endpoint); } } ``` #### ImqFirstVision DSL Umístění: `cz.moneta.test.dsl.imq.ImqFirstVision` Vstupní bod přes `Harness`: ```java public ImqFirstVision withImqFirstVision() { return new ImqFirstVision(this); } ``` DSL třída uvnitř získá endpoint přes existující mechanismus: ```java public class ImqFirstVision { private final Harness harness; public ImqFirstVision(Harness harness) { this.harness = harness; } public ImqRequest.QueuePhase prepareRequest() { ImqFirstVisionEndpoint endpoint = harness.getEndpoint(ImqFirstVisionEndpoint.class); return ImqRequest.builder(endpoint); } } ``` #### Registrace endpointů v BaseStoreAccessor `KafkaEndpoint` a `ImqFirstVisionEndpoint` **nevyžadují** explicitní registraci. Existující mechanismus v `BaseStoreAccessor.getEndpoint(Class)` je automaticky instanciuje přes reflexi (konstruktor `(StoreAccessor)`), cache-uje a spravuje lifecycle (zavírá přes `close()`). Podmínkou je, aby nové endpointy měly veřejný konstruktor `public XxxEndpoint(StoreAccessor store)` a implementovaly `Endpoint` interface. #### ImqFirstVisionQueue enum Umístění: `cz.moneta.test.harness.endpoints.imq.ImqFirstVisionQueue` Enum definuje logické názvy front. Fyzický název fronty se resolvuje z konfigurace (`endpoints.imq-first-vision.{logicalName}.queue`). Tester používá enum konstantu -- nemusí znát fyzický název fronty ani konfigurační klíč. ```java public enum ImqFirstVisionQueue { PAYMENT_NOTIFICATIONS("payment-notifications"), PAYMENT_REQUEST("payment-request"), MF_REQUESTS("mf-requests"), MF_RESPONSES("mf-responses"), MF_EBCDIC("mf-ebcdic"), MF_UTF8("mf-utf8"); private final String configKey; ImqFirstVisionQueue(String configKey) { this.configKey = configKey; } public String getConfigKey() { return configKey; } } ``` Resoluce v `ImqFirstVisionEndpoint.resolveQueue()`: - `ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS` -> čte `endpoints.imq-first-vision.payment-notifications.queue` -> vrátí `"MVSW2TST3.DELIVERY.NOTIFICATION"` #### KafkaRequest fluent builder Umístění: `cz.moneta.test.harness.support.messaging.KafkaRequest` #### ImqRequest fluent builder Umístění: `cz.moneta.test.harness.support.messaging.ImqRequest` #### Definice fluent API pro realizaci Následující rozhraní a metody definují kompletní fluent API. DSL třídy (`Kafka`, `ImqFirstVision`) delegují na request buildery (`KafkaRequest`, `ImqRequest`). **Fáze builderu:** Každá metoda vrací rozhraní, které umožňuje další volání až do terminálu (`send()` nebo `MessageResponse`). **Legenda k implementaci:** - **[EXISTUJE]** -- obdobná logika již je v frameworku (RawRestRequest), lze znovu využít nebo adaptovat - **[NOVÉ]** -- je potřeba implementovat ##### KafkaRequest – fázová rozhraní ```java // Fáze 1: Po withKafka() -- výběr směru (odeslání vs. příjem) -- [NOVÉ] public interface KafkaSendPhase { KafkaPayloadPhase toTopic(String topic); } public interface KafkaReceivePhase { KafkaReceiveFilterPhase fromTopic(String topic); } // Fáze 2a: Odeslání - payload a headery // withPayload, withPayloadFromFile, addField, appendToArray, withHeader [EXISTUJE] v RawRestRequest // withKey, withTraceparent/RequestID/ActivityID/SourceCodebookId, send [NOVÉ] public interface KafkaPayloadPhase { KafkaPayloadPhase withKey(String key); // [NOVÉ] KafkaPayloadPhase withPayload(String json); // [EXISTUJE] KafkaPayloadPhase withPayloadFromFile(String path); // [EXISTUJE] KafkaPayloadPhase withPayloadFromTemplate(String path, Map variables); // [EXISTUJE] nice to have KafkaPayloadPhase addField(String fieldName, Object value); KafkaPayloadPhase addField(String path, String fieldName, Object value); KafkaPayloadPhase appendToArray(String path, Object value); KafkaPayloadPhase withTraceparent(String value); // [NOVÉ] KafkaPayloadPhase withRequestID(String value); // [NOVÉ] KafkaPayloadPhase withActivityID(String value); // [NOVÉ] KafkaPayloadPhase withSourceCodebookId(String value); // [NOVÉ] KafkaPayloadPhase withHeader(String key, String value); // [EXISTUJE] void send(); // [NOVÉ] } // Fáze 2b: Příjem - filtr a timeout -- [NOVÉ] public interface KafkaReceiveFilterPhase { KafkaAwaitingPhase receiveWhere(Predicate filter); } public interface KafkaAwaitingPhase { MessageResponse withTimeout(long duration, TimeUnit unit); } // MessageResponse -- obdobné RawRestRequest.Response // andAssertFieldValue, andAssertPresent, andAssertNotPresent, extract, mapTo [EXISTUJE] // andAssertHeaderValue, andAssertBodyContains, andAssertWithAssertJ [NOVÉ] public interface MessageResponse { MessageResponse andAssertFieldValue(String path, String value); // [EXISTUJE] MessageResponse andAssertPresent(String path); // [EXISTUJE] MessageResponse andAssertNotPresent(String path); // [EXISTUJE] MessageResponse andAssertHeaderValue(String headerName, String value); // [NOVÉ] MessageResponse andAssertBodyContains(String substring); // [NOVÉ] primárně IBM MQ ObjectAssert andAssertWithAssertJ(); // [NOVÉ] nice to have JsonPathValue extract(String path); // [EXISTUJE] T mapTo(Class type); // [EXISTUJE] jako post(Class) ReceivedMessage getMessage(); String getBody(); String getHeader(String name); } ``` ##### ImqRequest – fázová rozhraní ```java // Fáze 1: Po withImqFirstVision() -- [NOVÉ] public interface ImqSendPhase { ImqPayloadPhase toQueue(ImqFirstVisionQueue queue); } public interface ImqReceivePhase { ImqReceiveFilterPhase fromQueue(ImqFirstVisionQueue queue); } // Fáze 2a: Odeslání - asXml/asEbcdic/asUtf8 [NOVÉ], payload/addField/appendToArray [EXISTUJE] public interface ImqPayloadPhase { ImqPayloadPhase asXml(); // [NOVÉ] ImqPayloadPhase asEbcdic(); // [NOVÉ] ImqPayloadPhase asUtf8(); // [NOVÉ] ImqPayloadPhase withPayload(String payload); ImqPayloadPhase withPayloadFromFile(String path); ImqPayloadPhase withPayloadFromTemplate(String path, Map variables); ImqPayloadPhase addField(String fieldName, Object value); ImqPayloadPhase addField(String path, String fieldName, Object value); ImqPayloadPhase appendToArray(String path, Object value); void send(); } // Fáze 2b: Příjem - withSelector, receiveWhere, browse [NOVÉ] public interface ImqReceiveFilterPhase { ImqReceiveFilterPhase withSelector(String jmsMessageSelector); // [NOVÉ] ImqAwaitingPhase receiveWhere(Predicate filter); List browse(int maxMessages); // [NOVÉ] } public interface ImqAwaitingPhase { MessageResponse withTimeout(long duration, TimeUnit unit); } ``` ##### Přehledná tabulka metod | Kontext | Metoda | Návrat | Poznámka | |---------|--------|--------|----------| | Harness | `withKafka()` | Kafka | [NOVÉ] vstupní bod | | Harness | `withImqFirstVision()` | ImqFirstVision | [NOVÉ] vstupní bod | | Kafka | `toTopic(String)` | KafkaPayloadPhase | [NOVÉ] odeslání | | Kafka | `fromTopic(String)` | KafkaReceiveFilterPhase | [NOVÉ] příjem | | KafkaPayloadPhase | `withKey(String)` | KafkaPayloadPhase | [NOVÉ] Kafka specifické | | KafkaPayloadPhase | `withPayload(String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayload | | KafkaPayloadPhase | `withPayloadFromFile(String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromFile | | KafkaPayloadPhase | `withPayloadFromTemplate(String, Map)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromTemplate (Template API) -- nice to have | | KafkaPayloadPhase | `addField(...)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.addField, JSON pouze | | KafkaPayloadPhase | `appendToArray(...)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.appendToArray, JSON pouze | | KafkaPayloadPhase | `withTraceparent/RequestID/ActivityID/SourceCodebookId(String)` | KafkaPayloadPhase | [NOVÉ] Kafka headery | | KafkaPayloadPhase | `withHeader(String, String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withHeader | | KafkaPayloadPhase | `send()` | void | [NOVÉ] terminál (messaging specifické) | | KafkaReceiveFilterPhase | `receiveWhere(Predicate)` | KafkaAwaitingPhase | [NOVÉ] | | ImqFirstVision | `toQueue(ImqFirstVisionQueue)` | ImqPayloadPhase | [NOVÉ] | | ImqFirstVision | `fromQueue(ImqFirstVisionQueue)` | ImqReceiveFilterPhase | [NOVÉ] | | ImqPayloadPhase | `asXml()`, `asEbcdic()`, `asUtf8()` | ImqPayloadPhase | [NOVÉ] IBM MQ formát | | ImqPayloadPhase | `withPayload`, `withPayloadFromFile`, ... | ImqPayloadPhase | [EXISTUJE] jako Kafka | | ImqReceiveFilterPhase | `withSelector(String)` | ImqReceiveFilterPhase | [NOVÉ] JMS selector | | KafkaAwaitingPhase | `withTimeout(long, TimeUnit)` | MessageResponse | [NOVÉ] | | ImqReceiveFilterPhase | `receiveWhere(Predicate)` | ImqAwaitingPhase | [NOVÉ] | | ImqAwaitingPhase | `withTimeout(long, TimeUnit)` | MessageResponse | [NOVÉ] | | ImqReceiveFilterPhase | `browse(int maxMessages)` | List<ReceivedMessage> | [NOVÉ] terminál, nedestruktivní | | MessageResponse | `andAssertFieldValue(path, value)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertFieldValue | | MessageResponse | `andAssertPresent(path)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertPresent | | MessageResponse | `andAssertNotPresent(path)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertNotPresent | | MessageResponse | `andAssertHeaderValue(name, value)` | MessageResponse | [NOVÉ] Kafka header / JMS property | | MessageResponse | `andAssertBodyContains(substring)` | MessageResponse | [NOVÉ] EBCDIC/UTF-8 | | MessageResponse | `andAssertWithAssertJ()` | ObjectAssert | [NOVÉ] nice to have | | MessageResponse | `extract(path)` | JsonPathValue | [EXISTUJE] RawRestRequest.Response.extract, .asText() | | MessageResponse | `mapTo(Class)` | T | [EXISTUJE] RawRestRequest.post(Class) -- deserializace | | MessageResponse | `getBody()`, `getHeader(name)` | String | [EXISTUJE] Response přístup k datům | **Poznámka k řetězci receiveWhere:** U Kafky `receiveWhere(filter)` vrací objekt s `withTimeout()` – timeout je povinný před assert. U IBM MQ může být timeout defaultní nebo explicitní dle implementace. Vzor podobný `RawRestRequest` - fluent builder s fázovými interfaces: ```java // ============================================= // === harness.withKafka() -- Kafka (Avro) ===== // ============================================= // Vstup: topic (schéma se resolví automaticky z Registry), klíč, tělo (JSON), headery // Odeslání zprávy na Kafka topic se standardními headery harness.withKafka() .toTopic("order-events") .withKey("order-123") .withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}") .withTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") .withRequestID("req-001") .withActivityID("act-555") .withSourceCodebookId("CB-01") .send(); // Libovolné vlastní headery harness.withKafka() .toTopic("order-events") .withKey("order-456") .withPayload("{\"orderId\": \"456\", \"status\": \"SHIPPED\"}") .withHeader("customHeader", "customValue") .send(); // Odeslání z JSON souboru harness.withKafka() .toTopic("order-events") .withKey("order-789") .withPayloadFromFile("messaging/order_event.json") .withRequestID("req-002") .send(); // Příjem zprávy z Kafka s filtrem (Avro -> JSON konverze automaticky) harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> msg.extract("orderId").equals("123")) .withTimeout(30, TimeUnit.SECONDS) .andAssertFieldValue("status", "CREATED"); // ============================================= // === harness.withImqFirstVision() -- IBM MQ == // ============================================= // --- JSON (výchozí formát) --- // Tester používá enum ImqFirstVisionQueue -- fyzický název fronty se resolvuje z konfigurace // Odeslání JSON zprávy do IBM MQ fronty harness.withImqFirstVision() .toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) .withPayload("{\"paymentId\": \"PAY-456\", \"result\": \"OK\"}") .send(); // Příjem JSON zprávy z IBM MQ fronty harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) .receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456")) .withTimeout(10, TimeUnit.SECONDS) .andAssertFieldValue("result", "OK"); // --- XML --- // Odeslání XML zprávy harness.withImqFirstVision() .toQueue(ImqFirstVisionQueue.MF_REQUESTS) .asXml() .withPayload("12345BALANCE") .send(); // Odeslání XML ze souboru harness.withImqFirstVision() .toQueue(ImqFirstVisionQueue.MF_REQUESTS) .asXml() .withPayloadFromFile("messaging/mf_request.xml") .send(); // Příjem XML zprávy s XPath filtrací harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.MF_RESPONSES) .asXml() .receiveWhere(msg -> msg.extract("/response/accountId").equals("12345")) .withTimeout(15, TimeUnit.SECONDS) .andAssertFieldValue("/response/balance", "50000"); // --- UTF-8 / CCSID 1208 (BytesMessage) --- // Odeslání zprávy v UTF-8 jako BytesMessage (CCSID 1208) harness.withImqFirstVision() .toQueue(ImqFirstVisionQueue.MF_UTF8) .asUtf8() .withPayload("DATA|12345|ÚČET|CZK") .send(); // Příjem UTF-8 BytesMessage harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.MF_UTF8) .asUtf8() .receiveWhere(msg -> msg.getBody().contains("12345")) .withTimeout(15, TimeUnit.SECONDS); // --- EBCDIC / CCSID 870 --- // Odeslání zprávy v EBCDIC kódování (mainframe, CZ/SK znaky) harness.withImqFirstVision() .toQueue(ImqFirstVisionQueue.MF_EBCDIC) .asEbcdic() .withPayload("PŘIKAZ|12345|ZŮSTATEK|CZK") .send(); // Příjem EBCDIC zprávy (automaticky dekódováno z IBM-870 na String) harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.MF_EBCDIC) .asEbcdic() .receiveWhere(msg -> msg.getBody().contains("12345")) .withTimeout(15, TimeUnit.SECONDS); ``` #### Inkrementální stavba payload pomocí addField / appendToArray Stejný vzor jako v existující třídě `RawRestRequest.Builder` (viz `addField()`, `appendToArray()` na řádcích 253-342 v [RawRestRequest.java](test-harness-master/src/main/java/cz/moneta/test/harness/support/rest/RawRestRequest.java)). Funguje pro JSON payloady (Kafka i IBM MQ JSON formát). Používá Jackson `ObjectMapper` pro manipulaci s JSON stromem. ```java // === Kafka: Sestavení payload přes addField === // Začátek s prázdným JSON objektem harness.withKafka() .toTopic("order-events") .withKey("order-123") .withPayload("{}") .addField("orderId", "123") .addField("status", "CREATED") .addField("items", Arrays.asList( Map.of("sku", "A001", "qty", 2), Map.of("sku", "B002", "qty", 1))) .addField("items[1]", "note", "fragile") .withRequestID("req-001") .send(); // Výsledný payload: {"orderId":"123","status":"CREATED", // "items":[{"sku":"A001","qty":2},{"sku":"B002","qty":1,"note":"fragile"}]} // Přidání do existujícího payloadu ze souboru harness.withKafka() .toTopic("order-events") .withKey("order-456") .withPayloadFromFile("messaging/order_event.json") .addField("metadata", new Object()) .addField("metadata", "source", "test-harness") .addField("metadata", "timestamp", System.currentTimeMillis()) .send(); // Přidání prvku do existujícího pole harness.withKafka() .toTopic("order-events") .withKey("order-789") .withPayloadFromFile("messaging/order_base.json") .appendToArray("items", Map.of("sku", "C003", "qty", 5)) .send(); // === IBM MQ: Sestavení JSON payload přes addField === harness.withImqFirstVision() .toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) .withPayload("{}") .addField("paymentId", "PAY-456") .addField("amount", 15000) .addField("currency", "CZK") .addField("beneficiary", new Object()) .addField("beneficiary", "name", "Jan Novák") .addField("beneficiary", "accountNumber", "1234567890/0100") .send(); ``` Podporované metody na builder fázi po `withPayload()` / `withPayloadFromFile()`: - `addField(String fieldName, Object value)` -- přidá pole do kořene JSON - `addField(String path, String fieldName, Object value)` -- přidá pole do vnořeného uzlu; `path` používá tečkovou notaci a hranatou závorku pro indexy polí (např. `"items[2].details"`) - `appendToArray(String path, Object value)` -- přidá prvek na konec existujícího JSON pole - `withPayloadFromTemplate(String templatePath, Map variables)` -- (nice to have) načte šablonu ze souboru a nahradí placeholdery (`${varName}`) hodnotami z mapy; hodnoty mohou pocházet z `harness.store()`. Pro složitější payloady bez nutnosti programově sestavovat JSON. Poznámka: `addField` a `appendToArray` fungují pouze pro JSON formát. Pro XML a EBCDIC/UTF8 formáty v IBM MQ se payload sestavuje jako celý String. #### Varianty výběru zprávy v receiveWhere ##### Kafka -- receiveWhere Kafka `receiveWhere` přijímá `Predicate` -- libovolný Java predikát na deserializovanou zprávu (Avro -> JSON). Zprávy se prohledávají sekvenčně v polling smyčce. Vrátí se **první** zpráva, která vyhovuje predikátu. ```java // --- Filtrace podle obsahu JSON pole --- harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> msg.extract("orderId").equals("123")) .withTimeout(30, TimeUnit.SECONDS); // --- Filtrace podle více polí (AND) --- harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> msg.extract("orderId").equals("123") && msg.extract("status").equals("CREATED")) .withTimeout(30, TimeUnit.SECONDS); // --- Filtrace podle Kafka headeru --- harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> "req-001".equals(msg.getHeader("requestID"))) .withTimeout(30, TimeUnit.SECONDS); // --- Filtrace podle klíče zprávy --- harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> "order-123".equals(msg.getKey())) .withTimeout(30, TimeUnit.SECONDS); // --- Filtrace kombinací headeru a obsahu --- harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> "act-555".equals(msg.getHeader("activityID")) && msg.extract("status").equals("SHIPPED")) .withTimeout(30, TimeUnit.SECONDS); // --- Příjem první dostupné zprávy (bez filtru) --- harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> true) .withTimeout(10, TimeUnit.SECONDS); // --- Příjem s vlastním timeoutem --- harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> msg.extract("orderId").equals("123")) .withTimeout(60, TimeUnit.SECONDS); // delší timeout pro pomalé systémy ``` ReceivedMessage metody dostupné v predikátu (Kafka): - `msg.extract("dotPath")` -- hodnota JSON pole (dot/bracket notace) - `msg.getHeader("headerName")` -- hodnota Kafka headeru - `msg.getKey()` -- klíč zprávy - `msg.getBody()` -- celé tělo jako JSON String - `msg.getTimestamp()` -- timestamp zprávy ##### IBM MQ -- receiveWhere IBM MQ podporuje dvě úrovně filtrace: 1. **JMS Message Selector** (server-side) -- efektivní filtr na JMS properties/headery, vyhodnocuje se na straně MQ serveru 2. **Predicate na obsahu** (client-side) -- filtr na tělo zprávy po deserializaci, vyhodnocuje se v klientu ```java // --- Filtrace podle obsahu JSON pole --- harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) .receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456")) .withTimeout(10, TimeUnit.SECONDS); // --- Filtrace podle JMS CorrelationID (server-side selector) --- harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) .withSelector("JMSCorrelationID = 'corr-789'") .receiveWhere(msg -> true) .withTimeout(10, TimeUnit.SECONDS); // --- Kombinace JMS selectoru a content filtru --- harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) .withSelector("JMSType = 'payment'") .receiveWhere(msg -> msg.extract("amount").equals("15000")) .withTimeout(10, TimeUnit.SECONDS); // --- XML zprávy s XPath filtrací --- harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.MF_RESPONSES) .asXml() .receiveWhere(msg -> msg.extract("/response/accountId").equals("12345")) .withTimeout(15, TimeUnit.SECONDS); // --- EBCDIC / UTF-8 zprávy -- filtrace přes raw body --- harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.MF_EBCDIC) .asEbcdic() .receiveWhere(msg -> msg.getBody().contains("12345")) .withTimeout(15, TimeUnit.SECONDS); // --- Příjem první dostupné zprávy --- harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) .receiveWhere(msg -> true) .withTimeout(10, TimeUnit.SECONDS); // --- Browse (ne-destruktivní čtení, zprávy zůstávají ve frontě) --- List peeked = harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) .browse(10); // max 10 zpráv z fronty // s JMS selektorem: .withSelector("JMSType = 'payment'").browse(10) ``` ReceivedMessage metody dostupné v predikátu (IBM MQ): - `msg.extract("expression")` -- hodnota pole (JSON dot-path pro JSON, XPath pro XML) - `msg.getBody()` -- celé tělo zprávy jako String - `msg.getHeader("jmsProperty")` -- hodnota JMS property Klíčový rozdíl: `.withSelector()` je server-side filtr (SQL-92 subset, filtruje JMS properties) -- použijte ho pro efektivní filtraci podle korelačních ID. `.receiveWhere()` je client-side filtr na obsahu zprávy. #### Chybové stavy -- výjimky a assert metody ##### Výjimky (propagovány okamžitě, test failuje) Tyto chyby signalizují infrastrukturní problém nebo chybu konfigurace -- test nemůže pokračovat: ```java // --- Chyba připojení (Kafka i IBM MQ) --- // KafkaConnectionException / JMSException obalené do RuntimeException // Příčiny: špatný bootstrap-servers, connection-name-list, credentials, síťový problém try { harness.withKafka() .toTopic("order-events") .withKey("order-123") .withPayload("{\"orderId\": \"123\"}") .send(); } catch (MessagingConnectionException e) { // "Failed to connect to Kafka cluster at pkc-xxxxx:9092: Authentication failed" // "Failed to connect to IBM MQ: mq9multitst5x(1414),mq9multitst6x(1414) - MQRC 2035 (NOT_AUTHORIZED)" } // --- Chyba schématu (Kafka) --- // Schema Registry vrátí 404 nebo payload neodpovídá schématu try { harness.withKafka() .toTopic("nonexistent-topic") .withKey("key") .withPayload("{\"unknownField\": \"value\"}") .send(); } catch (MessagingSchemaException e) { // "Schema not found for subject 'nonexistent-topic-value' in Schema Registry" // "Payload does not conform to Avro schema for topic 'order-events': field 'orderId' is required" } // --- Queue neexistuje (IBM MQ) --- // Výjimka nastane i při chybné konfiguraci logického názvu v properties try { harness.withImqFirstVision() .toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) // resolvuje se z konfigurace .withPayload("{}") .send(); } catch (MessagingDestinationException e) { // "Queue 'MVSW2TST3.DELIVERY.NOTIFICATION' not found: MQRC 2085 (UNKNOWN_OBJECT_NAME)" } // --- Timeout při příjmu (Kafka i IBM MQ) --- // Žádná zpráva nevyhověla predikátu v daném čase try { harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> msg.extract("orderId").equals("nonexistent")) .withTimeout(5, TimeUnit.SECONDS); } catch (MessagingTimeoutException e) { // "No message matching filter found on topic 'order-events' within 5 seconds" // "No message matching filter found on queue 'PAYMENT.NOTIFICATIONS' within 10 seconds" } // --- Chyba konfigurace (endpoint není nakonfigurován) --- try { harness.withKafka() .toTopic("order-events") .withKey("key") .withPayload("{}") .send(); } catch (IllegalStateException e) { // "You need to configure endpoints.kafka.bootstrap-servers" // Stejný vzor jako u Wso2GatewayEndpoint } ``` Hierarchie výjimek: ``` MessagingException (abstract, extends RuntimeException) ├── MessagingConnectionException -- selhání připojení, autentizace ├── MessagingSchemaException -- Avro schema mismatch, schema not found ├── MessagingDestinationException -- topic/queue neexistuje, přístupová práva └── MessagingTimeoutException -- receive timeout, žádná matching zpráva ``` ##### Assert metody (fluent, na přijaté zprávě) Tyto metody se volají na výsledku `receiveWhere` -- zpráva byla nalezena a nyní se ověřuje její obsah. Assert selhání = `AssertionError` (standardní JUnit pattern). ```java // --- JSON assert metody (Kafka i IBM MQ JSON) --- harness.withKafka() .fromTopic("order-events") .receiveWhere(msg -> msg.extract("orderId").equals("123")) .withTimeout(30, TimeUnit.SECONDS) // Assert na hodnotu pole (dot/bracket notace) .andAssertFieldValue("status", "CREATED") .andAssertFieldValue("items[0].sku", "A001") // Assert na přítomnost/nepřítomnost pole .andAssertPresent("items[0].qty") .andAssertNotPresent("deletedField") // Assert na Kafka header .andAssertHeaderValue("requestID", "req-001") // Extrakce hodnoty pro další použití v testu .extract("internalId").asText(); // -> String pro uložení do harness.store() // --- XML assert metody (IBM MQ XML) --- harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.MF_RESPONSES) .asXml() .receiveWhere(msg -> msg.extract("/response/accountId").equals("12345")) .withTimeout(15, TimeUnit.SECONDS) // XPath assert .andAssertFieldValue("/response/balance", "50000") .andAssertFieldValue("/response/currency", "CZK") .andAssertPresent("/response/timestamp") .andAssertNotPresent("/response/error"); // --- Raw body assert (EBCDIC/UTF-8) --- harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.MF_EBCDIC) .asEbcdic() .receiveWhere(msg -> msg.getBody().contains("12345")) .withTimeout(15, TimeUnit.SECONDS) .andAssertBodyContains("ZŮSTATEK"); // --- Deserializace do Java objektu (mapTo) --- PaymentNotification notif = harness.withImqFirstVision() .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) .receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456")) .withTimeout(10, TimeUnit.SECONDS) .mapTo(PaymentNotification.class); ``` Kompletní tabulka assert metod na `MessageResponse`: - `andAssertFieldValue(String path, String value)` -- JSON dot-path nebo XPath - `andAssertPresent(String path)` -- ověří existenci pole - `andAssertNotPresent(String path)` -- ověří neexistenci pole - `andAssertHeaderValue(String headerName, String value)` -- Kafka header nebo JMS property - `andAssertBodyContains(String substring)` -- raw body contains (pro EBCDIC/UTF-8/raw text) - `andAssertWithAssertJ()` -- *(nice to have)* vrací AssertJ `AbstractObjectAssert` pro komplexní fluent assertions . - `extract(String path)` -- extrahuje hodnotu pro další použití - `mapTo(Class type)` -- deserializace těla zprávy do Java objektu (Jackson/JAXB) - `getBody()` -- raw body String - `getHeader(String name)` -- raw header/property value ### 5. Konfigurace Konfigurační klíče dodržují konvenci existujících endpointů v Harness: prefix `endpoints.{system-name}.{property}` (viz `endpoints.wso2.gw.url`, `endpoints.udebs.url` atd.). #### Properties soubory (envs/tst1): ```properties # Kafka Confluent Cloud (Avro + Schema Registry) endpoints.kafka.bootstrap-servers=pkc-xxxxx.eu-central-1.aws.confluent.cloud:9092 endpoints.kafka.security-protocol=SASL_SSL endpoints.kafka.sasl-mechanism=PLAIN endpoints.kafka.schema-registry-url=https://psrc-xxxxx.eu-central-1.aws.confluent.cloud endpoints.kafka.value-serializer=avro # API key + secret a Schema Registry credentials se načítají z Vaultu # IBM MQ - First Vision (multi-instance QMGR) endpoints.imq-first-vision.connection-name-list=mq9multitst5x(1414),mq9multitst6x(1414) endpoints.imq-first-vision.channel=CLIENT.CHANNEL endpoints.imq-first-vision.queue-manager=MVSW2TST3 endpoints.imq-first-vision.ssl-cipher-suite=TLS_RSA_WITH_AES_256_CBC_SHA256 # user + heslo se načítají z Vaultu # Logické destinace (musí odpovídat enum ImqFirstVisionQueue) endpoints.imq-first-vision.payment-notifications.queue=MVSW2TST3.DELIVERY.NOTIFICATION endpoints.imq-first-vision.payment-request.queue=MVSW2TST3.DELIVERY.REQUEST endpoints.imq-first-vision.mf-requests.queue=MVSW2TST3.MF.REQUESTS endpoints.imq-first-vision.mf-responses.queue=MVSW2TST3.MF.RESPONSES endpoints.imq-first-vision.mf-ebcdic.queue=MVSW2TST3.MF.EBCDIC endpoints.imq-first-vision.mf-utf8.queue=MVSW2TST3.MF.UTF8 ``` #### Vault paths: ```properties vault.kafka.secrets.path=/kv/autotesty/tst1/kafka vault.imq-first-vision.secrets.path=/kv/autotesty/tst1/imq-first-vision ``` ### 6. Maven závislosti V [test-harness-master/pom.xml](test-harness-master/pom.xml) přidat: - `org.apache.kafka:kafka-clients` (Kafka producer/consumer) - `io.confluent:kafka-avro-serializer` (Avro + Schema Registry) - `com.ibm.mq:com.ibm.mq.allclient` (IBM MQ JMS klient) - `jakarta.jms:jakarta.jms-api` (JMS API) - `org.assertj:assertj-core` (pro `andAssertWithAssertJ()`, nice to have ) ### 7. Řešení klíčových problémů #### Poll cyklus a hledání zprávy v Kafce ```mermaid flowchart TD Start["receive(topic, filter, timeout)"] --> CreateConsumer["Vytvořit KafkaConsumer bez group.id"] CreateConsumer --> Assign["assign() na všechny partitions"] Assign --> SeekEnd["seekToEnd() pro všechny partitions"] SeekEnd --> SavePos["Uložit aktuální pozice"] SavePos --> Poll["poll(pollInterval)"] Poll --> Check{"Jsou nějaké záznamy?"} Check -->|Ano| Filter{"Odpovídá filter?"} Check -->|Ne| Timeout{"Vypršel timeout?"} Filter -->|Ano| Return["Vrátit ReceivedMessage"] Filter -->|Ne| Timeout Timeout -->|Ne| Backoff["Backoff wait"] Backoff --> Poll Timeout -->|Ano| Throw["Throw MessagingTimeoutException"] ``` Klíčové body: - `seekToEnd()` se volá **před** triggerem akce v testu (tester typicky: 1. připraví listener, 2. provede akci, 3. čeká na zprávu) - Alternativně: seek na pozici uloženou **před** testem - Polling interval: 100ms, exponential backoff do max 1s - Default timeout: 30s (konfigurovatelný) #### Izolace testů (consumer groups) **Kafka:** - Consumer **bez `group.id`** s manuálním `assign()` + `seek()` - Každý příjem zprávy vytvoří nový Consumer, přečte data a zavře ho - Žádné sdílení offsetů mezi testy ani vlákny - Alternativa pro komplexní scénáře: unikátní `group.id` per test (UUID) **IBM MQ:** - Pro nedestruktivní čtení: `QueueBrowser` (browse bez odstranění zprávy) - JMS message selector (`JMSCorrelationID = 'xxx'`) pro cílený příjem - Každý test si filtruje zprávy podle korelačních údajů ### 8. Souhrnný seznam souborů k vytvoření/úpravě #### Nové soubory v test-harness-master: - `connectors/messaging/KafkaConnector.java` - `connectors/messaging/IbmMqConnector.java` - `endpoints/kafka/KafkaEndpoint.java` - `endpoints/imq/ImqFirstVisionEndpoint.java` - `endpoints/imq/ImqFirstVisionQueue.java` (enum: logické názvy front) - `support/messaging/ReceivedMessage.java` - `support/messaging/MessageContentType.java` (enum: JSON, XML, RAW_TEXT) - `support/messaging/MqMessageFormat.java` (enum: JSON, XML, EBCDIC_870, UTF8_1208) - `support/messaging/MessageResponse.java` (interface: assert, extract, mapTo -- sdílený pro Kafka i IBM MQ) - `support/messaging/KafkaRequest.java` (fluent builder pro Kafka) - `support/messaging/ImqRequest.java` (fluent builder pro IBM MQ) - `messaging/exception/MessagingException.java` (abstract, extends RuntimeException) - `messaging/exception/MessagingConnectionException.java` - `messaging/exception/MessagingSchemaException.java` - `messaging/exception/MessagingDestinationException.java` - `messaging/exception/MessagingTimeoutException.java` #### Nové soubory v test-master: - `dsl/kafka/Kafka.java` (DSL třída pro `withKafka()`) - `dsl/imq/ImqFirstVision.java` (DSL třída pro `withImqFirstVision()`) #### Úpravy existujících souborů: - `test-harness-master/pom.xml` - přidat závislosti Kafka + IBM MQ - `test-master/pom.xml` - případné závislosti - `test-master/.../Harness.java` - přidat `withKafka()` a `withImqFirstVision()` metody - `test-master/src/test/resources/envs/tst1` - přidat `endpoints.kafka.`* a `endpoints.imq-first-vision.`* - `test-master/src/test/resources/envs/ppe` - přidat `endpoints.kafka.`* a `endpoints.imq-first-vision.`*