1342 lines
57 KiB
Markdown
1342 lines
57 KiB
Markdown
---
|
||
name: Kafka IBM MQ Support
|
||
overview: "Rozšíření testovacího frameworku Harness o podporu messaging systémů (Kafka na Confluent Cloud a IBM MQ) s kompletní abstrakcí, aby tester nemusel znát detaily jednotlivých protokolů. Změny probíhají ve dvou projektech: test-harness-master (framework) a test-master (DSL + testy)."
|
||
todos:
|
||
- id: maven-deps
|
||
content: Přidat Maven závislosti pro kafka-clients, kafka-avro-serializer, com.ibm.mq.allclient a jakarta.jms-api do pom.xml
|
||
status: pending
|
||
- id: model-classes
|
||
content: "Vytvořit model třídy: ReceivedMessage, MessageContentType, MqMessageFormat, MessageResponse interface"
|
||
status: pending
|
||
- id: exception-classes
|
||
content: "Vytvořit hierarchii výjimek: MessagingException (abstract), MessagingConnectionException, MessagingSchemaException, MessagingDestinationException, MessagingTimeoutException"
|
||
status: pending
|
||
- id: kafka-connector
|
||
content: Implementovat KafkaConnector s SASL/PLAIN auth, Avro+SchemaRegistry, producer pooling, consumer assign/seek (bez consumer group), poll cyklus s Predicate filtrem
|
||
status: pending
|
||
- id: ibmmq-connector
|
||
content: Implementovat IbmMqConnector s JMS klientem, user+password, SSL keystore, connection pooling, podpora JSON/XML/EBCDIC_870/UTF8_1208
|
||
status: pending
|
||
- id: kafka-endpoint
|
||
content: Vytvořit KafkaEndpoint - čte endpoints.kafka.* konfiguraci, credentials z Vaultu, lazy init KafkaConnectoru
|
||
status: pending
|
||
- id: ibmmq-endpoint
|
||
content: Vytvořit ImqFirstVisionEndpoint - čte endpoints.imq-first-vision.* konfiguraci, user + heslo z Vaultu
|
||
status: pending
|
||
- id: kafka-request
|
||
content: Implementovat KafkaRequest fluent builder - withKey, withPayload, withPayloadFromTemplate (nice), withTraceparent/requestID/activityID/sourceCodebookId, send, receiveWhere, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo
|
||
status: pending
|
||
- id: ibmmq-request
|
||
content: Implementovat ImqRequest fluent builder - toQueue, withPayload, withPayloadFromTemplate (nice), asXml/asEbcdic/asUtf8, send, receiveWhere, browse, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo
|
||
status: pending
|
||
- id: kafka-dsl
|
||
content: Vytvořit Kafka DSL třídu a přidat withKafka() do Harness.java
|
||
status: pending
|
||
- id: ibmmq-dsl
|
||
content: Vytvořit ImqFirstVision DSL třídu a přidat withImqFirstVision() do Harness.java
|
||
status: pending
|
||
- id: config
|
||
content: Přidat endpoints.kafka.* a endpoints.imq-first-vision.* konfiguraci do envs/ souborů + vault paths
|
||
status: pending
|
||
isProject: false
|
||
---
|
||
|
||
# Rozšíření testovacího frameworku o Kafka a IBM MQ
|
||
|
||
## Současná architektura
|
||
|
||
Framework je postaven na vrstvené architektuře:
|
||
|
||
```mermaid
|
||
graph TD
|
||
Test["Test (@TestCase)"] --> DSL["DSL vrstva (Harness.withXxx())"]
|
||
DSL --> EP["Endpoint (implementuje Endpoint)"]
|
||
EP --> CON["Connector (implementuje Connector)"]
|
||
CON --> SYS["Cílový systém"]
|
||
```
|
||
|
||
|
||
|
||
- **Connector** ([connectors/Connector.java](test-harness-master/src/main/java/cz/moneta/test/harness/connectors/Connector.java)): nízkoúrovňová komunikace s cílovým systémem
|
||
- **Endpoint** ([endpoints/Endpoint.java](test-harness-master/src/main/java/cz/moneta/test/harness/endpoints/Endpoint.java)): obaluje Connector, čte konfiguraci ze `StoreAccessor`, singleton per test class
|
||
- **DSL Builder** (v test-master, např. [Wso2.java](test-master/src/main/java/cz/moneta/test/dsl/wso2/Wso2.java)): fluent API pro testy
|
||
- **Harness** ([Harness.java](test-master/src/main/java/cz/moneta/test/dsl/Harness.java)): vstupní bod přes `withXxx()` metody
|
||
|
||
## Navrhovaná architektura
|
||
|
||
Kafka a IBM MQ mají oddělené vstupní body v Harness DSL -- analogicky k `withWso2()`, `withUfo()` apod.
|
||
|
||
```mermaid
|
||
graph TD
|
||
subgraph kafkaPath [Kafka]
|
||
TestK["harness.withKafka()"] --> KafkaDSL["Kafka DSL"]
|
||
KafkaDSL --> KafkaReq["KafkaRequest builder"]
|
||
KafkaReq --> KafkaEP["KafkaEndpoint"]
|
||
KafkaEP --> KafkaCon["KafkaConnector"]
|
||
KafkaCon --> ConflCloud["Confluent Cloud"]
|
||
end
|
||
subgraph imqPath [IBM MQ]
|
||
TestI["harness.withImqFirstVision()"] --> ImqDSL["ImqFirstVision DSL"]
|
||
ImqDSL --> ImqReq["ImqRequest builder"]
|
||
ImqReq --> ImqEP["ImqFirstVisionEndpoint"]
|
||
ImqEP --> ImqCon["IbmMqConnector"]
|
||
ImqCon --> ImqSrv["IBM MQ Server"]
|
||
end
|
||
KafkaCon -.->|"API key/secret"| Vault["HashiCorp Vault"]
|
||
ImqCon -.->|"user + password"| Vault
|
||
```
|
||
|
||
|
||
|
||
## Třídní diagram
|
||
|
||
```mermaid
|
||
classDiagram
|
||
class Connector {
|
||
<<interface>>
|
||
+close()
|
||
}
|
||
class Endpoint {
|
||
<<interface>>
|
||
+canAccess() bool
|
||
+close()
|
||
}
|
||
|
||
class KafkaConnector {
|
||
-KafkaProducer producer
|
||
-CachedSchemaRegistryClient schemaRegistry
|
||
+send(topic, key, jsonPayload, headers)
|
||
+receive(topic, filter, timeout) List~ReceivedMessage~
|
||
+close()
|
||
}
|
||
|
||
class IbmMqConnector {
|
||
-JmsConnectionFactory connectionFactory
|
||
-JMSContext jmsContext
|
||
+send(queue, payload, format, properties)
|
||
+receive(queue, selector, format, timeout) ReceivedMessage
|
||
+browse(queue, selector, format, maxMessages) List~ReceivedMessage~
|
||
+close()
|
||
}
|
||
|
||
class KafkaEndpoint {
|
||
-KafkaConnector connector
|
||
-StoreAccessor store
|
||
}
|
||
|
||
class ImqFirstVisionEndpoint {
|
||
-IbmMqConnector connector
|
||
-StoreAccessor store
|
||
}
|
||
|
||
class ReceivedMessage {
|
||
+String body
|
||
+MessageContentType contentType
|
||
+Map headers
|
||
+long timestamp
|
||
+extract(expression) String
|
||
+extractJson(path) JsonNode
|
||
+extractXml(xpath) String
|
||
}
|
||
|
||
class MqMessageFormat {
|
||
<<enum>>
|
||
JSON
|
||
XML
|
||
EBCDIC_870
|
||
UTF8_1208
|
||
}
|
||
|
||
Connector <|.. KafkaConnector
|
||
Connector <|.. IbmMqConnector
|
||
Endpoint <|.. KafkaEndpoint
|
||
Endpoint <|.. ImqFirstVisionEndpoint
|
||
KafkaEndpoint --> KafkaConnector
|
||
ImqFirstVisionEndpoint --> IbmMqConnector
|
||
```
|
||
|
||
|
||
|
||
## Detailní návrh tříd
|
||
|
||
### 1. Connectors (test-harness-master)
|
||
|
||
#### KafkaConnector
|
||
|
||
Umístění: `cz.moneta.test.harness.connectors.messaging.KafkaConnector`
|
||
|
||
- Spravuje `KafkaProducer` (lazy init, singleton, thread-safe) a `KafkaConsumer` (per-call, kvůli izolaci)
|
||
- SASL/PLAIN autentizace: API key + secret z Vaultu
|
||
- SSL context s truststore (stejný pattern jako `BaseRestConnector`)
|
||
- **Formát zpráv: Avro** -- používá Confluent Schema Registry:
|
||
- **Volba schématu je řízena názvem topicu**: Schema Registry subject = `{topic-name}-value` (Confluent TopicNameStrategy, výchozí). Tester nemusí schéma specifikovat -- connector ho získá automaticky z Registry podle topicu.
|
||
- Producer: `KafkaAvroSerializer` serializuje `GenericRecord` / `SpecificRecord`
|
||
- Consumer: `KafkaAvroDeserializer` deserializuje na `GenericRecord`
|
||
- Automatická konverze Avro `GenericRecord` -> JSON string v `ReceivedMessage` (transparentní pro testera)
|
||
- Pro odesílání: tester poskytne JSON string, connector stáhne schéma z Registry podle topicu a vytvoří `GenericRecord`
|
||
- **Vstup pro odeslání zprávy**: název topicu (= logická destinace), klíč (String), tělo zprávy (JSON String), headery (Map)
|
||
- **Podporované Kafka headery** -- framework poskytuje typované builder metody pro běžné tracing/correlation headery:
|
||
- `traceparent` -- W3C Trace Context propagace (např. `00-traceId-spanId-01`)
|
||
- `requestID` -- identifikátor požadavku
|
||
- `activityID` -- identifikátor aktivity
|
||
- `sourceCodebookId` -- identifikátor zdrojového číselníku
|
||
- Libovolné další headery přes generický `.withHeader(key, value)`
|
||
- **Izolace testů**: Consumer bez consumer group - používá `consumer.assign(partitions)` + `consumer.seekToEnd()` místo `subscribe()`. Díky tomu:
|
||
- Žádný consumer group = žádný rebalancing mezi testy
|
||
- Každý test si čte od aktuální pozice nezávisle
|
||
- Testy neovlivňují offsety ostatních testů
|
||
- **Poll cyklus**: Metoda `receive(topic, Predicate<ReceivedMessage>, Duration timeout)`:
|
||
1. Vytvoří nový Consumer
|
||
2. `assign()` na všechny partitions daného topicu
|
||
3. `seekToEnd()` (nebo `seek` na uloženou pozici)
|
||
4. Polling smyčka s exponenciálním backoff (100ms -> 500ms -> 1s)
|
||
5. Každý record: Avro GenericRecord -> JSON -> test proti Predicate
|
||
6. Vrátí první matching zprávu, nebo vyhodí `MessagingTimeoutException`
|
||
|
||
```java
|
||
public class KafkaConnector implements Connector {
|
||
private final Properties baseConfig;
|
||
private KafkaProducer<String, GenericRecord> producer;
|
||
private final String schemaRegistryUrl;
|
||
private final CachedSchemaRegistryClient schemaRegistryClient;
|
||
|
||
public KafkaConnector(String bootstrapServers, String apiKey,
|
||
String apiSecret, String schemaRegistryUrl,
|
||
String schemaRegistryApiKey,
|
||
String schemaRegistryApiSecret) { ... }
|
||
|
||
/**
|
||
* Odeslání zprávy na topic.
|
||
* Schéma se získá automaticky z Schema Registry podle topicu
|
||
* (subject = "{topic}-value", TopicNameStrategy).
|
||
*
|
||
* @param topic název Kafka topicu (= logická destinace)
|
||
* @param key klíč zprávy (String)
|
||
* @param jsonPayload tělo zprávy jako JSON String
|
||
* @param headers Kafka headery (traceparent, requestID, activityID,
|
||
* sourceCodebookId, ...)
|
||
*/
|
||
public void send(String topic, String key, String jsonPayload,
|
||
Map<String, String> headers) {
|
||
Schema schema = getSchemaForTopic(topic);
|
||
GenericRecord record = jsonToAvro(jsonPayload, schema);
|
||
ProducerRecord<String, GenericRecord> producerRecord =
|
||
new ProducerRecord<>(topic, key, record);
|
||
headers.forEach((k, v) ->
|
||
producerRecord.headers().add(k, v.getBytes(StandardCharsets.UTF_8)));
|
||
producer.send(producerRecord).get();
|
||
}
|
||
|
||
// Příjem: Avro GenericRecord se konvertuje na JSON v ReceivedMessage
|
||
public List<ReceivedMessage> receive(String topic,
|
||
Predicate<ReceivedMessage> filter,
|
||
Duration timeout) { ... }
|
||
|
||
public Map<TopicPartition, Long> saveOffsets(String topic) { ... }
|
||
|
||
// Stáhne latest schéma z Registry podle topicu (subject = {topic}-value)
|
||
private Schema getSchemaForTopic(String topic) {
|
||
String subject = topic + "-value";
|
||
SchemaMetadata meta = schemaRegistryClient.getLatestSchemaMetadata(subject);
|
||
return new Schema.Parser().parse(meta.getSchema());
|
||
}
|
||
|
||
private String avroToJson(GenericRecord record) { ... }
|
||
private GenericRecord jsonToAvro(String json, Schema schema) { ... }
|
||
}
|
||
```
|
||
|
||
#### IbmMqConnector
|
||
|
||
Umístění: `cz.moneta.test.harness.connectors.messaging.IbmMqConnector`
|
||
|
||
- Používá `com.ibm.mq.allclient` JMS klient
|
||
- `JmsConnectionFactory` pro připojení s user + password
|
||
- **Multi-instance Queue Manager**: Podpora connection name list ve formátu `host1(port1),host2(port2)` (např. `mq9multitst5x(1414),mq9multitst6x(1414)`). Nastavuje se přes `MQConnectionFactory.setConnectionNameList()` místo samostatných `setHostName()` / `setPort()`. IBM MQ klient automaticky provádí failover na další instanci při nedostupnosti.
|
||
- SSL/TLS s keystore a truststore
|
||
- **Podporované formáty zpráv:**
|
||
- **JSON** (výchozí): `JMS TextMessage` s plain JSON string
|
||
- **XML**: `JMS TextMessage` s XML string; příjem konvertuje XML na JSON v `ReceivedMessage` (přes Jackson `XmlMapper`) pro jednotné API
|
||
- **UTF-8 / CCSID 1208** (výchozí pro binární): `JMS BytesMessage` s payload kódovaným v UTF-8 (IBM CCSID 1208). MQMD CCSID = 1208. Vhodné pro systémy vyžadující explicitní UTF-8 BytesMessage.
|
||
- **EBCDIC / CCSID 870**: `JMS BytesMessage` s payload kódovaným v EBCDIC code page 870 (český/slovenský mainframe kódování). MQMD CCSID = 870. Při odesílání se Java String konvertuje na `byte[]` přes `Charset.forName("IBM870")`. Při příjmu se `byte[]` dekóduje zpět na Java String.
|
||
- **Logika formátu:**
|
||
- Formát se volí explicitně ve fluent API (`.asXml()`, `.asEbcdic()`, `.asUtf8()`); výchozí je JSON
|
||
- Při příjmu: connector detekuje typ JMS zprávy (`TextMessage` vs `BytesMessage`) a dekóduje odpovídajícím způsobem podle nastaveného formátu
|
||
- **Connection pooling**: Drží `JMSContext` per endpoint instance, recykluje sessions
|
||
- **Izolace testů**: Používá `QueueBrowser` pro nedestruktivní čtení (browse), nebo `createConsumer` s message selektorem
|
||
- `send()`, `receive()`, `browse()` metody
|
||
|
||
```java
|
||
public enum MqMessageFormat {
|
||
JSON, // TextMessage, plain JSON (UTF-8 default)
|
||
XML, // TextMessage, XML string (UTF-8 default)
|
||
EBCDIC_870, // BytesMessage, EBCDIC IBM-870 (česky/slovensky)
|
||
UTF8_1208 // BytesMessage, UTF-8 IBM CCSID 1208 (výchozí pro binární)
|
||
}
|
||
|
||
public class IbmMqConnector implements Connector {
|
||
private static final Charset EBCDIC_870 = Charset.forName("IBM870");
|
||
private static final Charset UTF_8 = StandardCharsets.UTF_8;
|
||
|
||
private final JmsConnectionFactory connectionFactory;
|
||
private JMSContext jmsContext;
|
||
|
||
// connectionNameList: "host1(port1),host2(port2)" pro multi-instance QMGR
|
||
public IbmMqConnector(String connectionNameList, String channel,
|
||
String queueManager, String user, String password,
|
||
String keystorePath, String keystorePassword) {
|
||
JmsConnectionFactory cf = ...;
|
||
cf.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, connectionNameList);
|
||
cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
|
||
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager);
|
||
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
|
||
// ...
|
||
}
|
||
|
||
// Odeslání zprávy s daným formátem
|
||
public void send(String queueName, String payload,
|
||
MqMessageFormat format,
|
||
Map<String, String> properties) {
|
||
switch (format) {
|
||
case JSON, XML -> sendTextMessage(queueName, payload, properties);
|
||
case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
|
||
case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
|
||
}
|
||
}
|
||
|
||
// TextMessage pro JSON a XML (JVM default UTF-8)
|
||
private void sendTextMessage(String queueName, String payload,
|
||
Map<String, String> properties) { ... }
|
||
|
||
// BytesMessage s konverzí String -> byte[] v daném kódování + MQMD CCSID
|
||
private void sendBytesMessage(String queueName, String payload,
|
||
Charset charset, int ccsid,
|
||
Map<String, String> properties) {
|
||
byte[] bytes = payload.getBytes(charset);
|
||
// Vytvoří BytesMessage, nastaví MQMD CCSID, zapíše bytes
|
||
}
|
||
|
||
// Příjem: auto-detekce TextMessage vs BytesMessage
|
||
public ReceivedMessage receive(String queueName, String messageSelector,
|
||
MqMessageFormat expectedFormat,
|
||
Duration timeout) { ... }
|
||
|
||
/**
|
||
* Browse (ne-destruktivní čtení) -- zprávy zůstávají ve frontě.
|
||
* Vrací seznam zpráv odpovídajících messageSelector, bez jejich odstranění.
|
||
* Použití: inspekce obsahu fronty před/po testu, ověření počtu zpráv.
|
||
*/
|
||
public List<ReceivedMessage> browse(String queueName, String messageSelector,
|
||
MqMessageFormat expectedFormat,
|
||
int maxMessages) { ... }
|
||
|
||
// Dekódování přijaté zprávy podle formátu
|
||
private String decodeMessage(Message jmsMessage,
|
||
MqMessageFormat format) {
|
||
if (jmsMessage instanceof TextMessage txt) {
|
||
return txt.getText(); // JSON nebo XML string (UTF-8)
|
||
} else if (jmsMessage instanceof BytesMessage bytesMsg) {
|
||
byte[] data = new byte[(int) bytesMsg.getBodyLength()];
|
||
bytesMsg.readBytes(data);
|
||
Charset charset = switch (format) {
|
||
case EBCDIC_870 -> EBCDIC_870;
|
||
case UTF8_1208 -> UTF_8;
|
||
default -> UTF_8;
|
||
};
|
||
return new String(data, charset);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 2. Model třídy (test-harness-master)
|
||
|
||
#### ReceivedMessage
|
||
|
||
Umístění: `cz.moneta.test.harness.support.messaging.ReceivedMessage`
|
||
|
||
Body je vždy String normalizovaný do čitelné podoby bez ohledu na zdroj a wire formát:
|
||
|
||
- Z Kafka: Avro `GenericRecord` je automaticky konvertován na JSON v `KafkaConnector`
|
||
- Z IBM MQ (JSON): JSON string z `JMS TextMessage` je předán přímo
|
||
- Z IBM MQ (XML): XML string z `JMS TextMessage` je předán jako XML; `extract()` podporuje jak JSON dotpath, tak XPath
|
||
- Z IBM MQ (EBCDIC): `byte[]` z `JMS BytesMessage` je dekódován z IBM-870 na Java String
|
||
|
||
```java
|
||
public enum MessageContentType {
|
||
JSON, XML, RAW_TEXT
|
||
}
|
||
|
||
public class ReceivedMessage {
|
||
private final String body; // dekódovaný textový obsah
|
||
private final MessageContentType contentType; // JSON, XML, nebo RAW_TEXT
|
||
private final Map<String, String> headers; // Kafka headers nebo JMS properties
|
||
private final long timestamp;
|
||
private final String source; // topic nebo queue name
|
||
private final String key; // Kafka message key (null pro IBM MQ)
|
||
|
||
// JSON navigace (dot/bracket notace: "items[0].sku")
|
||
public JsonNode extractJson(String path) { ... }
|
||
|
||
// XPath navigace (pro XML zprávy: "/response/balance")
|
||
public String extractXml(String xpathExpression) { ... }
|
||
|
||
// Univerzální extract - auto-detekce podle contentType
|
||
public String extract(String expression) {
|
||
return switch (contentType) {
|
||
case JSON -> extractJson(expression).asText();
|
||
case XML -> extractXml(expression);
|
||
case RAW_TEXT -> body;
|
||
};
|
||
}
|
||
|
||
public String getKey() { ... } // Kafka klíč, null pro IBM MQ
|
||
public String getHeader(String name) { ... } // Kafka header nebo JMS property
|
||
public String getBody() { ... } // surový textový obsah
|
||
public long getTimestamp() { ... }
|
||
public MessageContentType getContentType() { ... }
|
||
|
||
/**
|
||
* Deserializace těla zprávy do Java objektu.
|
||
* Pro JSON: Jackson ObjectMapper.readValue(body, type)
|
||
* Pro XML: JAXB Unmarshaller nebo Jackson XmlMapper
|
||
*/
|
||
public <T> T mapTo(Class<T> type) { ... }
|
||
}
|
||
```
|
||
|
||
### 3. Endpoints (test-harness-master)
|
||
|
||
#### KafkaEndpoint
|
||
|
||
Umístění: `cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint`
|
||
|
||
- Vzor: analogie s `Wso2GatewayEndpoint` -- konstruktor přijímá `StoreAccessor`, čte `endpoints.kafka.*` konfiguraci
|
||
- Čte konfiguraci z `StoreAccessor`:
|
||
- `endpoints.kafka.bootstrap-servers`
|
||
- `endpoints.kafka.security-protocol`
|
||
- `endpoints.kafka.sasl-mechanism`
|
||
- `endpoints.kafka.schema-registry-url`
|
||
- `endpoints.kafka.value-serializer`
|
||
- Credentials z Vaultu (path: `vault.kafka.secrets.path`):
|
||
- Kafka API key + secret
|
||
- Schema Registry API key + secret
|
||
- Lazy init KafkaConnectoru
|
||
|
||
```java
|
||
public class KafkaEndpoint implements Endpoint {
|
||
private final KafkaConnector connector;
|
||
private final StoreAccessor store;
|
||
|
||
public KafkaEndpoint(StoreAccessor store) {
|
||
this.store = store;
|
||
String bootstrapServers = Optional.ofNullable(store.getConfig("endpoints.kafka.bootstrap-servers"))
|
||
.orElseThrow(() -> new IllegalStateException(
|
||
"You need to configure endpoints.kafka.bootstrap-servers"));
|
||
String schemaRegistryUrl = store.getConfig("endpoints.kafka.schema-registry-url");
|
||
// API key/secret z Vaultu
|
||
// ...
|
||
this.connector = new KafkaConnector(bootstrapServers, apiKey, apiSecret,
|
||
schemaRegistryUrl, srApiKey, srApiSecret);
|
||
}
|
||
|
||
public void send(String topic, String key, String jsonPayload,
|
||
Map<String, String> headers) {
|
||
connector.send(topic, key, jsonPayload, headers);
|
||
}
|
||
|
||
public List<ReceivedMessage> receive(String topic,
|
||
Predicate<ReceivedMessage> filter,
|
||
Duration timeout) {
|
||
return connector.receive(topic, filter, timeout);
|
||
}
|
||
|
||
@Override
|
||
public void close() { connector.close(); }
|
||
}
|
||
```
|
||
|
||
#### ImqFirstVisionEndpoint
|
||
|
||
Umístění: `cz.moneta.test.harness.endpoints.imq.ImqFirstVisionEndpoint`
|
||
|
||
- Vzor: analogie s `Wso2GatewayEndpoint`
|
||
- Čte konfiguraci z `StoreAccessor`:
|
||
- `endpoints.imq-first-vision.connection-name-list` -- formát `host1(port1),host2(port2)` pro multi-instance QMGR
|
||
- `endpoints.imq-first-vision.channel`
|
||
- `endpoints.imq-first-vision.queue-manager`
|
||
- `endpoints.imq-first-vision.ssl-cipher-suite`
|
||
- User i heslo z Vaultu (path: `vault.imq-first-vision.secrets.path`)
|
||
- Lazy init IbmMqConnectoru
|
||
|
||
```java
|
||
public class ImqFirstVisionEndpoint implements Endpoint {
|
||
private final IbmMqConnector connector;
|
||
private final StoreAccessor store;
|
||
|
||
public ImqFirstVisionEndpoint(StoreAccessor store) {
|
||
this.store = store;
|
||
// connection-name-list: "mq9multitst5x(1414),mq9multitst6x(1414)"
|
||
String connectionNameList = Optional.ofNullable(
|
||
store.getConfig("endpoints.imq-first-vision.connection-name-list"))
|
||
.orElseThrow(() -> new IllegalStateException(
|
||
"You need to configure endpoints.imq-first-vision.connection-name-list"));
|
||
String channel = store.getConfig("endpoints.imq-first-vision.channel");
|
||
String queueManager = store.getConfig("endpoints.imq-first-vision.queue-manager");
|
||
// user + password z Vaultu
|
||
String user = // z Vaultu (vault.imq-first-vision.secrets.path -> "user")
|
||
String password = // z Vaultu (vault.imq-first-vision.secrets.path -> "password")
|
||
// ...
|
||
this.connector = new IbmMqConnector(connectionNameList, channel,
|
||
queueManager, user, password, keystorePath, keystorePassword);
|
||
}
|
||
|
||
public void send(String queueName, String payload,
|
||
MqMessageFormat format, Map<String, String> properties) {
|
||
connector.send(queueName, payload, format, properties);
|
||
}
|
||
|
||
public ReceivedMessage receive(String queueName, String messageSelector,
|
||
MqMessageFormat format, Duration timeout) {
|
||
return connector.receive(queueName, messageSelector, format, timeout);
|
||
}
|
||
|
||
public List<ReceivedMessage> browse(String queueName, String messageSelector,
|
||
MqMessageFormat format, int maxMessages) {
|
||
return connector.browse(queueName, messageSelector, format, maxMessages);
|
||
}
|
||
|
||
// Resolve logického názvu fronty z konfigurace
|
||
public String resolveQueue(String logicalName) {
|
||
return Optional.ofNullable(
|
||
store.getConfig("endpoints.imq-first-vision." + logicalName + ".queue"))
|
||
.orElseThrow(() -> new IllegalStateException(
|
||
"Queue '" + logicalName + "' is not configured in " +
|
||
"endpoints.imq-first-vision." + logicalName + ".queue"));
|
||
}
|
||
|
||
@Override
|
||
public void close() { connector.close(); }
|
||
}
|
||
```
|
||
|
||
### 4. DSL Builder (test-master)
|
||
|
||
#### Kafka DSL
|
||
|
||
Umístění: `cz.moneta.test.dsl.kafka.Kafka`
|
||
|
||
Vstupní bod přes `Harness`:
|
||
|
||
```java
|
||
public Kafka withKafka() {
|
||
return new Kafka(this);
|
||
}
|
||
```
|
||
|
||
DSL třída uvnitř získá endpoint přes existující mechanismus:
|
||
|
||
```java
|
||
public class Kafka {
|
||
private final Harness harness;
|
||
|
||
public Kafka(Harness harness) { this.harness = harness; }
|
||
|
||
public KafkaRequest.TopicPhase prepareRequest() {
|
||
KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class);
|
||
return KafkaRequest.builder(endpoint);
|
||
}
|
||
}
|
||
```
|
||
|
||
#### ImqFirstVision DSL
|
||
|
||
Umístění: `cz.moneta.test.dsl.imq.ImqFirstVision`
|
||
|
||
Vstupní bod přes `Harness`:
|
||
|
||
```java
|
||
public ImqFirstVision withImqFirstVision() {
|
||
return new ImqFirstVision(this);
|
||
}
|
||
```
|
||
|
||
DSL třída uvnitř získá endpoint přes existující mechanismus:
|
||
|
||
```java
|
||
public class ImqFirstVision {
|
||
private final Harness harness;
|
||
|
||
public ImqFirstVision(Harness harness) { this.harness = harness; }
|
||
|
||
public ImqRequest.QueuePhase prepareRequest() {
|
||
ImqFirstVisionEndpoint endpoint = harness.getEndpoint(ImqFirstVisionEndpoint.class);
|
||
return ImqRequest.builder(endpoint);
|
||
}
|
||
}
|
||
```
|
||
|
||
#### Registrace endpointů v BaseStoreAccessor
|
||
|
||
`KafkaEndpoint` a `ImqFirstVisionEndpoint` **nevyžadují** explicitní registraci. Existující mechanismus v `BaseStoreAccessor.getEndpoint(Class)` je automaticky instanciuje přes reflexi (konstruktor `(StoreAccessor)`), cache-uje a spravuje lifecycle (zavírá přes `close()`). Podmínkou je, aby nové endpointy měly veřejný konstruktor `public XxxEndpoint(StoreAccessor store)` a implementovaly `Endpoint` interface.
|
||
|
||
#### ImqFirstVisionQueue enum
|
||
|
||
Umístění: `cz.moneta.test.harness.endpoints.imq.ImqFirstVisionQueue`
|
||
|
||
Enum definuje logické názvy front. Fyzický název fronty se resolvuje z konfigurace (`endpoints.imq-first-vision.{logicalName}.queue`). Tester používá enum konstantu -- nemusí znát fyzický název fronty ani konfigurační klíč.
|
||
|
||
```java
|
||
public enum ImqFirstVisionQueue {
|
||
PAYMENT_NOTIFICATIONS("payment-notifications"),
|
||
PAYMENT_REQUEST("payment-request"),
|
||
MF_REQUESTS("mf-requests"),
|
||
MF_RESPONSES("mf-responses"),
|
||
MF_EBCDIC("mf-ebcdic"),
|
||
MF_UTF8("mf-utf8");
|
||
|
||
private final String configKey;
|
||
|
||
ImqFirstVisionQueue(String configKey) {
|
||
this.configKey = configKey;
|
||
}
|
||
|
||
public String getConfigKey() { return configKey; }
|
||
}
|
||
```
|
||
|
||
Resoluce v `ImqFirstVisionEndpoint.resolveQueue()`:
|
||
|
||
- `ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS` -> čte `endpoints.imq-first-vision.payment-notifications.queue` -> vrátí `"MVSW2TST3.DELIVERY.NOTIFICATION"`
|
||
|
||
#### KafkaRequest fluent builder
|
||
|
||
Umístění: `cz.moneta.test.harness.support.messaging.KafkaRequest`
|
||
|
||
#### ImqRequest fluent builder
|
||
|
||
Umístění: `cz.moneta.test.harness.support.messaging.ImqRequest`
|
||
|
||
#### Definice fluent API pro realizaci
|
||
|
||
Následující rozhraní a metody definují kompletní fluent API. DSL třídy (`Kafka`, `ImqFirstVision`) delegují na request buildery (`KafkaRequest`, `ImqRequest`).
|
||
|
||
**Fáze builderu:** Každá metoda vrací rozhraní, které umožňuje další volání až do terminálu (`send()` nebo `MessageResponse`).
|
||
|
||
**Legenda k implementaci:**
|
||
- **[EXISTUJE]** -- obdobná logika již je v frameworku (RawRestRequest), lze znovu využít nebo adaptovat
|
||
- **[NOVÉ]** -- je potřeba implementovat
|
||
|
||
##### KafkaRequest – fázová rozhraní
|
||
|
||
```java
|
||
// Fáze 1: Po withKafka() -- výběr směru (odeslání vs. příjem) -- [NOVÉ]
|
||
public interface KafkaSendPhase {
|
||
KafkaPayloadPhase toTopic(String topic);
|
||
}
|
||
public interface KafkaReceivePhase {
|
||
KafkaReceiveFilterPhase fromTopic(String topic);
|
||
}
|
||
|
||
// Fáze 2a: Odeslání - payload a headery
|
||
// withPayload, withPayloadFromFile, addField, appendToArray, withHeader [EXISTUJE] v RawRestRequest
|
||
// withKey, withTraceparent/RequestID/ActivityID/SourceCodebookId, send [NOVÉ]
|
||
public interface KafkaPayloadPhase {
|
||
KafkaPayloadPhase withKey(String key); // [NOVÉ]
|
||
KafkaPayloadPhase withPayload(String json); // [EXISTUJE]
|
||
KafkaPayloadPhase withPayloadFromFile(String path); // [EXISTUJE]
|
||
KafkaPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables); // [EXISTUJE] nice to have
|
||
KafkaPayloadPhase addField(String fieldName, Object value);
|
||
KafkaPayloadPhase addField(String path, String fieldName, Object value);
|
||
KafkaPayloadPhase appendToArray(String path, Object value);
|
||
KafkaPayloadPhase withTraceparent(String value); // [NOVÉ]
|
||
KafkaPayloadPhase withRequestID(String value); // [NOVÉ]
|
||
KafkaPayloadPhase withActivityID(String value); // [NOVÉ]
|
||
KafkaPayloadPhase withSourceCodebookId(String value); // [NOVÉ]
|
||
KafkaPayloadPhase withHeader(String key, String value); // [EXISTUJE]
|
||
void send(); // [NOVÉ]
|
||
}
|
||
|
||
// Fáze 2b: Příjem - filtr a timeout -- [NOVÉ]
|
||
public interface KafkaReceiveFilterPhase {
|
||
KafkaAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter);
|
||
}
|
||
public interface KafkaAwaitingPhase {
|
||
MessageResponse withTimeout(long duration, TimeUnit unit);
|
||
}
|
||
|
||
// MessageResponse -- obdobné RawRestRequest.Response
|
||
// andAssertFieldValue, andAssertPresent, andAssertNotPresent, extract, mapTo [EXISTUJE]
|
||
// andAssertHeaderValue, andAssertBodyContains, andAssertWithAssertJ [NOVÉ]
|
||
public interface MessageResponse {
|
||
MessageResponse andAssertFieldValue(String path, String value); // [EXISTUJE]
|
||
MessageResponse andAssertPresent(String path); // [EXISTUJE]
|
||
MessageResponse andAssertNotPresent(String path); // [EXISTUJE]
|
||
MessageResponse andAssertHeaderValue(String headerName, String value); // [NOVÉ]
|
||
MessageResponse andAssertBodyContains(String substring); // [NOVÉ] primárně IBM MQ
|
||
ObjectAssert<?> andAssertWithAssertJ(); // [NOVÉ] nice to have
|
||
JsonPathValue extract(String path); // [EXISTUJE]
|
||
<T> T mapTo(Class<T> type); // [EXISTUJE] jako post(Class)
|
||
ReceivedMessage getMessage();
|
||
String getBody();
|
||
String getHeader(String name);
|
||
}
|
||
```
|
||
|
||
##### ImqRequest – fázová rozhraní
|
||
|
||
```java
|
||
// Fáze 1: Po withImqFirstVision() -- [NOVÉ]
|
||
public interface ImqSendPhase {
|
||
ImqPayloadPhase toQueue(ImqFirstVisionQueue queue);
|
||
}
|
||
public interface ImqReceivePhase {
|
||
ImqReceiveFilterPhase fromQueue(ImqFirstVisionQueue queue);
|
||
}
|
||
|
||
// Fáze 2a: Odeslání - asXml/asEbcdic/asUtf8 [NOVÉ], payload/addField/appendToArray [EXISTUJE]
|
||
public interface ImqPayloadPhase {
|
||
ImqPayloadPhase asXml(); // [NOVÉ]
|
||
ImqPayloadPhase asEbcdic(); // [NOVÉ]
|
||
ImqPayloadPhase asUtf8(); // [NOVÉ]
|
||
ImqPayloadPhase withPayload(String payload);
|
||
ImqPayloadPhase withPayloadFromFile(String path);
|
||
ImqPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables);
|
||
ImqPayloadPhase addField(String fieldName, Object value);
|
||
ImqPayloadPhase addField(String path, String fieldName, Object value);
|
||
ImqPayloadPhase appendToArray(String path, Object value);
|
||
void send();
|
||
}
|
||
|
||
// Fáze 2b: Příjem - withSelector, receiveWhere, browse [NOVÉ]
|
||
public interface ImqReceiveFilterPhase {
|
||
ImqReceiveFilterPhase withSelector(String jmsMessageSelector); // [NOVÉ]
|
||
ImqAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter);
|
||
List<ReceivedMessage> browse(int maxMessages); // [NOVÉ]
|
||
}
|
||
public interface ImqAwaitingPhase {
|
||
MessageResponse withTimeout(long duration, TimeUnit unit);
|
||
}
|
||
```
|
||
|
||
##### Přehledná tabulka metod
|
||
|
||
| Kontext | Metoda | Návrat | Poznámka |
|
||
|---------|--------|--------|----------|
|
||
| Harness | `withKafka()` | Kafka | [NOVÉ] vstupní bod |
|
||
| Harness | `withImqFirstVision()` | ImqFirstVision | [NOVÉ] vstupní bod |
|
||
| Kafka | `toTopic(String)` | KafkaPayloadPhase | [NOVÉ] odeslání |
|
||
| Kafka | `fromTopic(String)` | KafkaReceiveFilterPhase | [NOVÉ] příjem |
|
||
| KafkaPayloadPhase | `withKey(String)` | KafkaPayloadPhase | [NOVÉ] Kafka specifické |
|
||
| KafkaPayloadPhase | `withPayload(String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayload |
|
||
| KafkaPayloadPhase | `withPayloadFromFile(String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromFile |
|
||
| KafkaPayloadPhase | `withPayloadFromTemplate(String, Map)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromTemplate (Template API) -- nice to have |
|
||
| KafkaPayloadPhase | `addField(...)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.addField, JSON pouze |
|
||
| KafkaPayloadPhase | `appendToArray(...)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.appendToArray, JSON pouze |
|
||
| KafkaPayloadPhase | `withTraceparent/RequestID/ActivityID/SourceCodebookId(String)` | KafkaPayloadPhase | [NOVÉ] Kafka headery |
|
||
| KafkaPayloadPhase | `withHeader(String, String)` | KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withHeader |
|
||
| KafkaPayloadPhase | `send()` | void | [NOVÉ] terminál (messaging specifické) |
|
||
| KafkaReceiveFilterPhase | `receiveWhere(Predicate)` | KafkaAwaitingPhase | [NOVÉ] |
|
||
| ImqFirstVision | `toQueue(ImqFirstVisionQueue)` | ImqPayloadPhase | [NOVÉ] |
|
||
| ImqFirstVision | `fromQueue(ImqFirstVisionQueue)` | ImqReceiveFilterPhase | [NOVÉ] |
|
||
| ImqPayloadPhase | `asXml()`, `asEbcdic()`, `asUtf8()` | ImqPayloadPhase | [NOVÉ] IBM MQ formát |
|
||
| ImqPayloadPhase | `withPayload`, `withPayloadFromFile`, ... | ImqPayloadPhase | [EXISTUJE] jako Kafka |
|
||
| ImqReceiveFilterPhase | `withSelector(String)` | ImqReceiveFilterPhase | [NOVÉ] JMS selector |
|
||
| KafkaAwaitingPhase | `withTimeout(long, TimeUnit)` | MessageResponse | [NOVÉ] |
|
||
| ImqReceiveFilterPhase | `receiveWhere(Predicate)` | ImqAwaitingPhase | [NOVÉ] |
|
||
| ImqAwaitingPhase | `withTimeout(long, TimeUnit)` | MessageResponse | [NOVÉ] |
|
||
| ImqReceiveFilterPhase | `browse(int maxMessages)` | List<ReceivedMessage> | [NOVÉ] terminál, nedestruktivní |
|
||
| MessageResponse | `andAssertFieldValue(path, value)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertFieldValue |
|
||
| MessageResponse | `andAssertPresent(path)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertPresent |
|
||
| MessageResponse | `andAssertNotPresent(path)` | MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertNotPresent |
|
||
| MessageResponse | `andAssertHeaderValue(name, value)` | MessageResponse | [NOVÉ] Kafka header / JMS property |
|
||
| MessageResponse | `andAssertBodyContains(substring)` | MessageResponse | [NOVÉ] EBCDIC/UTF-8 |
|
||
| MessageResponse | `andAssertWithAssertJ()` | ObjectAssert | [NOVÉ] nice to have |
|
||
| MessageResponse | `extract(path)` | JsonPathValue | [EXISTUJE] RawRestRequest.Response.extract, .asText() |
|
||
| MessageResponse | `mapTo(Class)` | T | [EXISTUJE] RawRestRequest.post(Class) -- deserializace |
|
||
| MessageResponse | `getBody()`, `getHeader(name)` | String | [EXISTUJE] Response přístup k datům |
|
||
|
||
**Poznámka k řetězci receiveWhere:** U Kafky `receiveWhere(filter)` vrací objekt s `withTimeout()` – timeout je povinný před assert. U IBM MQ může být timeout defaultní nebo explicitní dle implementace.
|
||
|
||
Vzor podobný `RawRestRequest` - fluent builder s fázovými interfaces:
|
||
|
||
```java
|
||
// =============================================
|
||
// === harness.withKafka() -- Kafka (Avro) =====
|
||
// =============================================
|
||
// Vstup: topic (schéma se resolví automaticky z Registry), klíč, tělo (JSON), headery
|
||
|
||
// Odeslání zprávy na Kafka topic se standardními headery
|
||
harness.withKafka()
|
||
.toTopic("order-events")
|
||
.withKey("order-123")
|
||
.withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}")
|
||
.withTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
|
||
.withRequestID("req-001")
|
||
.withActivityID("act-555")
|
||
.withSourceCodebookId("CB-01")
|
||
.send();
|
||
|
||
// Libovolné vlastní headery
|
||
harness.withKafka()
|
||
.toTopic("order-events")
|
||
.withKey("order-456")
|
||
.withPayload("{\"orderId\": \"456\", \"status\": \"SHIPPED\"}")
|
||
.withHeader("customHeader", "customValue")
|
||
.send();
|
||
|
||
// Odeslání z JSON souboru
|
||
harness.withKafka()
|
||
.toTopic("order-events")
|
||
.withKey("order-789")
|
||
.withPayloadFromFile("messaging/order_event.json")
|
||
.withRequestID("req-002")
|
||
.send();
|
||
|
||
// Příjem zprávy z Kafka s filtrem (Avro -> JSON konverze automaticky)
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
|
||
.withTimeout(30, TimeUnit.SECONDS)
|
||
.andAssertFieldValue("status", "CREATED");
|
||
|
||
// =============================================
|
||
// === harness.withImqFirstVision() -- IBM MQ ==
|
||
// =============================================
|
||
|
||
// --- JSON (výchozí formát) ---
|
||
// Tester používá enum ImqFirstVisionQueue -- fyzický název fronty se resolvuje z konfigurace
|
||
|
||
// Odeslání JSON zprávy do IBM MQ fronty
|
||
harness.withImqFirstVision()
|
||
.toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||
.withPayload("{\"paymentId\": \"PAY-456\", \"result\": \"OK\"}")
|
||
.send();
|
||
|
||
// Příjem JSON zprávy z IBM MQ fronty
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||
.receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
|
||
.withTimeout(10, TimeUnit.SECONDS)
|
||
.andAssertFieldValue("result", "OK");
|
||
|
||
// --- XML ---
|
||
|
||
// Odeslání XML zprávy
|
||
harness.withImqFirstVision()
|
||
.toQueue(ImqFirstVisionQueue.MF_REQUESTS)
|
||
.asXml()
|
||
.withPayload("<request><accountId>12345</accountId><action>BALANCE</action></request>")
|
||
.send();
|
||
|
||
// Odeslání XML ze souboru
|
||
harness.withImqFirstVision()
|
||
.toQueue(ImqFirstVisionQueue.MF_REQUESTS)
|
||
.asXml()
|
||
.withPayloadFromFile("messaging/mf_request.xml")
|
||
.send();
|
||
|
||
// Příjem XML zprávy s XPath filtrací
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
|
||
.asXml()
|
||
.receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
|
||
.withTimeout(15, TimeUnit.SECONDS)
|
||
.andAssertFieldValue("/response/balance", "50000");
|
||
|
||
// --- UTF-8 / CCSID 1208 (BytesMessage) ---
|
||
|
||
// Odeslání zprávy v UTF-8 jako BytesMessage (CCSID 1208)
|
||
harness.withImqFirstVision()
|
||
.toQueue(ImqFirstVisionQueue.MF_UTF8)
|
||
.asUtf8()
|
||
.withPayload("DATA|12345|ÚČET|CZK")
|
||
.send();
|
||
|
||
// Příjem UTF-8 BytesMessage
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.MF_UTF8)
|
||
.asUtf8()
|
||
.receiveWhere(msg -> msg.getBody().contains("12345"))
|
||
.withTimeout(15, TimeUnit.SECONDS);
|
||
|
||
// --- EBCDIC / CCSID 870 ---
|
||
|
||
// Odeslání zprávy v EBCDIC kódování (mainframe, CZ/SK znaky)
|
||
harness.withImqFirstVision()
|
||
.toQueue(ImqFirstVisionQueue.MF_EBCDIC)
|
||
.asEbcdic()
|
||
.withPayload("PŘIKAZ|12345|ZŮSTATEK|CZK")
|
||
.send();
|
||
|
||
// Příjem EBCDIC zprávy (automaticky dekódováno z IBM-870 na String)
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
|
||
.asEbcdic()
|
||
.receiveWhere(msg -> msg.getBody().contains("12345"))
|
||
.withTimeout(15, TimeUnit.SECONDS);
|
||
```
|
||
|
||
#### Inkrementální stavba payload pomocí addField / appendToArray
|
||
|
||
Stejný vzor jako v existující třídě `RawRestRequest.Builder` (viz `addField()`, `appendToArray()` na řádcích 253-342 v [RawRestRequest.java](test-harness-master/src/main/java/cz/moneta/test/harness/support/rest/RawRestRequest.java)). Funguje pro JSON payloady (Kafka i IBM MQ JSON formát). Používá Jackson `ObjectMapper` pro manipulaci s JSON stromem.
|
||
|
||
```java
|
||
// === Kafka: Sestavení payload přes addField ===
|
||
|
||
// Začátek s prázdným JSON objektem
|
||
harness.withKafka()
|
||
.toTopic("order-events")
|
||
.withKey("order-123")
|
||
.withPayload("{}")
|
||
.addField("orderId", "123")
|
||
.addField("status", "CREATED")
|
||
.addField("items", Arrays.asList(
|
||
Map.of("sku", "A001", "qty", 2),
|
||
Map.of("sku", "B002", "qty", 1)))
|
||
.addField("items[1]", "note", "fragile")
|
||
.withRequestID("req-001")
|
||
.send();
|
||
// Výsledný payload: {"orderId":"123","status":"CREATED",
|
||
// "items":[{"sku":"A001","qty":2},{"sku":"B002","qty":1,"note":"fragile"}]}
|
||
|
||
// Přidání do existujícího payloadu ze souboru
|
||
harness.withKafka()
|
||
.toTopic("order-events")
|
||
.withKey("order-456")
|
||
.withPayloadFromFile("messaging/order_event.json")
|
||
.addField("metadata", new Object())
|
||
.addField("metadata", "source", "test-harness")
|
||
.addField("metadata", "timestamp", System.currentTimeMillis())
|
||
.send();
|
||
|
||
// Přidání prvku do existujícího pole
|
||
harness.withKafka()
|
||
.toTopic("order-events")
|
||
.withKey("order-789")
|
||
.withPayloadFromFile("messaging/order_base.json")
|
||
.appendToArray("items", Map.of("sku", "C003", "qty", 5))
|
||
.send();
|
||
|
||
// === IBM MQ: Sestavení JSON payload přes addField ===
|
||
|
||
harness.withImqFirstVision()
|
||
.toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||
.withPayload("{}")
|
||
.addField("paymentId", "PAY-456")
|
||
.addField("amount", 15000)
|
||
.addField("currency", "CZK")
|
||
.addField("beneficiary", new Object())
|
||
.addField("beneficiary", "name", "Jan Novák")
|
||
.addField("beneficiary", "accountNumber", "1234567890/0100")
|
||
.send();
|
||
```
|
||
|
||
Podporované metody na builder fázi po `withPayload()` / `withPayloadFromFile()`:
|
||
|
||
- `addField(String fieldName, Object value)` -- přidá pole do kořene JSON
|
||
- `addField(String path, String fieldName, Object value)` -- přidá pole do vnořeného uzlu; `path` používá tečkovou notaci a hranatou závorku pro indexy polí (např. `"items[2].details"`)
|
||
- `appendToArray(String path, Object value)` -- přidá prvek na konec existujícího JSON pole
|
||
- `withPayloadFromTemplate(String templatePath, Map<String, Object> variables)` -- (nice to have) načte šablonu ze souboru a nahradí placeholdery (`${varName}`) hodnotami z mapy; hodnoty mohou pocházet z `harness.store()`. Pro složitější payloady bez nutnosti programově sestavovat JSON.
|
||
|
||
Poznámka: `addField` a `appendToArray` fungují pouze pro JSON formát. Pro XML a EBCDIC/UTF8 formáty v IBM MQ se payload sestavuje jako celý String.
|
||
|
||
#### Varianty výběru zprávy v receiveWhere
|
||
|
||
##### Kafka -- receiveWhere
|
||
|
||
Kafka `receiveWhere` přijímá `Predicate<ReceivedMessage>` -- libovolný Java predikát na deserializovanou zprávu (Avro -> JSON). Zprávy se prohledávají sekvenčně v polling smyčce. Vrátí se **první** zpráva, která vyhovuje predikátu.
|
||
|
||
```java
|
||
// --- Filtrace podle obsahu JSON pole ---
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
|
||
.withTimeout(30, TimeUnit.SECONDS);
|
||
|
||
// --- Filtrace podle více polí (AND) ---
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg ->
|
||
msg.extract("orderId").equals("123")
|
||
&& msg.extract("status").equals("CREATED"))
|
||
.withTimeout(30, TimeUnit.SECONDS);
|
||
|
||
// --- Filtrace podle Kafka headeru ---
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg -> "req-001".equals(msg.getHeader("requestID")))
|
||
.withTimeout(30, TimeUnit.SECONDS);
|
||
|
||
// --- Filtrace podle klíče zprávy ---
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg -> "order-123".equals(msg.getKey()))
|
||
.withTimeout(30, TimeUnit.SECONDS);
|
||
|
||
// --- Filtrace kombinací headeru a obsahu ---
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg ->
|
||
"act-555".equals(msg.getHeader("activityID"))
|
||
&& msg.extract("status").equals("SHIPPED"))
|
||
.withTimeout(30, TimeUnit.SECONDS);
|
||
|
||
// --- Příjem první dostupné zprávy (bez filtru) ---
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg -> true)
|
||
.withTimeout(10, TimeUnit.SECONDS);
|
||
|
||
// --- Příjem s vlastním timeoutem ---
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
|
||
.withTimeout(60, TimeUnit.SECONDS); // delší timeout pro pomalé systémy
|
||
```
|
||
|
||
ReceivedMessage metody dostupné v predikátu (Kafka):
|
||
|
||
- `msg.extract("dotPath")` -- hodnota JSON pole (dot/bracket notace)
|
||
- `msg.getHeader("headerName")` -- hodnota Kafka headeru
|
||
- `msg.getKey()` -- klíč zprávy
|
||
- `msg.getBody()` -- celé tělo jako JSON String
|
||
- `msg.getTimestamp()` -- timestamp zprávy
|
||
|
||
##### IBM MQ -- receiveWhere
|
||
|
||
IBM MQ podporuje dvě úrovně filtrace:
|
||
|
||
1. **JMS Message Selector** (server-side) -- efektivní filtr na JMS properties/headery, vyhodnocuje se na straně MQ serveru
|
||
2. **Predicate na obsahu** (client-side) -- filtr na tělo zprávy po deserializaci, vyhodnocuje se v klientu
|
||
|
||
```java
|
||
// --- Filtrace podle obsahu JSON pole ---
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||
.receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
|
||
.withTimeout(10, TimeUnit.SECONDS);
|
||
|
||
// --- Filtrace podle JMS CorrelationID (server-side selector) ---
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||
.withSelector("JMSCorrelationID = 'corr-789'")
|
||
.receiveWhere(msg -> true)
|
||
.withTimeout(10, TimeUnit.SECONDS);
|
||
|
||
// --- Kombinace JMS selectoru a content filtru ---
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||
.withSelector("JMSType = 'payment'")
|
||
.receiveWhere(msg -> msg.extract("amount").equals("15000"))
|
||
.withTimeout(10, TimeUnit.SECONDS);
|
||
|
||
// --- XML zprávy s XPath filtrací ---
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
|
||
.asXml()
|
||
.receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
|
||
.withTimeout(15, TimeUnit.SECONDS);
|
||
|
||
// --- EBCDIC / UTF-8 zprávy -- filtrace přes raw body ---
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
|
||
.asEbcdic()
|
||
.receiveWhere(msg -> msg.getBody().contains("12345"))
|
||
.withTimeout(15, TimeUnit.SECONDS);
|
||
|
||
// --- Příjem první dostupné zprávy ---
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||
.receiveWhere(msg -> true)
|
||
.withTimeout(10, TimeUnit.SECONDS);
|
||
|
||
// --- Browse (ne-destruktivní čtení, zprávy zůstávají ve frontě) ---
|
||
List<ReceivedMessage> peeked = harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||
.browse(10); // max 10 zpráv z fronty
|
||
// s JMS selektorem: .withSelector("JMSType = 'payment'").browse(10)
|
||
```
|
||
|
||
ReceivedMessage metody dostupné v predikátu (IBM MQ):
|
||
|
||
- `msg.extract("expression")` -- hodnota pole (JSON dot-path pro JSON, XPath pro XML)
|
||
- `msg.getBody()` -- celé tělo zprávy jako String
|
||
- `msg.getHeader("jmsProperty")` -- hodnota JMS property
|
||
|
||
Klíčový rozdíl: `.withSelector()` je server-side filtr (SQL-92 subset, filtruje JMS properties) -- použijte ho pro efektivní filtraci podle korelačních ID. `.receiveWhere()` je client-side filtr na obsahu zprávy.
|
||
|
||
#### Chybové stavy -- výjimky a assert metody
|
||
|
||
##### Výjimky (propagovány okamžitě, test failuje)
|
||
|
||
Tyto chyby signalizují infrastrukturní problém nebo chybu konfigurace -- test nemůže pokračovat:
|
||
|
||
```java
|
||
// --- Chyba připojení (Kafka i IBM MQ) ---
|
||
// KafkaConnectionException / JMSException obalené do RuntimeException
|
||
// Příčiny: špatný bootstrap-servers, connection-name-list, credentials, síťový problém
|
||
try {
|
||
harness.withKafka()
|
||
.toTopic("order-events")
|
||
.withKey("order-123")
|
||
.withPayload("{\"orderId\": \"123\"}")
|
||
.send();
|
||
} catch (MessagingConnectionException e) {
|
||
// "Failed to connect to Kafka cluster at pkc-xxxxx:9092: Authentication failed"
|
||
// "Failed to connect to IBM MQ: mq9multitst5x(1414),mq9multitst6x(1414) - MQRC 2035 (NOT_AUTHORIZED)"
|
||
}
|
||
|
||
// --- Chyba schématu (Kafka) ---
|
||
// Schema Registry vrátí 404 nebo payload neodpovídá schématu
|
||
try {
|
||
harness.withKafka()
|
||
.toTopic("nonexistent-topic")
|
||
.withKey("key")
|
||
.withPayload("{\"unknownField\": \"value\"}")
|
||
.send();
|
||
} catch (MessagingSchemaException e) {
|
||
// "Schema not found for subject 'nonexistent-topic-value' in Schema Registry"
|
||
// "Payload does not conform to Avro schema for topic 'order-events': field 'orderId' is required"
|
||
}
|
||
|
||
// --- Queue neexistuje (IBM MQ) ---
|
||
// Výjimka nastane i při chybné konfiguraci logického názvu v properties
|
||
try {
|
||
harness.withImqFirstVision()
|
||
.toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) // resolvuje se z konfigurace
|
||
.withPayload("{}")
|
||
.send();
|
||
} catch (MessagingDestinationException e) {
|
||
// "Queue 'MVSW2TST3.DELIVERY.NOTIFICATION' not found: MQRC 2085 (UNKNOWN_OBJECT_NAME)"
|
||
}
|
||
|
||
// --- Timeout při příjmu (Kafka i IBM MQ) ---
|
||
// Žádná zpráva nevyhověla predikátu v daném čase
|
||
try {
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg -> msg.extract("orderId").equals("nonexistent"))
|
||
.withTimeout(5, TimeUnit.SECONDS);
|
||
} catch (MessagingTimeoutException e) {
|
||
// "No message matching filter found on topic 'order-events' within 5 seconds"
|
||
// "No message matching filter found on queue 'PAYMENT.NOTIFICATIONS' within 10 seconds"
|
||
}
|
||
|
||
// --- Chyba konfigurace (endpoint není nakonfigurován) ---
|
||
try {
|
||
harness.withKafka()
|
||
.toTopic("order-events")
|
||
.withKey("key")
|
||
.withPayload("{}")
|
||
.send();
|
||
} catch (IllegalStateException e) {
|
||
// "You need to configure endpoints.kafka.bootstrap-servers"
|
||
// Stejný vzor jako u Wso2GatewayEndpoint
|
||
}
|
||
```
|
||
|
||
Hierarchie výjimek:
|
||
|
||
```
|
||
MessagingException (abstract, extends RuntimeException)
|
||
├── MessagingConnectionException -- selhání připojení, autentizace
|
||
├── MessagingSchemaException -- Avro schema mismatch, schema not found
|
||
├── MessagingDestinationException -- topic/queue neexistuje, přístupová práva
|
||
└── MessagingTimeoutException -- receive timeout, žádná matching zpráva
|
||
```
|
||
|
||
##### Assert metody (fluent, na přijaté zprávě)
|
||
|
||
Tyto metody se volají na výsledku `receiveWhere` -- zpráva byla nalezena a nyní se ověřuje její obsah. Assert selhání = `AssertionError` (standardní JUnit pattern).
|
||
|
||
```java
|
||
// --- JSON assert metody (Kafka i IBM MQ JSON) ---
|
||
|
||
harness.withKafka()
|
||
.fromTopic("order-events")
|
||
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
|
||
.withTimeout(30, TimeUnit.SECONDS)
|
||
// Assert na hodnotu pole (dot/bracket notace)
|
||
.andAssertFieldValue("status", "CREATED")
|
||
.andAssertFieldValue("items[0].sku", "A001")
|
||
// Assert na přítomnost/nepřítomnost pole
|
||
.andAssertPresent("items[0].qty")
|
||
.andAssertNotPresent("deletedField")
|
||
// Assert na Kafka header
|
||
.andAssertHeaderValue("requestID", "req-001")
|
||
// Extrakce hodnoty pro další použití v testu
|
||
.extract("internalId").asText(); // -> String pro uložení do harness.store()
|
||
|
||
// --- XML assert metody (IBM MQ XML) ---
|
||
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
|
||
.asXml()
|
||
.receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
|
||
.withTimeout(15, TimeUnit.SECONDS)
|
||
// XPath assert
|
||
.andAssertFieldValue("/response/balance", "50000")
|
||
.andAssertFieldValue("/response/currency", "CZK")
|
||
.andAssertPresent("/response/timestamp")
|
||
.andAssertNotPresent("/response/error");
|
||
|
||
// --- Raw body assert (EBCDIC/UTF-8) ---
|
||
|
||
harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
|
||
.asEbcdic()
|
||
.receiveWhere(msg -> msg.getBody().contains("12345"))
|
||
.withTimeout(15, TimeUnit.SECONDS)
|
||
.andAssertBodyContains("ZŮSTATEK");
|
||
|
||
// --- Deserializace do Java objektu (mapTo) ---
|
||
|
||
PaymentNotification notif = harness.withImqFirstVision()
|
||
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
|
||
.receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
|
||
.withTimeout(10, TimeUnit.SECONDS)
|
||
.mapTo(PaymentNotification.class);
|
||
```
|
||
|
||
Kompletní tabulka assert metod na `MessageResponse`:
|
||
|
||
- `andAssertFieldValue(String path, String value)` -- JSON dot-path nebo XPath
|
||
- `andAssertPresent(String path)` -- ověří existenci pole
|
||
- `andAssertNotPresent(String path)` -- ověří neexistenci pole
|
||
- `andAssertHeaderValue(String headerName, String value)` -- Kafka header nebo JMS property
|
||
- `andAssertBodyContains(String substring)` -- raw body contains (pro EBCDIC/UTF-8/raw text)
|
||
- `andAssertWithAssertJ()` -- *(nice to have)* vrací AssertJ `AbstractObjectAssert` pro komplexní fluent assertions .
|
||
- `extract(String path)` -- extrahuje hodnotu pro další použití
|
||
- `mapTo(Class<T> type)` -- deserializace těla zprávy do Java objektu (Jackson/JAXB)
|
||
- `getBody()` -- raw body String
|
||
- `getHeader(String name)` -- raw header/property value
|
||
|
||
### 5. Konfigurace
|
||
|
||
Konfigurační klíče dodržují konvenci existujících endpointů v Harness: prefix `endpoints.{system-name}.{property}` (viz `endpoints.wso2.gw.url`, `endpoints.udebs.url` atd.).
|
||
|
||
#### Properties soubory (envs/tst1):
|
||
|
||
```properties
|
||
# Kafka Confluent Cloud (Avro + Schema Registry)
|
||
endpoints.kafka.bootstrap-servers=pkc-xxxxx.eu-central-1.aws.confluent.cloud:9092
|
||
endpoints.kafka.security-protocol=SASL_SSL
|
||
endpoints.kafka.sasl-mechanism=PLAIN
|
||
endpoints.kafka.schema-registry-url=https://psrc-xxxxx.eu-central-1.aws.confluent.cloud
|
||
endpoints.kafka.value-serializer=avro
|
||
# API key + secret a Schema Registry credentials se načítají z Vaultu
|
||
|
||
# IBM MQ - First Vision (multi-instance QMGR)
|
||
endpoints.imq-first-vision.connection-name-list=mq9multitst5x(1414),mq9multitst6x(1414)
|
||
endpoints.imq-first-vision.channel=CLIENT.CHANNEL
|
||
endpoints.imq-first-vision.queue-manager=MVSW2TST3
|
||
endpoints.imq-first-vision.ssl-cipher-suite=TLS_RSA_WITH_AES_256_CBC_SHA256
|
||
# user + heslo se načítají z Vaultu
|
||
|
||
# Logické destinace (musí odpovídat enum ImqFirstVisionQueue)
|
||
endpoints.imq-first-vision.payment-notifications.queue=MVSW2TST3.DELIVERY.NOTIFICATION
|
||
endpoints.imq-first-vision.payment-request.queue=MVSW2TST3.DELIVERY.REQUEST
|
||
endpoints.imq-first-vision.mf-requests.queue=MVSW2TST3.MF.REQUESTS
|
||
endpoints.imq-first-vision.mf-responses.queue=MVSW2TST3.MF.RESPONSES
|
||
endpoints.imq-first-vision.mf-ebcdic.queue=MVSW2TST3.MF.EBCDIC
|
||
endpoints.imq-first-vision.mf-utf8.queue=MVSW2TST3.MF.UTF8
|
||
```
|
||
|
||
#### Vault paths:
|
||
|
||
```properties
|
||
vault.kafka.secrets.path=/kv/autotesty/tst1/kafka
|
||
vault.imq-first-vision.secrets.path=/kv/autotesty/tst1/imq-first-vision
|
||
```
|
||
|
||
### 6. Maven závislosti
|
||
|
||
V [test-harness-master/pom.xml](test-harness-master/pom.xml) přidat:
|
||
|
||
- `org.apache.kafka:kafka-clients` (Kafka producer/consumer)
|
||
- `io.confluent:kafka-avro-serializer` (Avro + Schema Registry)
|
||
- `com.ibm.mq:com.ibm.mq.allclient` (IBM MQ JMS klient)
|
||
- `jakarta.jms:jakarta.jms-api` (JMS API)
|
||
- `org.assertj:assertj-core` (pro `andAssertWithAssertJ()`, nice to have )
|
||
|
||
### 7. Řešení klíčových problémů
|
||
|
||
#### Poll cyklus a hledání zprávy v Kafce
|
||
|
||
```mermaid
|
||
flowchart TD
|
||
Start["receive(topic, filter, timeout)"] --> CreateConsumer["Vytvořit KafkaConsumer bez group.id"]
|
||
CreateConsumer --> Assign["assign() na všechny partitions"]
|
||
Assign --> SeekEnd["seekToEnd() pro všechny partitions"]
|
||
SeekEnd --> SavePos["Uložit aktuální pozice"]
|
||
SavePos --> Poll["poll(pollInterval)"]
|
||
Poll --> Check{"Jsou nějaké záznamy?"}
|
||
Check -->|Ano| Filter{"Odpovídá filter?"}
|
||
Check -->|Ne| Timeout{"Vypršel timeout?"}
|
||
Filter -->|Ano| Return["Vrátit ReceivedMessage"]
|
||
Filter -->|Ne| Timeout
|
||
Timeout -->|Ne| Backoff["Backoff wait"]
|
||
Backoff --> Poll
|
||
Timeout -->|Ano| Throw["Throw MessagingTimeoutException"]
|
||
```
|
||
|
||
|
||
|
||
Klíčové body:
|
||
|
||
- `seekToEnd()` se volá **před** triggerem akce v testu (tester typicky: 1. připraví listener, 2. provede akci, 3. čeká na zprávu)
|
||
- Alternativně: seek na pozici uloženou **před** testem
|
||
- Polling interval: 100ms, exponential backoff do max 1s
|
||
- Default timeout: 30s (konfigurovatelný)
|
||
|
||
#### Izolace testů (consumer groups)
|
||
|
||
**Kafka:**
|
||
|
||
- Consumer **bez `group.id`** s manuálním `assign()` + `seek()`
|
||
- Každý příjem zprávy vytvoří nový Consumer, přečte data a zavře ho
|
||
- Žádné sdílení offsetů mezi testy ani vlákny
|
||
- Alternativa pro komplexní scénáře: unikátní `group.id` per test (UUID)
|
||
|
||
**IBM MQ:**
|
||
|
||
- Pro nedestruktivní čtení: `QueueBrowser` (browse bez odstranění zprávy)
|
||
- JMS message selector (`JMSCorrelationID = 'xxx'`) pro cílený příjem
|
||
- Každý test si filtruje zprávy podle korelačních údajů
|
||
|
||
### 8. Souhrnný seznam souborů k vytvoření/úpravě
|
||
|
||
#### Nové soubory v test-harness-master:
|
||
|
||
- `connectors/messaging/KafkaConnector.java`
|
||
- `connectors/messaging/IbmMqConnector.java`
|
||
- `endpoints/kafka/KafkaEndpoint.java`
|
||
- `endpoints/imq/ImqFirstVisionEndpoint.java`
|
||
- `endpoints/imq/ImqFirstVisionQueue.java` (enum: logické názvy front)
|
||
- `support/messaging/ReceivedMessage.java`
|
||
- `support/messaging/MessageContentType.java` (enum: JSON, XML, RAW_TEXT)
|
||
- `support/messaging/MqMessageFormat.java` (enum: JSON, XML, EBCDIC_870, UTF8_1208)
|
||
- `support/messaging/MessageResponse.java` (interface: assert, extract, mapTo -- sdílený pro Kafka i IBM MQ)
|
||
- `support/messaging/KafkaRequest.java` (fluent builder pro Kafka)
|
||
- `support/messaging/ImqRequest.java` (fluent builder pro IBM MQ)
|
||
- `messaging/exception/MessagingException.java` (abstract, extends RuntimeException)
|
||
- `messaging/exception/MessagingConnectionException.java`
|
||
- `messaging/exception/MessagingSchemaException.java`
|
||
- `messaging/exception/MessagingDestinationException.java`
|
||
- `messaging/exception/MessagingTimeoutException.java`
|
||
|
||
#### Nové soubory v test-master:
|
||
|
||
- `dsl/kafka/Kafka.java` (DSL třída pro `withKafka()`)
|
||
- `dsl/imq/ImqFirstVision.java` (DSL třída pro `withImqFirstVision()`)
|
||
|
||
#### Úpravy existujících souborů:
|
||
|
||
- `test-harness-master/pom.xml` - přidat závislosti Kafka + IBM MQ
|
||
- `test-master/pom.xml` - případné závislosti
|
||
- `test-master/.../Harness.java` - přidat `withKafka()` a `withImqFirstVision()` metody
|
||
- `test-master/src/test/resources/envs/tst1` - přidat `endpoints.kafka.`* a `endpoints.imq-first-vision.`*
|
||
- `test-master/src/test/resources/envs/ppe` - přidat `endpoints.kafka.`* a `endpoints.imq-first-vision.`*
|
||
|