harness/kafka_ibm_mq_support_c8518eaa.plan 2.md
2026-03-16 18:59:21 +01:00

1342 lines
57 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
name: Kafka IBM MQ Support
overview: "Rozšíření testovacího frameworku Harness o podporu messaging systémů (Kafka na Confluent Cloud a IBM MQ) s kompletní abstrakcí, aby tester nemusel znát detaily jednotlivých protokolů. Změny probíhají ve dvou projektech: test-harness-master (framework) a test-master (DSL + testy)."
todos:
- id: maven-deps
content: Přidat Maven závislosti pro kafka-clients, kafka-avro-serializer, com.ibm.mq.allclient a jakarta.jms-api do pom.xml
status: pending
- id: model-classes
content: "Vytvořit model třídy: ReceivedMessage, MessageContentType, MqMessageFormat, MessageResponse interface"
status: pending
- id: exception-classes
content: "Vytvořit hierarchii výjimek: MessagingException (abstract), MessagingConnectionException, MessagingSchemaException, MessagingDestinationException, MessagingTimeoutException"
status: pending
- id: kafka-connector
content: Implementovat KafkaConnector s SASL/PLAIN auth, Avro+SchemaRegistry, producer pooling, consumer assign/seek (bez consumer group), poll cyklus s Predicate filtrem
status: pending
- id: ibmmq-connector
content: Implementovat IbmMqConnector s JMS klientem, user+password, SSL keystore, connection pooling, podpora JSON/XML/EBCDIC_870/UTF8_1208
status: pending
- id: kafka-endpoint
content: Vytvořit KafkaEndpoint - čte endpoints.kafka.* konfiguraci, credentials z Vaultu, lazy init KafkaConnectoru
status: pending
- id: ibmmq-endpoint
content: Vytvořit ImqFirstVisionEndpoint - čte endpoints.imq-first-vision.* konfiguraci, user + heslo z Vaultu
status: pending
- id: kafka-request
content: Implementovat KafkaRequest fluent builder - withKey, withPayload, withPayloadFromTemplate (nice), withTraceparent/requestID/activityID/sourceCodebookId, send, receiveWhere, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo
status: pending
- id: ibmmq-request
content: Implementovat ImqRequest fluent builder - toQueue, withPayload, withPayloadFromTemplate (nice), asXml/asEbcdic/asUtf8, send, receiveWhere, browse, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo
status: pending
- id: kafka-dsl
content: Vytvořit Kafka DSL třídu a přidat withKafka() do Harness.java
status: pending
- id: ibmmq-dsl
content: Vytvořit ImqFirstVision DSL třídu a přidat withImqFirstVision() do Harness.java
status: pending
- id: config
content: Přidat endpoints.kafka.* a endpoints.imq-first-vision.* konfiguraci do envs/ souborů + vault paths
status: pending
isProject: false
---
# Rozšíření testovacího frameworku o Kafka a IBM MQ
## Současná architektura
Framework je postaven na vrstvené architektuře:
```mermaid
graph TD
Test["Test (@TestCase)"] --> DSL["DSL vrstva (Harness.withXxx())"]
DSL --> EP["Endpoint (implementuje Endpoint)"]
EP --> CON["Connector (implementuje Connector)"]
CON --> SYS["Cílový systém"]
```
- **Connector** ([connectors/Connector.java](test-harness-master/src/main/java/cz/moneta/test/harness/connectors/Connector.java)): nízkoúrovňová komunikace s cílovým systémem
- **Endpoint** ([endpoints/Endpoint.java](test-harness-master/src/main/java/cz/moneta/test/harness/endpoints/Endpoint.java)): obaluje Connector, čte konfiguraci ze `StoreAccessor`, singleton per test class
- **DSL Builder** (v test-master, např. [Wso2.java](test-master/src/main/java/cz/moneta/test/dsl/wso2/Wso2.java)): fluent API pro testy
- **Harness** ([Harness.java](test-master/src/main/java/cz/moneta/test/dsl/Harness.java)): vstupní bod přes `withXxx()` metody
## Navrhovaná architektura
Kafka a IBM MQ mají oddělené vstupní body v Harness DSL -- analogicky k `withWso2()`, `withUfo()` apod.
```mermaid
graph TD
subgraph kafkaPath [Kafka]
TestK["harness.withKafka()"] --> KafkaDSL["Kafka DSL"]
KafkaDSL --> KafkaReq["KafkaRequest builder"]
KafkaReq --> KafkaEP["KafkaEndpoint"]
KafkaEP --> KafkaCon["KafkaConnector"]
KafkaCon --> ConflCloud["Confluent Cloud"]
end
subgraph imqPath [IBM MQ]
TestI["harness.withImqFirstVision()"] --> ImqDSL["ImqFirstVision DSL"]
ImqDSL --> ImqReq["ImqRequest builder"]
ImqReq --> ImqEP["ImqFirstVisionEndpoint"]
ImqEP --> ImqCon["IbmMqConnector"]
ImqCon --> ImqSrv["IBM MQ Server"]
end
KafkaCon -.->|"API key/secret"| Vault["HashiCorp Vault"]
ImqCon -.->|"user + password"| Vault
```
## Třídní diagram
```mermaid
classDiagram
class Connector {
<<interface>>
+close()
}
class Endpoint {
<<interface>>
+canAccess() bool
+close()
}
class KafkaConnector {
-KafkaProducer producer
-CachedSchemaRegistryClient schemaRegistry
+send(topic, key, jsonPayload, headers)
+receive(topic, filter, timeout) List~ReceivedMessage~
+close()
}
class IbmMqConnector {
-JmsConnectionFactory connectionFactory
-JMSContext jmsContext
+send(queue, payload, format, properties)
+receive(queue, selector, format, timeout) ReceivedMessage
+browse(queue, selector, format, maxMessages) List~ReceivedMessage~
+close()
}
class KafkaEndpoint {
-KafkaConnector connector
-StoreAccessor store
}
class ImqFirstVisionEndpoint {
-IbmMqConnector connector
-StoreAccessor store
}
class ReceivedMessage {
+String body
+MessageContentType contentType
+Map headers
+long timestamp
+extract(expression) String
+extractJson(path) JsonNode
+extractXml(xpath) String
}
class MqMessageFormat {
<<enum>>
JSON
XML
EBCDIC_870
UTF8_1208
}
Connector <|.. KafkaConnector
Connector <|.. IbmMqConnector
Endpoint <|.. KafkaEndpoint
Endpoint <|.. ImqFirstVisionEndpoint
KafkaEndpoint --> KafkaConnector
ImqFirstVisionEndpoint --> IbmMqConnector
```
## Detailní návrh tříd
### 1. Connectors (test-harness-master)
#### KafkaConnector
Umístění: `cz.moneta.test.harness.connectors.messaging.KafkaConnector`
- Spravuje `KafkaProducer` (lazy init, singleton, thread-safe) a `KafkaConsumer` (per-call, kvůli izolaci)
- SASL/PLAIN autentizace: API key + secret z Vaultu
- SSL context s truststore (stejný pattern jako `BaseRestConnector`)
- **Formát zpráv: Avro** -- používá Confluent Schema Registry:
- **Volba schématu je řízena názvem topicu**: Schema Registry subject = `{topic-name}-value` (Confluent TopicNameStrategy, výchozí). Tester nemusí schéma specifikovat -- connector ho získá automaticky z Registry podle topicu.
- Producer: `KafkaAvroSerializer` serializuje `GenericRecord` / `SpecificRecord`
- Consumer: `KafkaAvroDeserializer` deserializuje na `GenericRecord`
- Automatická konverze Avro `GenericRecord` -> JSON string v `ReceivedMessage` (transparentní pro testera)
- Pro odesílání: tester poskytne JSON string, connector stáhne schéma z Registry podle topicu a vytvoří `GenericRecord`
- **Vstup pro odeslání zprávy**: název topicu (= logická destinace), klíč (String), tělo zprávy (JSON String), headery (Map)
- **Podporované Kafka headery** -- framework poskytuje typované builder metody pro běžné tracing/correlation headery:
- `traceparent` -- W3C Trace Context propagace (např. `00-traceId-spanId-01`)
- `requestID` -- identifikátor požadavku
- `activityID` -- identifikátor aktivity
- `sourceCodebookId` -- identifikátor zdrojového číselníku
- Libovolné další headery přes generický `.withHeader(key, value)`
- **Izolace testů**: Consumer bez consumer group - používá `consumer.assign(partitions)` + `consumer.seekToEnd()` místo `subscribe()`. Díky tomu:
- Žádný consumer group = žádný rebalancing mezi testy
- Každý test si čte od aktuální pozice nezávisle
- Testy neovlivňují offsety ostatních testů
- **Poll cyklus**: Metoda `receive(topic, Predicate<ReceivedMessage>, Duration timeout)`:
1. Vytvoří nový Consumer
2. `assign()` na všechny partitions daného topicu
3. `seekToEnd()` (nebo `seek` na uloženou pozici)
4. Polling smyčka s exponenciálním backoff (100ms -> 500ms -> 1s)
5. Každý record: Avro GenericRecord -> JSON -> test proti Predicate
6. Vrátí první matching zprávu, nebo vyhodí `MessagingTimeoutException`
```java
public class KafkaConnector implements Connector {
private final Properties baseConfig;
private KafkaProducer<String, GenericRecord> producer;
private final String schemaRegistryUrl;
private final CachedSchemaRegistryClient schemaRegistryClient;
public KafkaConnector(String bootstrapServers, String apiKey,
String apiSecret, String schemaRegistryUrl,
String schemaRegistryApiKey,
String schemaRegistryApiSecret) { ... }
/**
* Odeslání zprávy na topic.
* Schéma se získá automaticky z Schema Registry podle topicu
* (subject = "{topic}-value", TopicNameStrategy).
*
* @param topic název Kafka topicu (= logická destinace)
* @param key klíč zprávy (String)
* @param jsonPayload tělo zprávy jako JSON String
* @param headers Kafka headery (traceparent, requestID, activityID,
* sourceCodebookId, ...)
*/
public void send(String topic, String key, String jsonPayload,
Map<String, String> headers) {
Schema schema = getSchemaForTopic(topic);
GenericRecord record = jsonToAvro(jsonPayload, schema);
ProducerRecord<String, GenericRecord> producerRecord =
new ProducerRecord<>(topic, key, record);
headers.forEach((k, v) ->
producerRecord.headers().add(k, v.getBytes(StandardCharsets.UTF_8)));
producer.send(producerRecord).get();
}
// Příjem: Avro GenericRecord se konvertuje na JSON v ReceivedMessage
public List<ReceivedMessage> receive(String topic,
Predicate<ReceivedMessage> filter,
Duration timeout) { ... }
public Map<TopicPartition, Long> saveOffsets(String topic) { ... }
// Stáhne latest schéma z Registry podle topicu (subject = {topic}-value)
private Schema getSchemaForTopic(String topic) {
String subject = topic + "-value";
SchemaMetadata meta = schemaRegistryClient.getLatestSchemaMetadata(subject);
return new Schema.Parser().parse(meta.getSchema());
}
private String avroToJson(GenericRecord record) { ... }
private GenericRecord jsonToAvro(String json, Schema schema) { ... }
}
```
#### IbmMqConnector
Umístění: `cz.moneta.test.harness.connectors.messaging.IbmMqConnector`
- Používá `com.ibm.mq.allclient` JMS klient
- `JmsConnectionFactory` pro připojení s user + password
- **Multi-instance Queue Manager**: Podpora connection name list ve formátu `host1(port1),host2(port2)` (např. `mq9multitst5x(1414),mq9multitst6x(1414)`). Nastavuje se přes `MQConnectionFactory.setConnectionNameList()` místo samostatných `setHostName()` / `setPort()`. IBM MQ klient automaticky provádí failover na další instanci při nedostupnosti.
- SSL/TLS s keystore a truststore
- **Podporované formáty zpráv:**
- **JSON** (výchozí): `JMS TextMessage` s plain JSON string
- **XML**: `JMS TextMessage` s XML string; příjem konvertuje XML na JSON v `ReceivedMessage` (přes Jackson `XmlMapper`) pro jednotné API
- **UTF-8 / CCSID 1208** (výchozí pro binární): `JMS BytesMessage` s payload kódovaným v UTF-8 (IBM CCSID 1208). MQMD CCSID = 1208. Vhodné pro systémy vyžadující explicitní UTF-8 BytesMessage.
- **EBCDIC / CCSID 870**: `JMS BytesMessage` s payload kódovaným v EBCDIC code page 870 (český/slovenský mainframe kódování). MQMD CCSID = 870. Při odesílání se Java String konvertuje na `byte[]` přes `Charset.forName("IBM870")`. Při příjmu se `byte[]` dekóduje zpět na Java String.
- **Logika formátu:**
- Formát se volí explicitně ve fluent API (`.asXml()`, `.asEbcdic()`, `.asUtf8()`); výchozí je JSON
- Při příjmu: connector detekuje typ JMS zprávy (`TextMessage` vs `BytesMessage`) a dekóduje odpovídajícím způsobem podle nastaveného formátu
- **Connection pooling**: Drží `JMSContext` per endpoint instance, recykluje sessions
- **Izolace testů**: Používá `QueueBrowser` pro nedestruktivní čtení (browse), nebo `createConsumer` s message selektorem
- `send()`, `receive()`, `browse()` metody
```java
public enum MqMessageFormat {
JSON, // TextMessage, plain JSON (UTF-8 default)
XML, // TextMessage, XML string (UTF-8 default)
EBCDIC_870, // BytesMessage, EBCDIC IBM-870 (česky/slovensky)
UTF8_1208 // BytesMessage, UTF-8 IBM CCSID 1208 (výchozí pro binární)
}
public class IbmMqConnector implements Connector {
private static final Charset EBCDIC_870 = Charset.forName("IBM870");
private static final Charset UTF_8 = StandardCharsets.UTF_8;
private final JmsConnectionFactory connectionFactory;
private JMSContext jmsContext;
// connectionNameList: "host1(port1),host2(port2)" pro multi-instance QMGR
public IbmMqConnector(String connectionNameList, String channel,
String queueManager, String user, String password,
String keystorePath, String keystorePassword) {
JmsConnectionFactory cf = ...;
cf.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, connectionNameList);
cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager);
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
// ...
}
// Odeslání zprávy s daným formátem
public void send(String queueName, String payload,
MqMessageFormat format,
Map<String, String> properties) {
switch (format) {
case JSON, XML -> sendTextMessage(queueName, payload, properties);
case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
}
}
// TextMessage pro JSON a XML (JVM default UTF-8)
private void sendTextMessage(String queueName, String payload,
Map<String, String> properties) { ... }
// BytesMessage s konverzí String -> byte[] v daném kódování + MQMD CCSID
private void sendBytesMessage(String queueName, String payload,
Charset charset, int ccsid,
Map<String, String> properties) {
byte[] bytes = payload.getBytes(charset);
// Vytvoří BytesMessage, nastaví MQMD CCSID, zapíše bytes
}
// Příjem: auto-detekce TextMessage vs BytesMessage
public ReceivedMessage receive(String queueName, String messageSelector,
MqMessageFormat expectedFormat,
Duration timeout) { ... }
/**
* Browse (ne-destruktivní čtení) -- zprávy zůstávají ve frontě.
* Vrací seznam zpráv odpovídajících messageSelector, bez jejich odstranění.
* Použití: inspekce obsahu fronty před/po testu, ověření počtu zpráv.
*/
public List<ReceivedMessage> browse(String queueName, String messageSelector,
MqMessageFormat expectedFormat,
int maxMessages) { ... }
// Dekódování přijaté zprávy podle formátu
private String decodeMessage(Message jmsMessage,
MqMessageFormat format) {
if (jmsMessage instanceof TextMessage txt) {
return txt.getText(); // JSON nebo XML string (UTF-8)
} else if (jmsMessage instanceof BytesMessage bytesMsg) {
byte[] data = new byte[(int) bytesMsg.getBodyLength()];
bytesMsg.readBytes(data);
Charset charset = switch (format) {
case EBCDIC_870 -> EBCDIC_870;
case UTF8_1208 -> UTF_8;
default -> UTF_8;
};
return new String(data, charset);
}
}
}
```
### 2. Model třídy (test-harness-master)
#### ReceivedMessage
Umístění: `cz.moneta.test.harness.support.messaging.ReceivedMessage`
Body je vždy String normalizovaný do čitelné podoby bez ohledu na zdroj a wire formát:
- Z Kafka: Avro `GenericRecord` je automaticky konvertován na JSON v `KafkaConnector`
- Z IBM MQ (JSON): JSON string z `JMS TextMessage` je předán přímo
- Z IBM MQ (XML): XML string z `JMS TextMessage` je předán jako XML; `extract()` podporuje jak JSON dotpath, tak XPath
- Z IBM MQ (EBCDIC): `byte[]` z `JMS BytesMessage` je dekódován z IBM-870 na Java String
```java
public enum MessageContentType {
JSON, XML, RAW_TEXT
}
public class ReceivedMessage {
private final String body; // dekódovaný textový obsah
private final MessageContentType contentType; // JSON, XML, nebo RAW_TEXT
private final Map<String, String> headers; // Kafka headers nebo JMS properties
private final long timestamp;
private final String source; // topic nebo queue name
private final String key; // Kafka message key (null pro IBM MQ)
// JSON navigace (dot/bracket notace: "items[0].sku")
public JsonNode extractJson(String path) { ... }
// XPath navigace (pro XML zprávy: "/response/balance")
public String extractXml(String xpathExpression) { ... }
// Univerzální extract - auto-detekce podle contentType
public String extract(String expression) {
return switch (contentType) {
case JSON -> extractJson(expression).asText();
case XML -> extractXml(expression);
case RAW_TEXT -> body;
};
}
public String getKey() { ... } // Kafka klíč, null pro IBM MQ
public String getHeader(String name) { ... } // Kafka header nebo JMS property
public String getBody() { ... } // surový textový obsah
public long getTimestamp() { ... }
public MessageContentType getContentType() { ... }
/**
* Deserializace těla zprávy do Java objektu.
* Pro JSON: Jackson ObjectMapper.readValue(body, type)
* Pro XML: JAXB Unmarshaller nebo Jackson XmlMapper
*/
public <T> T mapTo(Class<T> type) { ... }
}
```
### 3. Endpoints (test-harness-master)
#### KafkaEndpoint
Umístění: `cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint`
- Vzor: analogie s `Wso2GatewayEndpoint` -- konstruktor přijímá `StoreAccessor`, čte `endpoints.kafka.*` konfiguraci
- Čte konfiguraci z `StoreAccessor`:
- `endpoints.kafka.bootstrap-servers`
- `endpoints.kafka.security-protocol`
- `endpoints.kafka.sasl-mechanism`
- `endpoints.kafka.schema-registry-url`
- `endpoints.kafka.value-serializer`
- Credentials z Vaultu (path: `vault.kafka.secrets.path`):
- Kafka API key + secret
- Schema Registry API key + secret
- Lazy init KafkaConnectoru
```java
public class KafkaEndpoint implements Endpoint {
private final KafkaConnector connector;
private final StoreAccessor store;
public KafkaEndpoint(StoreAccessor store) {
this.store = store;
String bootstrapServers = Optional.ofNullable(store.getConfig("endpoints.kafka.bootstrap-servers"))
.orElseThrow(() -> new IllegalStateException(
"You need to configure endpoints.kafka.bootstrap-servers"));
String schemaRegistryUrl = store.getConfig("endpoints.kafka.schema-registry-url");
// API key/secret z Vaultu
// ...
this.connector = new KafkaConnector(bootstrapServers, apiKey, apiSecret,
schemaRegistryUrl, srApiKey, srApiSecret);
}
public void send(String topic, String key, String jsonPayload,
Map<String, String> headers) {
connector.send(topic, key, jsonPayload, headers);
}
public List<ReceivedMessage> receive(String topic,
Predicate<ReceivedMessage> filter,
Duration timeout) {
return connector.receive(topic, filter, timeout);
}
@Override
public void close() { connector.close(); }
}
```
#### ImqFirstVisionEndpoint
Umístění: `cz.moneta.test.harness.endpoints.imq.ImqFirstVisionEndpoint`
- Vzor: analogie s `Wso2GatewayEndpoint`
- Čte konfiguraci z `StoreAccessor`:
- `endpoints.imq-first-vision.connection-name-list` -- formát `host1(port1),host2(port2)` pro multi-instance QMGR
- `endpoints.imq-first-vision.channel`
- `endpoints.imq-first-vision.queue-manager`
- `endpoints.imq-first-vision.ssl-cipher-suite`
- User i heslo z Vaultu (path: `vault.imq-first-vision.secrets.path`)
- Lazy init IbmMqConnectoru
```java
public class ImqFirstVisionEndpoint implements Endpoint {
private final IbmMqConnector connector;
private final StoreAccessor store;
public ImqFirstVisionEndpoint(StoreAccessor store) {
this.store = store;
// connection-name-list: "mq9multitst5x(1414),mq9multitst6x(1414)"
String connectionNameList = Optional.ofNullable(
store.getConfig("endpoints.imq-first-vision.connection-name-list"))
.orElseThrow(() -> new IllegalStateException(
"You need to configure endpoints.imq-first-vision.connection-name-list"));
String channel = store.getConfig("endpoints.imq-first-vision.channel");
String queueManager = store.getConfig("endpoints.imq-first-vision.queue-manager");
// user + password z Vaultu
String user = // z Vaultu (vault.imq-first-vision.secrets.path -> "user")
String password = // z Vaultu (vault.imq-first-vision.secrets.path -> "password")
// ...
this.connector = new IbmMqConnector(connectionNameList, channel,
queueManager, user, password, keystorePath, keystorePassword);
}
public void send(String queueName, String payload,
MqMessageFormat format, Map<String, String> properties) {
connector.send(queueName, payload, format, properties);
}
public ReceivedMessage receive(String queueName, String messageSelector,
MqMessageFormat format, Duration timeout) {
return connector.receive(queueName, messageSelector, format, timeout);
}
public List<ReceivedMessage> browse(String queueName, String messageSelector,
MqMessageFormat format, int maxMessages) {
return connector.browse(queueName, messageSelector, format, maxMessages);
}
// Resolve logického názvu fronty z konfigurace
public String resolveQueue(String logicalName) {
return Optional.ofNullable(
store.getConfig("endpoints.imq-first-vision." + logicalName + ".queue"))
.orElseThrow(() -> new IllegalStateException(
"Queue '" + logicalName + "' is not configured in " +
"endpoints.imq-first-vision." + logicalName + ".queue"));
}
@Override
public void close() { connector.close(); }
}
```
### 4. DSL Builder (test-master)
#### Kafka DSL
Umístění: `cz.moneta.test.dsl.kafka.Kafka`
Vstupní bod přes `Harness`:
```java
public Kafka withKafka() {
return new Kafka(this);
}
```
DSL třída uvnitř získá endpoint přes existující mechanismus:
```java
public class Kafka {
private final Harness harness;
public Kafka(Harness harness) { this.harness = harness; }
public KafkaRequest.TopicPhase prepareRequest() {
KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class);
return KafkaRequest.builder(endpoint);
}
}
```
#### ImqFirstVision DSL
Umístění: `cz.moneta.test.dsl.imq.ImqFirstVision`
Vstupní bod přes `Harness`:
```java
public ImqFirstVision withImqFirstVision() {
return new ImqFirstVision(this);
}
```
DSL třída uvnitř získá endpoint přes existující mechanismus:
```java
public class ImqFirstVision {
private final Harness harness;
public ImqFirstVision(Harness harness) { this.harness = harness; }
public ImqRequest.QueuePhase prepareRequest() {
ImqFirstVisionEndpoint endpoint = harness.getEndpoint(ImqFirstVisionEndpoint.class);
return ImqRequest.builder(endpoint);
}
}
```
#### Registrace endpointů v BaseStoreAccessor
`KafkaEndpoint` a `ImqFirstVisionEndpoint` **nevyžadují** explicitní registraci. Existující mechanismus v `BaseStoreAccessor.getEndpoint(Class)` je automaticky instanciuje přes reflexi (konstruktor `(StoreAccessor)`), cache-uje a spravuje lifecycle (zavírá přes `close()`). Podmínkou je, aby nové endpointy měly veřejný konstruktor `public XxxEndpoint(StoreAccessor store)` a implementovaly `Endpoint` interface.
#### ImqFirstVisionQueue enum
Umístění: `cz.moneta.test.harness.endpoints.imq.ImqFirstVisionQueue`
Enum definuje logické názvy front. Fyzický název fronty se resolvuje z konfigurace (`endpoints.imq-first-vision.{logicalName}.queue`). Tester používá enum konstantu -- nemusí znát fyzický název fronty ani konfigurační klíč.
```java
public enum ImqFirstVisionQueue {
PAYMENT_NOTIFICATIONS("payment-notifications"),
PAYMENT_REQUEST("payment-request"),
MF_REQUESTS("mf-requests"),
MF_RESPONSES("mf-responses"),
MF_EBCDIC("mf-ebcdic"),
MF_UTF8("mf-utf8");
private final String configKey;
ImqFirstVisionQueue(String configKey) {
this.configKey = configKey;
}
public String getConfigKey() { return configKey; }
}
```
Resoluce v `ImqFirstVisionEndpoint.resolveQueue()`:
- `ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS` -> čte `endpoints.imq-first-vision.payment-notifications.queue` -> vrátí `"MVSW2TST3.DELIVERY.NOTIFICATION"`
#### KafkaRequest fluent builder
Umístění: `cz.moneta.test.harness.support.messaging.KafkaRequest`
#### ImqRequest fluent builder
Umístění: `cz.moneta.test.harness.support.messaging.ImqRequest`
#### Definice fluent API pro realizaci
Následující rozhraní a metody definují kompletní fluent API. DSL třídy (`Kafka`, `ImqFirstVision`) delegují na request buildery (`KafkaRequest`, `ImqRequest`).
**Fáze builderu:** Každá metoda vrací rozhraní, které umožňuje další volání až do terminálu (`send()` nebo `MessageResponse`).
**Legenda k implementaci:**
- **[EXISTUJE]** -- obdobná logika již je v frameworku (RawRestRequest), lze znovu využít nebo adaptovat
- **[NOVÉ]** -- je potřeba implementovat
##### KafkaRequest fázová rozhraní
```java
// Fáze 1: Po withKafka() -- výběr směru (odeslání vs. příjem) -- [NOVÉ]
public interface KafkaSendPhase {
KafkaPayloadPhase toTopic(String topic);
}
public interface KafkaReceivePhase {
KafkaReceiveFilterPhase fromTopic(String topic);
}
// Fáze 2a: Odeslání - payload a headery
// withPayload, withPayloadFromFile, addField, appendToArray, withHeader [EXISTUJE] v RawRestRequest
// withKey, withTraceparent/RequestID/ActivityID/SourceCodebookId, send [NOVÉ]
public interface KafkaPayloadPhase {
KafkaPayloadPhase withKey(String key); // [NOVÉ]
KafkaPayloadPhase withPayload(String json); // [EXISTUJE]
KafkaPayloadPhase withPayloadFromFile(String path); // [EXISTUJE]
KafkaPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables); // [EXISTUJE] nice to have
KafkaPayloadPhase addField(String fieldName, Object value);
KafkaPayloadPhase addField(String path, String fieldName, Object value);
KafkaPayloadPhase appendToArray(String path, Object value);
KafkaPayloadPhase withTraceparent(String value); // [NOVÉ]
KafkaPayloadPhase withRequestID(String value); // [NOVÉ]
KafkaPayloadPhase withActivityID(String value); // [NOVÉ]
KafkaPayloadPhase withSourceCodebookId(String value); // [NOVÉ]
KafkaPayloadPhase withHeader(String key, String value); // [EXISTUJE]
void send(); // [NOVÉ]
}
// Fáze 2b: Příjem - filtr a timeout -- [NOVÉ]
public interface KafkaReceiveFilterPhase {
KafkaAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter);
}
public interface KafkaAwaitingPhase {
MessageResponse withTimeout(long duration, TimeUnit unit);
}
// MessageResponse -- obdobné RawRestRequest.Response
// andAssertFieldValue, andAssertPresent, andAssertNotPresent, extract, mapTo [EXISTUJE]
// andAssertHeaderValue, andAssertBodyContains, andAssertWithAssertJ [NOVÉ]
public interface MessageResponse {
MessageResponse andAssertFieldValue(String path, String value); // [EXISTUJE]
MessageResponse andAssertPresent(String path); // [EXISTUJE]
MessageResponse andAssertNotPresent(String path); // [EXISTUJE]
MessageResponse andAssertHeaderValue(String headerName, String value); // [NOVÉ]
MessageResponse andAssertBodyContains(String substring); // [NOVÉ] primárně IBM MQ
ObjectAssert<?> andAssertWithAssertJ(); // [NOVÉ] nice to have
JsonPathValue extract(String path); // [EXISTUJE]
<T> T mapTo(Class<T> type); // [EXISTUJE] jako post(Class)
ReceivedMessage getMessage();
String getBody();
String getHeader(String name);
}
```
##### ImqRequest fázová rozhraní
```java
// Fáze 1: Po withImqFirstVision() -- [NOVÉ]
public interface ImqSendPhase {
ImqPayloadPhase toQueue(ImqFirstVisionQueue queue);
}
public interface ImqReceivePhase {
ImqReceiveFilterPhase fromQueue(ImqFirstVisionQueue queue);
}
// Fáze 2a: Odeslání - asXml/asEbcdic/asUtf8 [NOVÉ], payload/addField/appendToArray [EXISTUJE]
public interface ImqPayloadPhase {
ImqPayloadPhase asXml(); // [NOVÉ]
ImqPayloadPhase asEbcdic(); // [NOVÉ]
ImqPayloadPhase asUtf8(); // [NOVÉ]
ImqPayloadPhase withPayload(String payload);
ImqPayloadPhase withPayloadFromFile(String path);
ImqPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables);
ImqPayloadPhase addField(String fieldName, Object value);
ImqPayloadPhase addField(String path, String fieldName, Object value);
ImqPayloadPhase appendToArray(String path, Object value);
void send();
}
// Fáze 2b: Příjem - withSelector, receiveWhere, browse [NOVÉ]
public interface ImqReceiveFilterPhase {
ImqReceiveFilterPhase withSelector(String jmsMessageSelector); // [NOVÉ]
ImqAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter);
List<ReceivedMessage> browse(int maxMessages); // [NOVÉ]
}
public interface ImqAwaitingPhase {
MessageResponse withTimeout(long duration, TimeUnit unit);
}
```
##### Přehledná tabulka metod
| Kontext | Metoda | Návrat | Poznámka |
|---------|--------|--------|----------|
| Harness | `withKafka()` | Kafka | [NOVÉ] vstupní bod |
| Harness | `withImqFirstVision()` | ImqFirstVision | [NOVÉ] vstupní bod |
| Kafka | `toTopic(String)` | KafkaPayloadPhase | [NOVÉ] odeslání |
| Kafka | `fromTopic(String)` | KafkaReceiveFilterPhase | [NOVÉ] příjem |
| KafkaPayloadPhase | `withKey(String)` | KafkaPayloadPhase | [NOVÉ] Kafka specifické |
| KafkaPayloadPhase | `withPayload(String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayload |
| KafkaPayloadPhase | `withPayloadFromFile(String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromFile |
| KafkaPayloadPhase | `withPayloadFromTemplate(String, Map)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromTemplate (Template API) -- nice to have |
| KafkaPayloadPhase | `addField(...)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.addField, JSON pouze |
| KafkaPayloadPhase | `appendToArray(...)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.appendToArray, JSON pouze |
| KafkaPayloadPhase | `withTraceparent/RequestID/ActivityID/SourceCodebookId(String)` | KafkaPayloadPhase | [NOVÉ] Kafka headery |
| KafkaPayloadPhase | `withHeader(String, String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withHeader |
| KafkaPayloadPhase | `send()` | void | [NOVÉ] terminál (messaging specifické) |
| KafkaReceiveFilterPhase | `receiveWhere(Predicate)` | KafkaAwaitingPhase | [NOVÉ] |
| ImqFirstVision | `toQueue(ImqFirstVisionQueue)` | ImqPayloadPhase | [NOVÉ] |
| ImqFirstVision | `fromQueue(ImqFirstVisionQueue)` | ImqReceiveFilterPhase | [NOVÉ] |
| ImqPayloadPhase | `asXml()`, `asEbcdic()`, `asUtf8()` | ImqPayloadPhase | [NOVÉ] IBM MQ formát |
| ImqPayloadPhase | `withPayload`, `withPayloadFromFile`, ... | ImqPayloadPhase | [EXISTUJE] jako Kafka |
| ImqReceiveFilterPhase | `withSelector(String)` | ImqReceiveFilterPhase | [NOVÉ] JMS selector |
| KafkaAwaitingPhase | `withTimeout(long, TimeUnit)` | MessageResponse | [NOVÉ] |
| ImqReceiveFilterPhase | `receiveWhere(Predicate)` | ImqAwaitingPhase | [NOVÉ] |
| ImqAwaitingPhase | `withTimeout(long, TimeUnit)` | MessageResponse | [NOVÉ] |
| ImqReceiveFilterPhase | `browse(int maxMessages)` | List&lt;ReceivedMessage&gt; | [NOVÉ] terminál, nedestruktivní |
| MessageResponse | `andAssertFieldValue(path, value)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertFieldValue |
| MessageResponse | `andAssertPresent(path)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertPresent |
| MessageResponse | `andAssertNotPresent(path)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertNotPresent |
| MessageResponse | `andAssertHeaderValue(name, value)` | MessageResponse | [NOVÉ] Kafka header / JMS property |
| MessageResponse | `andAssertBodyContains(substring)` | MessageResponse | [NOVÉ] EBCDIC/UTF-8 |
| MessageResponse | `andAssertWithAssertJ()` | ObjectAssert | [NOVÉ] nice to have |
| MessageResponse | `extract(path)` | JsonPathValue | [EXISTUJE] RawRestRequest.Response.extract, .asText() |
| MessageResponse | `mapTo(Class)` | T | [EXISTUJE] RawRestRequest.post(Class) -- deserializace |
| MessageResponse | `getBody()`, `getHeader(name)` | String | [EXISTUJE] Response přístup k datům |
**Poznámka k řetězci receiveWhere:** U Kafky `receiveWhere(filter)` vrací objekt s `withTimeout()` timeout je povinný před assert. U IBM MQ může být timeout defaultní nebo explicitní dle implementace.
Vzor podobný `RawRestRequest` - fluent builder s fázovými interfaces:
```java
// =============================================
// === harness.withKafka() -- Kafka (Avro) =====
// =============================================
// Vstup: topic (schéma se resolví automaticky z Registry), klíč, tělo (JSON), headery
// Odeslání zprávy na Kafka topic se standardními headery
harness.withKafka()
.toTopic("order-events")
.withKey("order-123")
.withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}")
.withTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
.withRequestID("req-001")
.withActivityID("act-555")
.withSourceCodebookId("CB-01")
.send();
// Libovolné vlastní headery
harness.withKafka()
.toTopic("order-events")
.withKey("order-456")
.withPayload("{\"orderId\": \"456\", \"status\": \"SHIPPED\"}")
.withHeader("customHeader", "customValue")
.send();
// Odeslání z JSON souboru
harness.withKafka()
.toTopic("order-events")
.withKey("order-789")
.withPayloadFromFile("messaging/order_event.json")
.withRequestID("req-002")
.send();
// Příjem zprávy z Kafka s filtrem (Avro -> JSON konverze automaticky)
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(30, TimeUnit.SECONDS)
.andAssertFieldValue("status", "CREATED");
// =============================================
// === harness.withImqFirstVision() -- IBM MQ ==
// =============================================
// --- JSON (výchozí formát) ---
// Tester používá enum ImqFirstVisionQueue -- fyzický název fronty se resolvuje z konfigurace
// Odeslání JSON zprávy do IBM MQ fronty
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.withPayload("{\"paymentId\": \"PAY-456\", \"result\": \"OK\"}")
.send();
// Příjem JSON zprávy z IBM MQ fronty
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
.withTimeout(10, TimeUnit.SECONDS)
.andAssertFieldValue("result", "OK");
// --- XML ---
// Odeslání XML zprávy
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.MF_REQUESTS)
.asXml()
.withPayload("<request><accountId>12345</accountId><action>BALANCE</action></request>")
.send();
// Odeslání XML ze souboru
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.MF_REQUESTS)
.asXml()
.withPayloadFromFile("messaging/mf_request.xml")
.send();
// Příjem XML zprávy s XPath filtrací
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
.asXml()
.receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
.withTimeout(15, TimeUnit.SECONDS)
.andAssertFieldValue("/response/balance", "50000");
// --- UTF-8 / CCSID 1208 (BytesMessage) ---
// Odeslání zprávy v UTF-8 jako BytesMessage (CCSID 1208)
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.MF_UTF8)
.asUtf8()
.withPayload("DATA|12345|ÚČET|CZK")
.send();
// Příjem UTF-8 BytesMessage
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_UTF8)
.asUtf8()
.receiveWhere(msg -> msg.getBody().contains("12345"))
.withTimeout(15, TimeUnit.SECONDS);
// --- EBCDIC / CCSID 870 ---
// Odeslání zprávy v EBCDIC kódování (mainframe, CZ/SK znaky)
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.MF_EBCDIC)
.asEbcdic()
.withPayload("PŘIKAZ|12345|ZŮSTATEK|CZK")
.send();
// Příjem EBCDIC zprávy (automaticky dekódováno z IBM-870 na String)
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
.asEbcdic()
.receiveWhere(msg -> msg.getBody().contains("12345"))
.withTimeout(15, TimeUnit.SECONDS);
```
#### Inkrementální stavba payload pomocí addField / appendToArray
Stejný vzor jako v existující třídě `RawRestRequest.Builder` (viz `addField()`, `appendToArray()` na řádcích 253-342 v [RawRestRequest.java](test-harness-master/src/main/java/cz/moneta/test/harness/support/rest/RawRestRequest.java)). Funguje pro JSON payloady (Kafka i IBM MQ JSON formát). Používá Jackson `ObjectMapper` pro manipulaci s JSON stromem.
```java
// === Kafka: Sestavení payload přes addField ===
// Začátek s prázdným JSON objektem
harness.withKafka()
.toTopic("order-events")
.withKey("order-123")
.withPayload("{}")
.addField("orderId", "123")
.addField("status", "CREATED")
.addField("items", Arrays.asList(
Map.of("sku", "A001", "qty", 2),
Map.of("sku", "B002", "qty", 1)))
.addField("items[1]", "note", "fragile")
.withRequestID("req-001")
.send();
// Výsledný payload: {"orderId":"123","status":"CREATED",
// "items":[{"sku":"A001","qty":2},{"sku":"B002","qty":1,"note":"fragile"}]}
// Přidání do existujícího payloadu ze souboru
harness.withKafka()
.toTopic("order-events")
.withKey("order-456")
.withPayloadFromFile("messaging/order_event.json")
.addField("metadata", new Object())
.addField("metadata", "source", "test-harness")
.addField("metadata", "timestamp", System.currentTimeMillis())
.send();
// Přidání prvku do existujícího pole
harness.withKafka()
.toTopic("order-events")
.withKey("order-789")
.withPayloadFromFile("messaging/order_base.json")
.appendToArray("items", Map.of("sku", "C003", "qty", 5))
.send();
// === IBM MQ: Sestavení JSON payload přes addField ===
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.withPayload("{}")
.addField("paymentId", "PAY-456")
.addField("amount", 15000)
.addField("currency", "CZK")
.addField("beneficiary", new Object())
.addField("beneficiary", "name", "Jan Novák")
.addField("beneficiary", "accountNumber", "1234567890/0100")
.send();
```
Podporované metody na builder fázi po `withPayload()` / `withPayloadFromFile()`:
- `addField(String fieldName, Object value)` -- přidá pole do kořene JSON
- `addField(String path, String fieldName, Object value)` -- přidá pole do vnořeného uzlu; `path` používá tečkovou notaci a hranatou závorku pro indexy polí (např. `"items[2].details"`)
- `appendToArray(String path, Object value)` -- přidá prvek na konec existujícího JSON pole
- `withPayloadFromTemplate(String templatePath, Map<String, Object> variables)` -- (nice to have) načte šablonu ze souboru a nahradí placeholdery (`${varName}`) hodnotami z mapy; hodnoty mohou pocházet z `harness.store()`. Pro složitější payloady bez nutnosti programově sestavovat JSON.
Poznámka: `addField` a `appendToArray` fungují pouze pro JSON formát. Pro XML a EBCDIC/UTF8 formáty v IBM MQ se payload sestavuje jako celý String.
#### Varianty výběru zprávy v receiveWhere
##### Kafka -- receiveWhere
Kafka `receiveWhere` přijímá `Predicate<ReceivedMessage>` -- libovolný Java predikát na deserializovanou zprávu (Avro -> JSON). Zprávy se prohledávají sekvenčně v polling smyčce. Vrátí se **první** zpráva, která vyhovuje predikátu.
```java
// --- Filtrace podle obsahu JSON pole ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(30, TimeUnit.SECONDS);
// --- Filtrace podle více polí (AND) ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg ->
msg.extract("orderId").equals("123")
&& msg.extract("status").equals("CREATED"))
.withTimeout(30, TimeUnit.SECONDS);
// --- Filtrace podle Kafka headeru ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> "req-001".equals(msg.getHeader("requestID")))
.withTimeout(30, TimeUnit.SECONDS);
// --- Filtrace podle klíče zprávy ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> "order-123".equals(msg.getKey()))
.withTimeout(30, TimeUnit.SECONDS);
// --- Filtrace kombinací headeru a obsahu ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg ->
"act-555".equals(msg.getHeader("activityID"))
&& msg.extract("status").equals("SHIPPED"))
.withTimeout(30, TimeUnit.SECONDS);
// --- Příjem první dostupné zprávy (bez filtru) ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> true)
.withTimeout(10, TimeUnit.SECONDS);
// --- Příjem s vlastním timeoutem ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(60, TimeUnit.SECONDS); // delší timeout pro pomalé systémy
```
ReceivedMessage metody dostupné v predikátu (Kafka):
- `msg.extract("dotPath")` -- hodnota JSON pole (dot/bracket notace)
- `msg.getHeader("headerName")` -- hodnota Kafka headeru
- `msg.getKey()` -- klíč zprávy
- `msg.getBody()` -- celé tělo jako JSON String
- `msg.getTimestamp()` -- timestamp zprávy
##### IBM MQ -- receiveWhere
IBM MQ podporuje dvě úrovně filtrace:
1. **JMS Message Selector** (server-side) -- efektivní filtr na JMS properties/headery, vyhodnocuje se na straně MQ serveru
2. **Predicate na obsahu** (client-side) -- filtr na tělo zprávy po deserializaci, vyhodnocuje se v klientu
```java
// --- Filtrace podle obsahu JSON pole ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
.withTimeout(10, TimeUnit.SECONDS);
// --- Filtrace podle JMS CorrelationID (server-side selector) ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.withSelector("JMSCorrelationID = 'corr-789'")
.receiveWhere(msg -> true)
.withTimeout(10, TimeUnit.SECONDS);
// --- Kombinace JMS selectoru a content filtru ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.withSelector("JMSType = 'payment'")
.receiveWhere(msg -> msg.extract("amount").equals("15000"))
.withTimeout(10, TimeUnit.SECONDS);
// --- XML zprávy s XPath filtrací ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
.asXml()
.receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
.withTimeout(15, TimeUnit.SECONDS);
// --- EBCDIC / UTF-8 zprávy -- filtrace přes raw body ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
.asEbcdic()
.receiveWhere(msg -> msg.getBody().contains("12345"))
.withTimeout(15, TimeUnit.SECONDS);
// --- Příjem první dostupné zprávy ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.receiveWhere(msg -> true)
.withTimeout(10, TimeUnit.SECONDS);
// --- Browse (ne-destruktivní čtení, zprávy zůstávají ve frontě) ---
List<ReceivedMessage> peeked = harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.browse(10); // max 10 zpráv z fronty
// s JMS selektorem: .withSelector("JMSType = 'payment'").browse(10)
```
ReceivedMessage metody dostupné v predikátu (IBM MQ):
- `msg.extract("expression")` -- hodnota pole (JSON dot-path pro JSON, XPath pro XML)
- `msg.getBody()` -- celé tělo zprávy jako String
- `msg.getHeader("jmsProperty")` -- hodnota JMS property
Klíčový rozdíl: `.withSelector()` je server-side filtr (SQL-92 subset, filtruje JMS properties) -- použijte ho pro efektivní filtraci podle korelačních ID. `.receiveWhere()` je client-side filtr na obsahu zprávy.
#### Chybové stavy -- výjimky a assert metody
##### Výjimky (propagovány okamžitě, test failuje)
Tyto chyby signalizují infrastrukturní problém nebo chybu konfigurace -- test nemůže pokračovat:
```java
// --- Chyba připojení (Kafka i IBM MQ) ---
// KafkaConnectionException / JMSException obalené do RuntimeException
// Příčiny: špatný bootstrap-servers, connection-name-list, credentials, síťový problém
try {
harness.withKafka()
.toTopic("order-events")
.withKey("order-123")
.withPayload("{\"orderId\": \"123\"}")
.send();
} catch (MessagingConnectionException e) {
// "Failed to connect to Kafka cluster at pkc-xxxxx:9092: Authentication failed"
// "Failed to connect to IBM MQ: mq9multitst5x(1414),mq9multitst6x(1414) - MQRC 2035 (NOT_AUTHORIZED)"
}
// --- Chyba schématu (Kafka) ---
// Schema Registry vrátí 404 nebo payload neodpovídá schématu
try {
harness.withKafka()
.toTopic("nonexistent-topic")
.withKey("key")
.withPayload("{\"unknownField\": \"value\"}")
.send();
} catch (MessagingSchemaException e) {
// "Schema not found for subject 'nonexistent-topic-value' in Schema Registry"
// "Payload does not conform to Avro schema for topic 'order-events': field 'orderId' is required"
}
// --- Queue neexistuje (IBM MQ) ---
// Výjimka nastane i při chybné konfiguraci logického názvu v properties
try {
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) // resolvuje se z konfigurace
.withPayload("{}")
.send();
} catch (MessagingDestinationException e) {
// "Queue 'MVSW2TST3.DELIVERY.NOTIFICATION' not found: MQRC 2085 (UNKNOWN_OBJECT_NAME)"
}
// --- Timeout při příjmu (Kafka i IBM MQ) ---
// Žádná zpráva nevyhověla predikátu v daném čase
try {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("nonexistent"))
.withTimeout(5, TimeUnit.SECONDS);
} catch (MessagingTimeoutException e) {
// "No message matching filter found on topic 'order-events' within 5 seconds"
// "No message matching filter found on queue 'PAYMENT.NOTIFICATIONS' within 10 seconds"
}
// --- Chyba konfigurace (endpoint není nakonfigurován) ---
try {
harness.withKafka()
.toTopic("order-events")
.withKey("key")
.withPayload("{}")
.send();
} catch (IllegalStateException e) {
// "You need to configure endpoints.kafka.bootstrap-servers"
// Stejný vzor jako u Wso2GatewayEndpoint
}
```
Hierarchie výjimek:
```
MessagingException (abstract, extends RuntimeException)
├── MessagingConnectionException -- selhání připojení, autentizace
├── MessagingSchemaException -- Avro schema mismatch, schema not found
├── MessagingDestinationException -- topic/queue neexistuje, přístupová práva
└── MessagingTimeoutException -- receive timeout, žádná matching zpráva
```
##### Assert metody (fluent, na přijaté zprávě)
Tyto metody se volají na výsledku `receiveWhere` -- zpráva byla nalezena a nyní se ověřuje její obsah. Assert selhání = `AssertionError` (standardní JUnit pattern).
```java
// --- JSON assert metody (Kafka i IBM MQ JSON) ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(30, TimeUnit.SECONDS)
// Assert na hodnotu pole (dot/bracket notace)
.andAssertFieldValue("status", "CREATED")
.andAssertFieldValue("items[0].sku", "A001")
// Assert na přítomnost/nepřítomnost pole
.andAssertPresent("items[0].qty")
.andAssertNotPresent("deletedField")
// Assert na Kafka header
.andAssertHeaderValue("requestID", "req-001")
// Extrakce hodnoty pro další použití v testu
.extract("internalId").asText(); // -> String pro uložení do harness.store()
// --- XML assert metody (IBM MQ XML) ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
.asXml()
.receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
.withTimeout(15, TimeUnit.SECONDS)
// XPath assert
.andAssertFieldValue("/response/balance", "50000")
.andAssertFieldValue("/response/currency", "CZK")
.andAssertPresent("/response/timestamp")
.andAssertNotPresent("/response/error");
// --- Raw body assert (EBCDIC/UTF-8) ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
.asEbcdic()
.receiveWhere(msg -> msg.getBody().contains("12345"))
.withTimeout(15, TimeUnit.SECONDS)
.andAssertBodyContains("ZŮSTATEK");
// --- Deserializace do Java objektu (mapTo) ---
PaymentNotification notif = harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
.withTimeout(10, TimeUnit.SECONDS)
.mapTo(PaymentNotification.class);
```
Kompletní tabulka assert metod na `MessageResponse`:
- `andAssertFieldValue(String path, String value)` -- JSON dot-path nebo XPath
- `andAssertPresent(String path)` -- ověří existenci pole
- `andAssertNotPresent(String path)` -- ověří neexistenci pole
- `andAssertHeaderValue(String headerName, String value)` -- Kafka header nebo JMS property
- `andAssertBodyContains(String substring)` -- raw body contains (pro EBCDIC/UTF-8/raw text)
- `andAssertWithAssertJ()` -- *(nice to have)* vrací AssertJ `AbstractObjectAssert` pro komplexní fluent assertions .
- `extract(String path)` -- extrahuje hodnotu pro další použití
- `mapTo(Class<T> type)` -- deserializace těla zprávy do Java objektu (Jackson/JAXB)
- `getBody()` -- raw body String
- `getHeader(String name)` -- raw header/property value
### 5. Konfigurace
Konfigurační klíče dodržují konvenci existujících endpointů v Harness: prefix `endpoints.{system-name}.{property}` (viz `endpoints.wso2.gw.url`, `endpoints.udebs.url` atd.).
#### Properties soubory (envs/tst1):
```properties
# Kafka Confluent Cloud (Avro + Schema Registry)
endpoints.kafka.bootstrap-servers=pkc-xxxxx.eu-central-1.aws.confluent.cloud:9092
endpoints.kafka.security-protocol=SASL_SSL
endpoints.kafka.sasl-mechanism=PLAIN
endpoints.kafka.schema-registry-url=https://psrc-xxxxx.eu-central-1.aws.confluent.cloud
endpoints.kafka.value-serializer=avro
# API key + secret a Schema Registry credentials se načítají z Vaultu
# IBM MQ - First Vision (multi-instance QMGR)
endpoints.imq-first-vision.connection-name-list=mq9multitst5x(1414),mq9multitst6x(1414)
endpoints.imq-first-vision.channel=CLIENT.CHANNEL
endpoints.imq-first-vision.queue-manager=MVSW2TST3
endpoints.imq-first-vision.ssl-cipher-suite=TLS_RSA_WITH_AES_256_CBC_SHA256
# user + heslo se načítají z Vaultu
# Logické destinace (musí odpovídat enum ImqFirstVisionQueue)
endpoints.imq-first-vision.payment-notifications.queue=MVSW2TST3.DELIVERY.NOTIFICATION
endpoints.imq-first-vision.payment-request.queue=MVSW2TST3.DELIVERY.REQUEST
endpoints.imq-first-vision.mf-requests.queue=MVSW2TST3.MF.REQUESTS
endpoints.imq-first-vision.mf-responses.queue=MVSW2TST3.MF.RESPONSES
endpoints.imq-first-vision.mf-ebcdic.queue=MVSW2TST3.MF.EBCDIC
endpoints.imq-first-vision.mf-utf8.queue=MVSW2TST3.MF.UTF8
```
#### Vault paths:
```properties
vault.kafka.secrets.path=/kv/autotesty/tst1/kafka
vault.imq-first-vision.secrets.path=/kv/autotesty/tst1/imq-first-vision
```
### 6. Maven závislosti
V [test-harness-master/pom.xml](test-harness-master/pom.xml) přidat:
- `org.apache.kafka:kafka-clients` (Kafka producer/consumer)
- `io.confluent:kafka-avro-serializer` (Avro + Schema Registry)
- `com.ibm.mq:com.ibm.mq.allclient` (IBM MQ JMS klient)
- `jakarta.jms:jakarta.jms-api` (JMS API)
- `org.assertj:assertj-core` (pro `andAssertWithAssertJ()`, nice to have )
### 7. Řešení klíčových problémů
#### Poll cyklus a hledání zprávy v Kafce
```mermaid
flowchart TD
Start["receive(topic, filter, timeout)"] --> CreateConsumer["Vytvořit KafkaConsumer bez group.id"]
CreateConsumer --> Assign["assign() na všechny partitions"]
Assign --> SeekEnd["seekToEnd() pro všechny partitions"]
SeekEnd --> SavePos["Uložit aktuální pozice"]
SavePos --> Poll["poll(pollInterval)"]
Poll --> Check{"Jsou nějaké záznamy?"}
Check -->|Ano| Filter{"Odpovídá filter?"}
Check -->|Ne| Timeout{"Vypršel timeout?"}
Filter -->|Ano| Return["Vrátit ReceivedMessage"]
Filter -->|Ne| Timeout
Timeout -->|Ne| Backoff["Backoff wait"]
Backoff --> Poll
Timeout -->|Ano| Throw["Throw MessagingTimeoutException"]
```
Klíčové body:
- `seekToEnd()` se volá **před** triggerem akce v testu (tester typicky: 1. připraví listener, 2. provede akci, 3. čeká na zprávu)
- Alternativně: seek na pozici uloženou **před** testem
- Polling interval: 100ms, exponential backoff do max 1s
- Default timeout: 30s (konfigurovatelný)
#### Izolace testů (consumer groups)
**Kafka:**
- Consumer **bez `group.id`** s manuálním `assign()` + `seek()`
- Každý příjem zprávy vytvoří nový Consumer, přečte data a zavře ho
- Žádné sdílení offsetů mezi testy ani vlákny
- Alternativa pro komplexní scénáře: unikátní `group.id` per test (UUID)
**IBM MQ:**
- Pro nedestruktivní čtení: `QueueBrowser` (browse bez odstranění zprávy)
- JMS message selector (`JMSCorrelationID = 'xxx'`) pro cílený příjem
- Každý test si filtruje zprávy podle korelačních údajů
### 8. Souhrnný seznam souborů k vytvoření/úpravě
#### Nové soubory v test-harness-master:
- `connectors/messaging/KafkaConnector.java`
- `connectors/messaging/IbmMqConnector.java`
- `endpoints/kafka/KafkaEndpoint.java`
- `endpoints/imq/ImqFirstVisionEndpoint.java`
- `endpoints/imq/ImqFirstVisionQueue.java` (enum: logické názvy front)
- `support/messaging/ReceivedMessage.java`
- `support/messaging/MessageContentType.java` (enum: JSON, XML, RAW_TEXT)
- `support/messaging/MqMessageFormat.java` (enum: JSON, XML, EBCDIC_870, UTF8_1208)
- `support/messaging/MessageResponse.java` (interface: assert, extract, mapTo -- sdílený pro Kafka i IBM MQ)
- `support/messaging/KafkaRequest.java` (fluent builder pro Kafka)
- `support/messaging/ImqRequest.java` (fluent builder pro IBM MQ)
- `messaging/exception/MessagingException.java` (abstract, extends RuntimeException)
- `messaging/exception/MessagingConnectionException.java`
- `messaging/exception/MessagingSchemaException.java`
- `messaging/exception/MessagingDestinationException.java`
- `messaging/exception/MessagingTimeoutException.java`
#### Nové soubory v test-master:
- `dsl/kafka/Kafka.java` (DSL třída pro `withKafka()`)
- `dsl/imq/ImqFirstVision.java` (DSL třída pro `withImqFirstVision()`)
#### Úpravy existujících souborů:
- `test-harness-master/pom.xml` - přidat závislosti Kafka + IBM MQ
- `test-master/pom.xml` - případné závislosti
- `test-master/.../Harness.java` - přidat `withKafka()` a `withImqFirstVision()` metody
- `test-master/src/test/resources/envs/tst1` - přidat `endpoints.kafka.`* a `endpoints.imq-first-vision.`*
- `test-master/src/test/resources/envs/ppe` - přidat `endpoints.kafka.`* a `endpoints.imq-first-vision.`*