57 KiB
name, overview, todos, isProject
| name | overview | todos | isProject | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Kafka IBM MQ Support | Rozšíření testovacího frameworku Harness o podporu messaging systémů (Kafka na Confluent Cloud a IBM MQ) s kompletní abstrakcí, aby tester nemusel znát detaily jednotlivých protokolů. Změny probíhají ve dvou projektech: test-harness-master (framework) a test-master (DSL + testy). |
|
false |
Rozšíření testovacího frameworku o Kafka a IBM MQ
Současná architektura
Framework je postaven na vrstvené architektuře:
graph TD
Test["Test (@TestCase)"] --> DSL["DSL vrstva (Harness.withXxx())"]
DSL --> EP["Endpoint (implementuje Endpoint)"]
EP --> CON["Connector (implementuje Connector)"]
CON --> SYS["Cílový systém"]
- Connector (connectors/Connector.java): nízkoúrovňová komunikace s cílovým systémem
- Endpoint (endpoints/Endpoint.java): obaluje Connector, čte konfiguraci ze
StoreAccessor, singleton per test class - DSL Builder (v test-master, např. Wso2.java): fluent API pro testy
- Harness (Harness.java): vstupní bod přes
withXxx()metody
Navrhovaná architektura
Kafka a IBM MQ mají oddělené vstupní body v Harness DSL -- analogicky k withWso2(), withUfo() apod.
graph TD
subgraph kafkaPath [Kafka]
TestK["harness.withKafka()"] --> KafkaDSL["Kafka DSL"]
KafkaDSL --> KafkaReq["KafkaRequest builder"]
KafkaReq --> KafkaEP["KafkaEndpoint"]
KafkaEP --> KafkaCon["KafkaConnector"]
KafkaCon --> ConflCloud["Confluent Cloud"]
end
subgraph imqPath [IBM MQ]
TestI["harness.withImqFirstVision()"] --> ImqDSL["ImqFirstVision DSL"]
ImqDSL --> ImqReq["ImqRequest builder"]
ImqReq --> ImqEP["ImqFirstVisionEndpoint"]
ImqEP --> ImqCon["IbmMqConnector"]
ImqCon --> ImqSrv["IBM MQ Server"]
end
KafkaCon -.->|"API key/secret"| Vault["HashiCorp Vault"]
ImqCon -.->|"user + password"| Vault
Třídní diagram
classDiagram
class Connector {
<<interface>>
+close()
}
class Endpoint {
<<interface>>
+canAccess() bool
+close()
}
class KafkaConnector {
-KafkaProducer producer
-CachedSchemaRegistryClient schemaRegistry
+send(topic, key, jsonPayload, headers)
+receive(topic, filter, timeout) List~ReceivedMessage~
+close()
}
class IbmMqConnector {
-JmsConnectionFactory connectionFactory
-JMSContext jmsContext
+send(queue, payload, format, properties)
+receive(queue, selector, format, timeout) ReceivedMessage
+browse(queue, selector, format, maxMessages) List~ReceivedMessage~
+close()
}
class KafkaEndpoint {
-KafkaConnector connector
-StoreAccessor store
}
class ImqFirstVisionEndpoint {
-IbmMqConnector connector
-StoreAccessor store
}
class ReceivedMessage {
+String body
+MessageContentType contentType
+Map headers
+long timestamp
+extract(expression) String
+extractJson(path) JsonNode
+extractXml(xpath) String
}
class MqMessageFormat {
<<enum>>
JSON
XML
EBCDIC_870
UTF8_1208
}
Connector <|.. KafkaConnector
Connector <|.. IbmMqConnector
Endpoint <|.. KafkaEndpoint
Endpoint <|.. ImqFirstVisionEndpoint
KafkaEndpoint --> KafkaConnector
ImqFirstVisionEndpoint --> IbmMqConnector
Detailní návrh tříd
1. Connectors (test-harness-master)
KafkaConnector
Umístění: cz.moneta.test.harness.connectors.messaging.KafkaConnector
- Spravuje
KafkaProducer(lazy init, singleton, thread-safe) aKafkaConsumer(per-call, kvůli izolaci) - SASL/PLAIN autentizace: API key + secret z Vaultu
- SSL context s truststore (stejný pattern jako
BaseRestConnector) - Formát zpráv: Avro -- používá Confluent Schema Registry:
- Volba schématu je řízena názvem topicu: Schema Registry subject =
{topic-name}-value(Confluent TopicNameStrategy, výchozí). Tester nemusí schéma specifikovat -- connector ho získá automaticky z Registry podle topicu. - Producer:
KafkaAvroSerializerserializujeGenericRecord/SpecificRecord - Consumer:
KafkaAvroDeserializerdeserializuje naGenericRecord - Automatická konverze Avro
GenericRecord-> JSON string vReceivedMessage(transparentní pro testera) - Pro odesílání: tester poskytne JSON string, connector stáhne schéma z Registry podle topicu a vytvoří
GenericRecord
- Volba schématu je řízena názvem topicu: Schema Registry subject =
- Vstup pro odeslání zprávy: název topicu (= logická destinace), klíč (String), tělo zprávy (JSON String), headery (Map)
- Podporované Kafka headery -- framework poskytuje typované builder metody pro běžné tracing/correlation headery:
traceparent-- W3C Trace Context propagace (např.00-traceId-spanId-01)requestID-- identifikátor požadavkuactivityID-- identifikátor aktivitysourceCodebookId-- identifikátor zdrojového číselníku- Libovolné další headery přes generický
.withHeader(key, value)
- Izolace testů: Consumer bez consumer group - používá
consumer.assign(partitions)+consumer.seekToEnd()místosubscribe(). Díky tomu:- Žádný consumer group = žádný rebalancing mezi testy
- Každý test si čte od aktuální pozice nezávisle
- Testy neovlivňují offsety ostatních testů
- Poll cyklus: Metoda
receive(topic, Predicate<ReceivedMessage>, Duration timeout):- Vytvoří nový Consumer
assign()na všechny partitions daného topicuseekToEnd()(neboseekna uloženou pozici)- Polling smyčka s exponenciálním backoff (100ms -> 500ms -> 1s)
- Každý record: Avro GenericRecord -> JSON -> test proti Predicate
- Vrátí první matching zprávu, nebo vyhodí
MessagingTimeoutException
public class KafkaConnector implements Connector {
private final Properties baseConfig;
private KafkaProducer<String, GenericRecord> producer;
private final String schemaRegistryUrl;
private final CachedSchemaRegistryClient schemaRegistryClient;
public KafkaConnector(String bootstrapServers, String apiKey,
String apiSecret, String schemaRegistryUrl,
String schemaRegistryApiKey,
String schemaRegistryApiSecret) { ... }
/**
* Odeslání zprávy na topic.
* Schéma se získá automaticky z Schema Registry podle topicu
* (subject = "{topic}-value", TopicNameStrategy).
*
* @param topic název Kafka topicu (= logická destinace)
* @param key klíč zprávy (String)
* @param jsonPayload tělo zprávy jako JSON String
* @param headers Kafka headery (traceparent, requestID, activityID,
* sourceCodebookId, ...)
*/
public void send(String topic, String key, String jsonPayload,
Map<String, String> headers) {
Schema schema = getSchemaForTopic(topic);
GenericRecord record = jsonToAvro(jsonPayload, schema);
ProducerRecord<String, GenericRecord> producerRecord =
new ProducerRecord<>(topic, key, record);
headers.forEach((k, v) ->
producerRecord.headers().add(k, v.getBytes(StandardCharsets.UTF_8)));
producer.send(producerRecord).get();
}
// Příjem: Avro GenericRecord se konvertuje na JSON v ReceivedMessage
public List<ReceivedMessage> receive(String topic,
Predicate<ReceivedMessage> filter,
Duration timeout) { ... }
public Map<TopicPartition, Long> saveOffsets(String topic) { ... }
// Stáhne latest schéma z Registry podle topicu (subject = {topic}-value)
private Schema getSchemaForTopic(String topic) {
String subject = topic + "-value";
SchemaMetadata meta = schemaRegistryClient.getLatestSchemaMetadata(subject);
return new Schema.Parser().parse(meta.getSchema());
}
private String avroToJson(GenericRecord record) { ... }
private GenericRecord jsonToAvro(String json, Schema schema) { ... }
}
IbmMqConnector
Umístění: cz.moneta.test.harness.connectors.messaging.IbmMqConnector
- Používá
com.ibm.mq.allclientJMS klient JmsConnectionFactorypro připojení s user + password- Multi-instance Queue Manager: Podpora connection name list ve formátu
host1(port1),host2(port2)(např.mq9multitst5x(1414),mq9multitst6x(1414)). Nastavuje se přesMQConnectionFactory.setConnectionNameList()místo samostatnýchsetHostName()/setPort(). IBM MQ klient automaticky provádí failover na další instanci při nedostupnosti. - SSL/TLS s keystore a truststore
- Podporované formáty zpráv:
- JSON (výchozí):
JMS TextMessages plain JSON string - XML:
JMS TextMessages XML string; příjem konvertuje XML na JSON vReceivedMessage(přes JacksonXmlMapper) pro jednotné API - UTF-8 / CCSID 1208 (výchozí pro binární):
JMS BytesMessages payload kódovaným v UTF-8 (IBM CCSID 1208). MQMD CCSID = 1208. Vhodné pro systémy vyžadující explicitní UTF-8 BytesMessage. - EBCDIC / CCSID 870:
JMS BytesMessages payload kódovaným v EBCDIC code page 870 (český/slovenský mainframe kódování). MQMD CCSID = 870. Při odesílání se Java String konvertuje nabyte[]přesCharset.forName("IBM870"). Při příjmu sebyte[]dekóduje zpět na Java String.
- JSON (výchozí):
- Logika formátu:
- Formát se volí explicitně ve fluent API (
.asXml(),.asEbcdic(),.asUtf8()); výchozí je JSON - Při příjmu: connector detekuje typ JMS zprávy (
TextMessagevsBytesMessage) a dekóduje odpovídajícím způsobem podle nastaveného formátu
- Formát se volí explicitně ve fluent API (
- Connection pooling: Drží
JMSContextper endpoint instance, recykluje sessions - Izolace testů: Používá
QueueBrowserpro nedestruktivní čtení (browse), nebocreateConsumers message selektorem send(),receive(),browse()metody
public enum MqMessageFormat {
JSON, // TextMessage, plain JSON (UTF-8 default)
XML, // TextMessage, XML string (UTF-8 default)
EBCDIC_870, // BytesMessage, EBCDIC IBM-870 (česky/slovensky)
UTF8_1208 // BytesMessage, UTF-8 IBM CCSID 1208 (výchozí pro binární)
}
public class IbmMqConnector implements Connector {
private static final Charset EBCDIC_870 = Charset.forName("IBM870");
private static final Charset UTF_8 = StandardCharsets.UTF_8;
private final JmsConnectionFactory connectionFactory;
private JMSContext jmsContext;
// connectionNameList: "host1(port1),host2(port2)" pro multi-instance QMGR
public IbmMqConnector(String connectionNameList, String channel,
String queueManager, String user, String password,
String keystorePath, String keystorePassword) {
JmsConnectionFactory cf = ...;
cf.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, connectionNameList);
cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager);
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
// ...
}
// Odeslání zprávy s daným formátem
public void send(String queueName, String payload,
MqMessageFormat format,
Map<String, String> properties) {
switch (format) {
case JSON, XML -> sendTextMessage(queueName, payload, properties);
case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
}
}
// TextMessage pro JSON a XML (JVM default UTF-8)
private void sendTextMessage(String queueName, String payload,
Map<String, String> properties) { ... }
// BytesMessage s konverzí String -> byte[] v daném kódování + MQMD CCSID
private void sendBytesMessage(String queueName, String payload,
Charset charset, int ccsid,
Map<String, String> properties) {
byte[] bytes = payload.getBytes(charset);
// Vytvoří BytesMessage, nastaví MQMD CCSID, zapíše bytes
}
// Příjem: auto-detekce TextMessage vs BytesMessage
public ReceivedMessage receive(String queueName, String messageSelector,
MqMessageFormat expectedFormat,
Duration timeout) { ... }
/**
* Browse (ne-destruktivní čtení) -- zprávy zůstávají ve frontě.
* Vrací seznam zpráv odpovídajících messageSelector, bez jejich odstranění.
* Použití: inspekce obsahu fronty před/po testu, ověření počtu zpráv.
*/
public List<ReceivedMessage> browse(String queueName, String messageSelector,
MqMessageFormat expectedFormat,
int maxMessages) { ... }
// Dekódování přijaté zprávy podle formátu
private String decodeMessage(Message jmsMessage,
MqMessageFormat format) {
if (jmsMessage instanceof TextMessage txt) {
return txt.getText(); // JSON nebo XML string (UTF-8)
} else if (jmsMessage instanceof BytesMessage bytesMsg) {
byte[] data = new byte[(int) bytesMsg.getBodyLength()];
bytesMsg.readBytes(data);
Charset charset = switch (format) {
case EBCDIC_870 -> EBCDIC_870;
case UTF8_1208 -> UTF_8;
default -> UTF_8;
};
return new String(data, charset);
}
}
}
2. Model třídy (test-harness-master)
ReceivedMessage
Umístění: cz.moneta.test.harness.support.messaging.ReceivedMessage
Body je vždy String normalizovaný do čitelné podoby bez ohledu na zdroj a wire formát:
- Z Kafka: Avro
GenericRecordje automaticky konvertován na JSON vKafkaConnector - Z IBM MQ (JSON): JSON string z
JMS TextMessageje předán přímo - Z IBM MQ (XML): XML string z
JMS TextMessageje předán jako XML;extract()podporuje jak JSON dotpath, tak XPath - Z IBM MQ (EBCDIC):
byte[]zJMS BytesMessageje dekódován z IBM-870 na Java String
public enum MessageContentType {
JSON, XML, RAW_TEXT
}
public class ReceivedMessage {
private final String body; // dekódovaný textový obsah
private final MessageContentType contentType; // JSON, XML, nebo RAW_TEXT
private final Map<String, String> headers; // Kafka headers nebo JMS properties
private final long timestamp;
private final String source; // topic nebo queue name
private final String key; // Kafka message key (null pro IBM MQ)
// JSON navigace (dot/bracket notace: "items[0].sku")
public JsonNode extractJson(String path) { ... }
// XPath navigace (pro XML zprávy: "/response/balance")
public String extractXml(String xpathExpression) { ... }
// Univerzální extract - auto-detekce podle contentType
public String extract(String expression) {
return switch (contentType) {
case JSON -> extractJson(expression).asText();
case XML -> extractXml(expression);
case RAW_TEXT -> body;
};
}
public String getKey() { ... } // Kafka klíč, null pro IBM MQ
public String getHeader(String name) { ... } // Kafka header nebo JMS property
public String getBody() { ... } // surový textový obsah
public long getTimestamp() { ... }
public MessageContentType getContentType() { ... }
/**
* Deserializace těla zprávy do Java objektu.
* Pro JSON: Jackson ObjectMapper.readValue(body, type)
* Pro XML: JAXB Unmarshaller nebo Jackson XmlMapper
*/
public <T> T mapTo(Class<T> type) { ... }
}
3. Endpoints (test-harness-master)
KafkaEndpoint
Umístění: cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint
- Vzor: analogie s
Wso2GatewayEndpoint-- konstruktor přijímáStoreAccessor, čteendpoints.kafka.*konfiguraci - Čte konfiguraci z
StoreAccessor:endpoints.kafka.bootstrap-serversendpoints.kafka.security-protocolendpoints.kafka.sasl-mechanismendpoints.kafka.schema-registry-urlendpoints.kafka.value-serializer
- Credentials z Vaultu (path:
vault.kafka.secrets.path):- Kafka API key + secret
- Schema Registry API key + secret
- Lazy init KafkaConnectoru
public class KafkaEndpoint implements Endpoint {
private final KafkaConnector connector;
private final StoreAccessor store;
public KafkaEndpoint(StoreAccessor store) {
this.store = store;
String bootstrapServers = Optional.ofNullable(store.getConfig("endpoints.kafka.bootstrap-servers"))
.orElseThrow(() -> new IllegalStateException(
"You need to configure endpoints.kafka.bootstrap-servers"));
String schemaRegistryUrl = store.getConfig("endpoints.kafka.schema-registry-url");
// API key/secret z Vaultu
// ...
this.connector = new KafkaConnector(bootstrapServers, apiKey, apiSecret,
schemaRegistryUrl, srApiKey, srApiSecret);
}
public void send(String topic, String key, String jsonPayload,
Map<String, String> headers) {
connector.send(topic, key, jsonPayload, headers);
}
public List<ReceivedMessage> receive(String topic,
Predicate<ReceivedMessage> filter,
Duration timeout) {
return connector.receive(topic, filter, timeout);
}
@Override
public void close() { connector.close(); }
}
ImqFirstVisionEndpoint
Umístění: cz.moneta.test.harness.endpoints.imq.ImqFirstVisionEndpoint
- Vzor: analogie s
Wso2GatewayEndpoint - Čte konfiguraci z
StoreAccessor:endpoints.imq-first-vision.connection-name-list-- formáthost1(port1),host2(port2)pro multi-instance QMGRendpoints.imq-first-vision.channelendpoints.imq-first-vision.queue-managerendpoints.imq-first-vision.ssl-cipher-suite
- User i heslo z Vaultu (path:
vault.imq-first-vision.secrets.path) - Lazy init IbmMqConnectoru
public class ImqFirstVisionEndpoint implements Endpoint {
private final IbmMqConnector connector;
private final StoreAccessor store;
public ImqFirstVisionEndpoint(StoreAccessor store) {
this.store = store;
// connection-name-list: "mq9multitst5x(1414),mq9multitst6x(1414)"
String connectionNameList = Optional.ofNullable(
store.getConfig("endpoints.imq-first-vision.connection-name-list"))
.orElseThrow(() -> new IllegalStateException(
"You need to configure endpoints.imq-first-vision.connection-name-list"));
String channel = store.getConfig("endpoints.imq-first-vision.channel");
String queueManager = store.getConfig("endpoints.imq-first-vision.queue-manager");
// user + password z Vaultu
String user = // z Vaultu (vault.imq-first-vision.secrets.path -> "user")
String password = // z Vaultu (vault.imq-first-vision.secrets.path -> "password")
// ...
this.connector = new IbmMqConnector(connectionNameList, channel,
queueManager, user, password, keystorePath, keystorePassword);
}
public void send(String queueName, String payload,
MqMessageFormat format, Map<String, String> properties) {
connector.send(queueName, payload, format, properties);
}
public ReceivedMessage receive(String queueName, String messageSelector,
MqMessageFormat format, Duration timeout) {
return connector.receive(queueName, messageSelector, format, timeout);
}
public List<ReceivedMessage> browse(String queueName, String messageSelector,
MqMessageFormat format, int maxMessages) {
return connector.browse(queueName, messageSelector, format, maxMessages);
}
// Resolve logického názvu fronty z konfigurace
public String resolveQueue(String logicalName) {
return Optional.ofNullable(
store.getConfig("endpoints.imq-first-vision." + logicalName + ".queue"))
.orElseThrow(() -> new IllegalStateException(
"Queue '" + logicalName + "' is not configured in " +
"endpoints.imq-first-vision." + logicalName + ".queue"));
}
@Override
public void close() { connector.close(); }
}
4. DSL Builder (test-master)
Kafka DSL
Umístění: cz.moneta.test.dsl.kafka.Kafka
Vstupní bod přes Harness:
public Kafka withKafka() {
return new Kafka(this);
}
DSL třída uvnitř získá endpoint přes existující mechanismus:
public class Kafka {
private final Harness harness;
public Kafka(Harness harness) { this.harness = harness; }
public KafkaRequest.TopicPhase prepareRequest() {
KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class);
return KafkaRequest.builder(endpoint);
}
}
ImqFirstVision DSL
Umístění: cz.moneta.test.dsl.imq.ImqFirstVision
Vstupní bod přes Harness:
public ImqFirstVision withImqFirstVision() {
return new ImqFirstVision(this);
}
DSL třída uvnitř získá endpoint přes existující mechanismus:
public class ImqFirstVision {
private final Harness harness;
public ImqFirstVision(Harness harness) { this.harness = harness; }
public ImqRequest.QueuePhase prepareRequest() {
ImqFirstVisionEndpoint endpoint = harness.getEndpoint(ImqFirstVisionEndpoint.class);
return ImqRequest.builder(endpoint);
}
}
Registrace endpointů v BaseStoreAccessor
KafkaEndpoint a ImqFirstVisionEndpoint nevyžadují explicitní registraci. Existující mechanismus v BaseStoreAccessor.getEndpoint(Class) je automaticky instanciuje přes reflexi (konstruktor (StoreAccessor)), cache-uje a spravuje lifecycle (zavírá přes close()). Podmínkou je, aby nové endpointy měly veřejný konstruktor public XxxEndpoint(StoreAccessor store) a implementovaly Endpoint interface.
ImqFirstVisionQueue enum
Umístění: cz.moneta.test.harness.endpoints.imq.ImqFirstVisionQueue
Enum definuje logické názvy front. Fyzický název fronty se resolvuje z konfigurace (endpoints.imq-first-vision.{logicalName}.queue). Tester používá enum konstantu -- nemusí znát fyzický název fronty ani konfigurační klíč.
public enum ImqFirstVisionQueue {
PAYMENT_NOTIFICATIONS("payment-notifications"),
PAYMENT_REQUEST("payment-request"),
MF_REQUESTS("mf-requests"),
MF_RESPONSES("mf-responses"),
MF_EBCDIC("mf-ebcdic"),
MF_UTF8("mf-utf8");
private final String configKey;
ImqFirstVisionQueue(String configKey) {
this.configKey = configKey;
}
public String getConfigKey() { return configKey; }
}
Resoluce v ImqFirstVisionEndpoint.resolveQueue():
ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS-> čteendpoints.imq-first-vision.payment-notifications.queue-> vrátí"MVSW2TST3.DELIVERY.NOTIFICATION"
KafkaRequest fluent builder
Umístění: cz.moneta.test.harness.support.messaging.KafkaRequest
ImqRequest fluent builder
Umístění: cz.moneta.test.harness.support.messaging.ImqRequest
Definice fluent API pro realizaci
Následující rozhraní a metody definují kompletní fluent API. DSL třídy (Kafka, ImqFirstVision) delegují na request buildery (KafkaRequest, ImqRequest).
Fáze builderu: Každá metoda vrací rozhraní, které umožňuje další volání až do terminálu (send() nebo MessageResponse).
Legenda k implementaci:
- [EXISTUJE] -- obdobná logika již je v frameworku (RawRestRequest), lze znovu využít nebo adaptovat
- [NOVÉ] -- je potřeba implementovat
KafkaRequest – fázová rozhraní
// Fáze 1: Po withKafka() -- výběr směru (odeslání vs. příjem) -- [NOVÉ]
public interface KafkaSendPhase {
KafkaPayloadPhase toTopic(String topic);
}
public interface KafkaReceivePhase {
KafkaReceiveFilterPhase fromTopic(String topic);
}
// Fáze 2a: Odeslání - payload a headery
// withPayload, withPayloadFromFile, addField, appendToArray, withHeader [EXISTUJE] v RawRestRequest
// withKey, withTraceparent/RequestID/ActivityID/SourceCodebookId, send [NOVÉ]
public interface KafkaPayloadPhase {
KafkaPayloadPhase withKey(String key); // [NOVÉ]
KafkaPayloadPhase withPayload(String json); // [EXISTUJE]
KafkaPayloadPhase withPayloadFromFile(String path); // [EXISTUJE]
KafkaPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables); // [EXISTUJE] nice to have
KafkaPayloadPhase addField(String fieldName, Object value);
KafkaPayloadPhase addField(String path, String fieldName, Object value);
KafkaPayloadPhase appendToArray(String path, Object value);
KafkaPayloadPhase withTraceparent(String value); // [NOVÉ]
KafkaPayloadPhase withRequestID(String value); // [NOVÉ]
KafkaPayloadPhase withActivityID(String value); // [NOVÉ]
KafkaPayloadPhase withSourceCodebookId(String value); // [NOVÉ]
KafkaPayloadPhase withHeader(String key, String value); // [EXISTUJE]
void send(); // [NOVÉ]
}
// Fáze 2b: Příjem - filtr a timeout -- [NOVÉ]
public interface KafkaReceiveFilterPhase {
KafkaAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter);
}
public interface KafkaAwaitingPhase {
MessageResponse withTimeout(long duration, TimeUnit unit);
}
// MessageResponse -- obdobné RawRestRequest.Response
// andAssertFieldValue, andAssertPresent, andAssertNotPresent, extract, mapTo [EXISTUJE]
// andAssertHeaderValue, andAssertBodyContains, andAssertWithAssertJ [NOVÉ]
public interface MessageResponse {
MessageResponse andAssertFieldValue(String path, String value); // [EXISTUJE]
MessageResponse andAssertPresent(String path); // [EXISTUJE]
MessageResponse andAssertNotPresent(String path); // [EXISTUJE]
MessageResponse andAssertHeaderValue(String headerName, String value); // [NOVÉ]
MessageResponse andAssertBodyContains(String substring); // [NOVÉ] primárně IBM MQ
ObjectAssert<?> andAssertWithAssertJ(); // [NOVÉ] nice to have
JsonPathValue extract(String path); // [EXISTUJE]
<T> T mapTo(Class<T> type); // [EXISTUJE] jako post(Class)
ReceivedMessage getMessage();
String getBody();
String getHeader(String name);
}
ImqRequest – fázová rozhraní
// Fáze 1: Po withImqFirstVision() -- [NOVÉ]
public interface ImqSendPhase {
ImqPayloadPhase toQueue(ImqFirstVisionQueue queue);
}
public interface ImqReceivePhase {
ImqReceiveFilterPhase fromQueue(ImqFirstVisionQueue queue);
}
// Fáze 2a: Odeslání - asXml/asEbcdic/asUtf8 [NOVÉ], payload/addField/appendToArray [EXISTUJE]
public interface ImqPayloadPhase {
ImqPayloadPhase asXml(); // [NOVÉ]
ImqPayloadPhase asEbcdic(); // [NOVÉ]
ImqPayloadPhase asUtf8(); // [NOVÉ]
ImqPayloadPhase withPayload(String payload);
ImqPayloadPhase withPayloadFromFile(String path);
ImqPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables);
ImqPayloadPhase addField(String fieldName, Object value);
ImqPayloadPhase addField(String path, String fieldName, Object value);
ImqPayloadPhase appendToArray(String path, Object value);
void send();
}
// Fáze 2b: Příjem - withSelector, receiveWhere, browse [NOVÉ]
public interface ImqReceiveFilterPhase {
ImqReceiveFilterPhase withSelector(String jmsMessageSelector); // [NOVÉ]
ImqAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter);
List<ReceivedMessage> browse(int maxMessages); // [NOVÉ]
}
public interface ImqAwaitingPhase {
MessageResponse withTimeout(long duration, TimeUnit unit);
}
Přehledná tabulka metod
| Kontext | Metoda | Návrat | Poznámka |
|---|---|---|---|
| Harness | withKafka() |
Kafka | [NOVÉ] vstupní bod |
| Harness | withImqFirstVision() |
ImqFirstVision | [NOVÉ] vstupní bod |
| Kafka | toTopic(String) |
KafkaPayloadPhase | [NOVÉ] odeslání |
| Kafka | fromTopic(String) |
KafkaReceiveFilterPhase | [NOVÉ] příjem |
| KafkaPayloadPhase | withKey(String) |
KafkaPayloadPhase | [NOVÉ] Kafka specifické |
| KafkaPayloadPhase | withPayload(String) |
KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayload |
| KafkaPayloadPhase | withPayloadFromFile(String) |
KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromFile |
| KafkaPayloadPhase | withPayloadFromTemplate(String, Map) |
KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withPayloadFromTemplate (Template API) -- nice to have |
| KafkaPayloadPhase | addField(...) |
KafkaPayloadPhase | [EXISTUJE] RawRestRequest.addField, JSON pouze |
| KafkaPayloadPhase | appendToArray(...) |
KafkaPayloadPhase | [EXISTUJE] RawRestRequest.appendToArray, JSON pouze |
| KafkaPayloadPhase | withTraceparent/RequestID/ActivityID/SourceCodebookId(String) |
KafkaPayloadPhase | [NOVÉ] Kafka headery |
| KafkaPayloadPhase | withHeader(String, String) |
KafkaPayloadPhase | [EXISTUJE] RawRestRequest.withHeader |
| KafkaPayloadPhase | send() |
void | [NOVÉ] terminál (messaging specifické) |
| KafkaReceiveFilterPhase | receiveWhere(Predicate) |
KafkaAwaitingPhase | [NOVÉ] |
| ImqFirstVision | toQueue(ImqFirstVisionQueue) |
ImqPayloadPhase | [NOVÉ] |
| ImqFirstVision | fromQueue(ImqFirstVisionQueue) |
ImqReceiveFilterPhase | [NOVÉ] |
| ImqPayloadPhase | asXml(), asEbcdic(), asUtf8() |
ImqPayloadPhase | [NOVÉ] IBM MQ formát |
| ImqPayloadPhase | withPayload, withPayloadFromFile, ... |
ImqPayloadPhase | [EXISTUJE] jako Kafka |
| ImqReceiveFilterPhase | withSelector(String) |
ImqReceiveFilterPhase | [NOVÉ] JMS selector |
| KafkaAwaitingPhase | withTimeout(long, TimeUnit) |
MessageResponse | [NOVÉ] |
| ImqReceiveFilterPhase | receiveWhere(Predicate) |
ImqAwaitingPhase | [NOVÉ] |
| ImqAwaitingPhase | withTimeout(long, TimeUnit) |
MessageResponse | [NOVÉ] |
| ImqReceiveFilterPhase | browse(int maxMessages) |
List<ReceivedMessage> | [NOVÉ] terminál, nedestruktivní |
| MessageResponse | andAssertFieldValue(path, value) |
MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertFieldValue |
| MessageResponse | andAssertPresent(path) |
MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertPresent |
| MessageResponse | andAssertNotPresent(path) |
MessageResponse | [EXISTUJE] RawRestRequest.Response.andAssertNotPresent |
| MessageResponse | andAssertHeaderValue(name, value) |
MessageResponse | [NOVÉ] Kafka header / JMS property |
| MessageResponse | andAssertBodyContains(substring) |
MessageResponse | [NOVÉ] EBCDIC/UTF-8 |
| MessageResponse | andAssertWithAssertJ() |
ObjectAssert | [NOVÉ] nice to have |
| MessageResponse | extract(path) |
JsonPathValue | [EXISTUJE] RawRestRequest.Response.extract, .asText() |
| MessageResponse | mapTo(Class) |
T | [EXISTUJE] RawRestRequest.post(Class) -- deserializace |
| MessageResponse | getBody(), getHeader(name) |
String | [EXISTUJE] Response přístup k datům |
Poznámka k řetězci receiveWhere: U Kafky receiveWhere(filter) vrací objekt s withTimeout() – timeout je povinný před assert. U IBM MQ může být timeout defaultní nebo explicitní dle implementace.
Vzor podobný RawRestRequest - fluent builder s fázovými interfaces:
// =============================================
// === harness.withKafka() -- Kafka (Avro) =====
// =============================================
// Vstup: topic (schéma se resolví automaticky z Registry), klíč, tělo (JSON), headery
// Odeslání zprávy na Kafka topic se standardními headery
harness.withKafka()
.toTopic("order-events")
.withKey("order-123")
.withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}")
.withTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
.withRequestID("req-001")
.withActivityID("act-555")
.withSourceCodebookId("CB-01")
.send();
// Libovolné vlastní headery
harness.withKafka()
.toTopic("order-events")
.withKey("order-456")
.withPayload("{\"orderId\": \"456\", \"status\": \"SHIPPED\"}")
.withHeader("customHeader", "customValue")
.send();
// Odeslání z JSON souboru
harness.withKafka()
.toTopic("order-events")
.withKey("order-789")
.withPayloadFromFile("messaging/order_event.json")
.withRequestID("req-002")
.send();
// Příjem zprávy z Kafka s filtrem (Avro -> JSON konverze automaticky)
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(30, TimeUnit.SECONDS)
.andAssertFieldValue("status", "CREATED");
// =============================================
// === harness.withImqFirstVision() -- IBM MQ ==
// =============================================
// --- JSON (výchozí formát) ---
// Tester používá enum ImqFirstVisionQueue -- fyzický název fronty se resolvuje z konfigurace
// Odeslání JSON zprávy do IBM MQ fronty
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.withPayload("{\"paymentId\": \"PAY-456\", \"result\": \"OK\"}")
.send();
// Příjem JSON zprávy z IBM MQ fronty
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
.withTimeout(10, TimeUnit.SECONDS)
.andAssertFieldValue("result", "OK");
// --- XML ---
// Odeslání XML zprávy
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.MF_REQUESTS)
.asXml()
.withPayload("<request><accountId>12345</accountId><action>BALANCE</action></request>")
.send();
// Odeslání XML ze souboru
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.MF_REQUESTS)
.asXml()
.withPayloadFromFile("messaging/mf_request.xml")
.send();
// Příjem XML zprávy s XPath filtrací
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
.asXml()
.receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
.withTimeout(15, TimeUnit.SECONDS)
.andAssertFieldValue("/response/balance", "50000");
// --- UTF-8 / CCSID 1208 (BytesMessage) ---
// Odeslání zprávy v UTF-8 jako BytesMessage (CCSID 1208)
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.MF_UTF8)
.asUtf8()
.withPayload("DATA|12345|ÚČET|CZK")
.send();
// Příjem UTF-8 BytesMessage
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_UTF8)
.asUtf8()
.receiveWhere(msg -> msg.getBody().contains("12345"))
.withTimeout(15, TimeUnit.SECONDS);
// --- EBCDIC / CCSID 870 ---
// Odeslání zprávy v EBCDIC kódování (mainframe, CZ/SK znaky)
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.MF_EBCDIC)
.asEbcdic()
.withPayload("PŘIKAZ|12345|ZŮSTATEK|CZK")
.send();
// Příjem EBCDIC zprávy (automaticky dekódováno z IBM-870 na String)
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
.asEbcdic()
.receiveWhere(msg -> msg.getBody().contains("12345"))
.withTimeout(15, TimeUnit.SECONDS);
Inkrementální stavba payload pomocí addField / appendToArray
Stejný vzor jako v existující třídě RawRestRequest.Builder (viz addField(), appendToArray() na řádcích 253-342 v RawRestRequest.java). Funguje pro JSON payloady (Kafka i IBM MQ JSON formát). Používá Jackson ObjectMapper pro manipulaci s JSON stromem.
// === Kafka: Sestavení payload přes addField ===
// Začátek s prázdným JSON objektem
harness.withKafka()
.toTopic("order-events")
.withKey("order-123")
.withPayload("{}")
.addField("orderId", "123")
.addField("status", "CREATED")
.addField("items", Arrays.asList(
Map.of("sku", "A001", "qty", 2),
Map.of("sku", "B002", "qty", 1)))
.addField("items[1]", "note", "fragile")
.withRequestID("req-001")
.send();
// Výsledný payload: {"orderId":"123","status":"CREATED",
// "items":[{"sku":"A001","qty":2},{"sku":"B002","qty":1,"note":"fragile"}]}
// Přidání do existujícího payloadu ze souboru
harness.withKafka()
.toTopic("order-events")
.withKey("order-456")
.withPayloadFromFile("messaging/order_event.json")
.addField("metadata", new Object())
.addField("metadata", "source", "test-harness")
.addField("metadata", "timestamp", System.currentTimeMillis())
.send();
// Přidání prvku do existujícího pole
harness.withKafka()
.toTopic("order-events")
.withKey("order-789")
.withPayloadFromFile("messaging/order_base.json")
.appendToArray("items", Map.of("sku", "C003", "qty", 5))
.send();
// === IBM MQ: Sestavení JSON payload přes addField ===
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.withPayload("{}")
.addField("paymentId", "PAY-456")
.addField("amount", 15000)
.addField("currency", "CZK")
.addField("beneficiary", new Object())
.addField("beneficiary", "name", "Jan Novák")
.addField("beneficiary", "accountNumber", "1234567890/0100")
.send();
Podporované metody na builder fázi po withPayload() / withPayloadFromFile():
addField(String fieldName, Object value)-- přidá pole do kořene JSONaddField(String path, String fieldName, Object value)-- přidá pole do vnořeného uzlu;pathpoužívá tečkovou notaci a hranatou závorku pro indexy polí (např."items[2].details")appendToArray(String path, Object value)-- přidá prvek na konec existujícího JSON polewithPayloadFromTemplate(String templatePath, Map<String, Object> variables)-- (nice to have) načte šablonu ze souboru a nahradí placeholdery (${varName}) hodnotami z mapy; hodnoty mohou pocházet zharness.store(). Pro složitější payloady bez nutnosti programově sestavovat JSON.
Poznámka: addField a appendToArray fungují pouze pro JSON formát. Pro XML a EBCDIC/UTF8 formáty v IBM MQ se payload sestavuje jako celý String.
Varianty výběru zprávy v receiveWhere
Kafka -- receiveWhere
Kafka receiveWhere přijímá Predicate<ReceivedMessage> -- libovolný Java predikát na deserializovanou zprávu (Avro -> JSON). Zprávy se prohledávají sekvenčně v polling smyčce. Vrátí se první zpráva, která vyhovuje predikátu.
// --- Filtrace podle obsahu JSON pole ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(30, TimeUnit.SECONDS);
// --- Filtrace podle více polí (AND) ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg ->
msg.extract("orderId").equals("123")
&& msg.extract("status").equals("CREATED"))
.withTimeout(30, TimeUnit.SECONDS);
// --- Filtrace podle Kafka headeru ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> "req-001".equals(msg.getHeader("requestID")))
.withTimeout(30, TimeUnit.SECONDS);
// --- Filtrace podle klíče zprávy ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> "order-123".equals(msg.getKey()))
.withTimeout(30, TimeUnit.SECONDS);
// --- Filtrace kombinací headeru a obsahu ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg ->
"act-555".equals(msg.getHeader("activityID"))
&& msg.extract("status").equals("SHIPPED"))
.withTimeout(30, TimeUnit.SECONDS);
// --- Příjem první dostupné zprávy (bez filtru) ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> true)
.withTimeout(10, TimeUnit.SECONDS);
// --- Příjem s vlastním timeoutem ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(60, TimeUnit.SECONDS); // delší timeout pro pomalé systémy
ReceivedMessage metody dostupné v predikátu (Kafka):
msg.extract("dotPath")-- hodnota JSON pole (dot/bracket notace)msg.getHeader("headerName")-- hodnota Kafka headerumsg.getKey()-- klíč zprávymsg.getBody()-- celé tělo jako JSON Stringmsg.getTimestamp()-- timestamp zprávy
IBM MQ -- receiveWhere
IBM MQ podporuje dvě úrovně filtrace:
- JMS Message Selector (server-side) -- efektivní filtr na JMS properties/headery, vyhodnocuje se na straně MQ serveru
- Predicate na obsahu (client-side) -- filtr na tělo zprávy po deserializaci, vyhodnocuje se v klientu
// --- Filtrace podle obsahu JSON pole ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
.withTimeout(10, TimeUnit.SECONDS);
// --- Filtrace podle JMS CorrelationID (server-side selector) ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.withSelector("JMSCorrelationID = 'corr-789'")
.receiveWhere(msg -> true)
.withTimeout(10, TimeUnit.SECONDS);
// --- Kombinace JMS selectoru a content filtru ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.withSelector("JMSType = 'payment'")
.receiveWhere(msg -> msg.extract("amount").equals("15000"))
.withTimeout(10, TimeUnit.SECONDS);
// --- XML zprávy s XPath filtrací ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
.asXml()
.receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
.withTimeout(15, TimeUnit.SECONDS);
// --- EBCDIC / UTF-8 zprávy -- filtrace přes raw body ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
.asEbcdic()
.receiveWhere(msg -> msg.getBody().contains("12345"))
.withTimeout(15, TimeUnit.SECONDS);
// --- Příjem první dostupné zprávy ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.receiveWhere(msg -> true)
.withTimeout(10, TimeUnit.SECONDS);
// --- Browse (ne-destruktivní čtení, zprávy zůstávají ve frontě) ---
List<ReceivedMessage> peeked = harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.browse(10); // max 10 zpráv z fronty
// s JMS selektorem: .withSelector("JMSType = 'payment'").browse(10)
ReceivedMessage metody dostupné v predikátu (IBM MQ):
msg.extract("expression")-- hodnota pole (JSON dot-path pro JSON, XPath pro XML)msg.getBody()-- celé tělo zprávy jako Stringmsg.getHeader("jmsProperty")-- hodnota JMS property
Klíčový rozdíl: .withSelector() je server-side filtr (SQL-92 subset, filtruje JMS properties) -- použijte ho pro efektivní filtraci podle korelačních ID. .receiveWhere() je client-side filtr na obsahu zprávy.
Chybové stavy -- výjimky a assert metody
Výjimky (propagovány okamžitě, test failuje)
Tyto chyby signalizují infrastrukturní problém nebo chybu konfigurace -- test nemůže pokračovat:
// --- Chyba připojení (Kafka i IBM MQ) ---
// KafkaConnectionException / JMSException obalené do RuntimeException
// Příčiny: špatný bootstrap-servers, connection-name-list, credentials, síťový problém
try {
harness.withKafka()
.toTopic("order-events")
.withKey("order-123")
.withPayload("{\"orderId\": \"123\"}")
.send();
} catch (MessagingConnectionException e) {
// "Failed to connect to Kafka cluster at pkc-xxxxx:9092: Authentication failed"
// "Failed to connect to IBM MQ: mq9multitst5x(1414),mq9multitst6x(1414) - MQRC 2035 (NOT_AUTHORIZED)"
}
// --- Chyba schématu (Kafka) ---
// Schema Registry vrátí 404 nebo payload neodpovídá schématu
try {
harness.withKafka()
.toTopic("nonexistent-topic")
.withKey("key")
.withPayload("{\"unknownField\": \"value\"}")
.send();
} catch (MessagingSchemaException e) {
// "Schema not found for subject 'nonexistent-topic-value' in Schema Registry"
// "Payload does not conform to Avro schema for topic 'order-events': field 'orderId' is required"
}
// --- Queue neexistuje (IBM MQ) ---
// Výjimka nastane i při chybné konfiguraci logického názvu v properties
try {
harness.withImqFirstVision()
.toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) // resolvuje se z konfigurace
.withPayload("{}")
.send();
} catch (MessagingDestinationException e) {
// "Queue 'MVSW2TST3.DELIVERY.NOTIFICATION' not found: MQRC 2085 (UNKNOWN_OBJECT_NAME)"
}
// --- Timeout při příjmu (Kafka i IBM MQ) ---
// Žádná zpráva nevyhověla predikátu v daném čase
try {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("nonexistent"))
.withTimeout(5, TimeUnit.SECONDS);
} catch (MessagingTimeoutException e) {
// "No message matching filter found on topic 'order-events' within 5 seconds"
// "No message matching filter found on queue 'PAYMENT.NOTIFICATIONS' within 10 seconds"
}
// --- Chyba konfigurace (endpoint není nakonfigurován) ---
try {
harness.withKafka()
.toTopic("order-events")
.withKey("key")
.withPayload("{}")
.send();
} catch (IllegalStateException e) {
// "You need to configure endpoints.kafka.bootstrap-servers"
// Stejný vzor jako u Wso2GatewayEndpoint
}
Hierarchie výjimek:
MessagingException (abstract, extends RuntimeException)
├── MessagingConnectionException -- selhání připojení, autentizace
├── MessagingSchemaException -- Avro schema mismatch, schema not found
├── MessagingDestinationException -- topic/queue neexistuje, přístupová práva
└── MessagingTimeoutException -- receive timeout, žádná matching zpráva
Assert metody (fluent, na přijaté zprávě)
Tyto metody se volají na výsledku receiveWhere -- zpráva byla nalezena a nyní se ověřuje její obsah. Assert selhání = AssertionError (standardní JUnit pattern).
// --- JSON assert metody (Kafka i IBM MQ JSON) ---
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(30, TimeUnit.SECONDS)
// Assert na hodnotu pole (dot/bracket notace)
.andAssertFieldValue("status", "CREATED")
.andAssertFieldValue("items[0].sku", "A001")
// Assert na přítomnost/nepřítomnost pole
.andAssertPresent("items[0].qty")
.andAssertNotPresent("deletedField")
// Assert na Kafka header
.andAssertHeaderValue("requestID", "req-001")
// Extrakce hodnoty pro další použití v testu
.extract("internalId").asText(); // -> String pro uložení do harness.store()
// --- XML assert metody (IBM MQ XML) ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
.asXml()
.receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
.withTimeout(15, TimeUnit.SECONDS)
// XPath assert
.andAssertFieldValue("/response/balance", "50000")
.andAssertFieldValue("/response/currency", "CZK")
.andAssertPresent("/response/timestamp")
.andAssertNotPresent("/response/error");
// --- Raw body assert (EBCDIC/UTF-8) ---
harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
.asEbcdic()
.receiveWhere(msg -> msg.getBody().contains("12345"))
.withTimeout(15, TimeUnit.SECONDS)
.andAssertBodyContains("ZŮSTATEK");
// --- Deserializace do Java objektu (mapTo) ---
PaymentNotification notif = harness.withImqFirstVision()
.fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
.receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
.withTimeout(10, TimeUnit.SECONDS)
.mapTo(PaymentNotification.class);
Kompletní tabulka assert metod na MessageResponse:
andAssertFieldValue(String path, String value)-- JSON dot-path nebo XPathandAssertPresent(String path)-- ověří existenci poleandAssertNotPresent(String path)-- ověří neexistenci poleandAssertHeaderValue(String headerName, String value)-- Kafka header nebo JMS propertyandAssertBodyContains(String substring)-- raw body contains (pro EBCDIC/UTF-8/raw text)andAssertWithAssertJ()-- (nice to have) vrací AssertJAbstractObjectAssertpro komplexní fluent assertions .extract(String path)-- extrahuje hodnotu pro další použitímapTo(Class<T> type)-- deserializace těla zprávy do Java objektu (Jackson/JAXB)getBody()-- raw body StringgetHeader(String name)-- raw header/property value
5. Konfigurace
Konfigurační klíče dodržují konvenci existujících endpointů v Harness: prefix endpoints.{system-name}.{property} (viz endpoints.wso2.gw.url, endpoints.udebs.url atd.).
Properties soubory (envs/tst1):
# Kafka Confluent Cloud (Avro + Schema Registry)
endpoints.kafka.bootstrap-servers=pkc-xxxxx.eu-central-1.aws.confluent.cloud:9092
endpoints.kafka.security-protocol=SASL_SSL
endpoints.kafka.sasl-mechanism=PLAIN
endpoints.kafka.schema-registry-url=https://psrc-xxxxx.eu-central-1.aws.confluent.cloud
endpoints.kafka.value-serializer=avro
# API key + secret a Schema Registry credentials se načítají z Vaultu
# IBM MQ - First Vision (multi-instance QMGR)
endpoints.imq-first-vision.connection-name-list=mq9multitst5x(1414),mq9multitst6x(1414)
endpoints.imq-first-vision.channel=CLIENT.CHANNEL
endpoints.imq-first-vision.queue-manager=MVSW2TST3
endpoints.imq-first-vision.ssl-cipher-suite=TLS_RSA_WITH_AES_256_CBC_SHA256
# user + heslo se načítají z Vaultu
# Logické destinace (musí odpovídat enum ImqFirstVisionQueue)
endpoints.imq-first-vision.payment-notifications.queue=MVSW2TST3.DELIVERY.NOTIFICATION
endpoints.imq-first-vision.payment-request.queue=MVSW2TST3.DELIVERY.REQUEST
endpoints.imq-first-vision.mf-requests.queue=MVSW2TST3.MF.REQUESTS
endpoints.imq-first-vision.mf-responses.queue=MVSW2TST3.MF.RESPONSES
endpoints.imq-first-vision.mf-ebcdic.queue=MVSW2TST3.MF.EBCDIC
endpoints.imq-first-vision.mf-utf8.queue=MVSW2TST3.MF.UTF8
Vault paths:
vault.kafka.secrets.path=/kv/autotesty/tst1/kafka
vault.imq-first-vision.secrets.path=/kv/autotesty/tst1/imq-first-vision
6. Maven závislosti
V test-harness-master/pom.xml přidat:
org.apache.kafka:kafka-clients(Kafka producer/consumer)io.confluent:kafka-avro-serializer(Avro + Schema Registry)com.ibm.mq:com.ibm.mq.allclient(IBM MQ JMS klient)jakarta.jms:jakarta.jms-api(JMS API)org.assertj:assertj-core(proandAssertWithAssertJ(), nice to have )
7. Řešení klíčových problémů
Poll cyklus a hledání zprávy v Kafce
flowchart TD
Start["receive(topic, filter, timeout)"] --> CreateConsumer["Vytvořit KafkaConsumer bez group.id"]
CreateConsumer --> Assign["assign() na všechny partitions"]
Assign --> SeekEnd["seekToEnd() pro všechny partitions"]
SeekEnd --> SavePos["Uložit aktuální pozice"]
SavePos --> Poll["poll(pollInterval)"]
Poll --> Check{"Jsou nějaké záznamy?"}
Check -->|Ano| Filter{"Odpovídá filter?"}
Check -->|Ne| Timeout{"Vypršel timeout?"}
Filter -->|Ano| Return["Vrátit ReceivedMessage"]
Filter -->|Ne| Timeout
Timeout -->|Ne| Backoff["Backoff wait"]
Backoff --> Poll
Timeout -->|Ano| Throw["Throw MessagingTimeoutException"]
Klíčové body:
seekToEnd()se volá před triggerem akce v testu (tester typicky: 1. připraví listener, 2. provede akci, 3. čeká na zprávu)- Alternativně: seek na pozici uloženou před testem
- Polling interval: 100ms, exponential backoff do max 1s
- Default timeout: 30s (konfigurovatelný)
Izolace testů (consumer groups)
Kafka:
- Consumer bez
group.ids manuálnímassign()+seek() - Každý příjem zprávy vytvoří nový Consumer, přečte data a zavře ho
- Žádné sdílení offsetů mezi testy ani vlákny
- Alternativa pro komplexní scénáře: unikátní
group.idper test (UUID)
IBM MQ:
- Pro nedestruktivní čtení:
QueueBrowser(browse bez odstranění zprávy) - JMS message selector (
JMSCorrelationID = 'xxx') pro cílený příjem - Každý test si filtruje zprávy podle korelačních údajů
8. Souhrnný seznam souborů k vytvoření/úpravě
Nové soubory v test-harness-master:
connectors/messaging/KafkaConnector.javaconnectors/messaging/IbmMqConnector.javaendpoints/kafka/KafkaEndpoint.javaendpoints/imq/ImqFirstVisionEndpoint.javaendpoints/imq/ImqFirstVisionQueue.java(enum: logické názvy front)support/messaging/ReceivedMessage.javasupport/messaging/MessageContentType.java(enum: JSON, XML, RAW_TEXT)support/messaging/MqMessageFormat.java(enum: JSON, XML, EBCDIC_870, UTF8_1208)support/messaging/MessageResponse.java(interface: assert, extract, mapTo -- sdílený pro Kafka i IBM MQ)support/messaging/KafkaRequest.java(fluent builder pro Kafka)support/messaging/ImqRequest.java(fluent builder pro IBM MQ)messaging/exception/MessagingException.java(abstract, extends RuntimeException)messaging/exception/MessagingConnectionException.javamessaging/exception/MessagingSchemaException.javamessaging/exception/MessagingDestinationException.javamessaging/exception/MessagingTimeoutException.java
Nové soubory v test-master:
dsl/kafka/Kafka.java(DSL třída prowithKafka())dsl/imq/ImqFirstVision.java(DSL třída prowithImqFirstVision())
Úpravy existujících souborů:
test-harness-master/pom.xml- přidat závislosti Kafka + IBM MQtest-master/pom.xml- případné závislostitest-master/.../Harness.java- přidatwithKafka()awithImqFirstVision()metodytest-master/src/test/resources/envs/tst1- přidatendpoints.kafka.* aendpoints.imq-first-vision.*test-master/src/test/resources/envs/ppe- přidatendpoints.kafka.* aendpoints.imq-first-vision.*