harness/kafka_ibm_mq_support_c8518eaa.plan 2.md
2026-03-16 18:59:21 +01:00

57 KiB
Raw Permalink Blame History

name, overview, todos, isProject
name overview todos isProject
Kafka IBM MQ Support Rozšíření testovacího frameworku Harness o podporu messaging systémů (Kafka na Confluent Cloud a IBM MQ) s kompletní abstrakcí, aby tester nemusel znát detaily jednotlivých protokolů. Změny probíhají ve dvou projektech: test-harness-master (framework) a test-master (DSL + testy).
id content status
maven-deps Přidat Maven závislosti pro kafka-clients, kafka-avro-serializer, com.ibm.mq.allclient a jakarta.jms-api do pom.xml pending
id content status
model-classes Vytvořit model třídy: ReceivedMessage, MessageContentType, MqMessageFormat, MessageResponse interface pending
id content status
exception-classes Vytvořit hierarchii výjimek: MessagingException (abstract), MessagingConnectionException, MessagingSchemaException, MessagingDestinationException, MessagingTimeoutException pending
id content status
kafka-connector Implementovat KafkaConnector s SASL/PLAIN auth, Avro+SchemaRegistry, producer pooling, consumer assign/seek (bez consumer group), poll cyklus s Predicate filtrem pending
id content status
ibmmq-connector Implementovat IbmMqConnector s JMS klientem, user+password, SSL keystore, connection pooling, podpora JSON/XML/EBCDIC_870/UTF8_1208 pending
id content status
kafka-endpoint Vytvořit KafkaEndpoint - čte endpoints.kafka.* konfiguraci, credentials z Vaultu, lazy init KafkaConnectoru pending
id content status
ibmmq-endpoint Vytvořit ImqFirstVisionEndpoint - čte endpoints.imq-first-vision.* konfiguraci, user + heslo z Vaultu pending
id content status
kafka-request Implementovat KafkaRequest fluent builder - withKey, withPayload, withPayloadFromTemplate (nice), withTraceparent/requestID/activityID/sourceCodebookId, send, receiveWhere, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo pending
id content status
ibmmq-request Implementovat ImqRequest fluent builder - toQueue, withPayload, withPayloadFromTemplate (nice), asXml/asEbcdic/asUtf8, send, receiveWhere, browse, andAssertFieldValue, andAssertWithAssertJ (nice), mapTo pending
id content status
kafka-dsl Vytvořit Kafka DSL třídu a přidat withKafka() do Harness.java pending
id content status
ibmmq-dsl Vytvořit ImqFirstVision DSL třídu a přidat withImqFirstVision() do Harness.java pending
id content status
config Přidat endpoints.kafka.* a endpoints.imq-first-vision.* konfiguraci do envs/ souborů + vault paths pending
false

Rozšíření testovacího frameworku o Kafka a IBM MQ

Současná architektura

Framework je postaven na vrstvené architektuře:

graph TD
    Test["Test (@TestCase)"] --> DSL["DSL vrstva (Harness.withXxx())"]
    DSL --> EP["Endpoint (implementuje Endpoint)"]
    EP --> CON["Connector (implementuje Connector)"]
    CON --> SYS["Cílový systém"]

Navrhovaná architektura

Kafka a IBM MQ mají oddělené vstupní body v Harness DSL -- analogicky k withWso2(), withUfo() apod.

graph TD
    subgraph kafkaPath [Kafka]
        TestK["harness.withKafka()"] --> KafkaDSL["Kafka DSL"]
        KafkaDSL --> KafkaReq["KafkaRequest builder"]
        KafkaReq --> KafkaEP["KafkaEndpoint"]
        KafkaEP --> KafkaCon["KafkaConnector"]
        KafkaCon --> ConflCloud["Confluent Cloud"]
    end
    subgraph imqPath [IBM MQ]
        TestI["harness.withImqFirstVision()"] --> ImqDSL["ImqFirstVision DSL"]
        ImqDSL --> ImqReq["ImqRequest builder"]
        ImqReq --> ImqEP["ImqFirstVisionEndpoint"]
        ImqEP --> ImqCon["IbmMqConnector"]
        ImqCon --> ImqSrv["IBM MQ Server"]
    end
    KafkaCon -.->|"API key/secret"| Vault["HashiCorp Vault"]
    ImqCon -.->|"user + password"| Vault

Třídní diagram

classDiagram
    class Connector {
        <<interface>>
        +close()
    }
    class Endpoint {
        <<interface>>
        +canAccess() bool
        +close()
    }

    class KafkaConnector {
        -KafkaProducer producer
        -CachedSchemaRegistryClient schemaRegistry
        +send(topic, key, jsonPayload, headers)
        +receive(topic, filter, timeout) List~ReceivedMessage~
        +close()
    }

    class IbmMqConnector {
        -JmsConnectionFactory connectionFactory
        -JMSContext jmsContext
        +send(queue, payload, format, properties)
        +receive(queue, selector, format, timeout) ReceivedMessage
        +browse(queue, selector, format, maxMessages) List~ReceivedMessage~
        +close()
    }

    class KafkaEndpoint {
        -KafkaConnector connector
        -StoreAccessor store
    }

    class ImqFirstVisionEndpoint {
        -IbmMqConnector connector
        -StoreAccessor store
    }

    class ReceivedMessage {
        +String body
        +MessageContentType contentType
        +Map headers
        +long timestamp
        +extract(expression) String
        +extractJson(path) JsonNode
        +extractXml(xpath) String
    }

    class MqMessageFormat {
        <<enum>>
        JSON
        XML
        EBCDIC_870
        UTF8_1208
    }

    Connector <|.. KafkaConnector
    Connector <|.. IbmMqConnector
    Endpoint <|.. KafkaEndpoint
    Endpoint <|.. ImqFirstVisionEndpoint
    KafkaEndpoint --> KafkaConnector
    ImqFirstVisionEndpoint --> IbmMqConnector

Detailní návrh tříd

1. Connectors (test-harness-master)

KafkaConnector

Umístění: cz.moneta.test.harness.connectors.messaging.KafkaConnector

  • Spravuje KafkaProducer (lazy init, singleton, thread-safe) a KafkaConsumer (per-call, kvůli izolaci)
  • SASL/PLAIN autentizace: API key + secret z Vaultu
  • SSL context s truststore (stejný pattern jako BaseRestConnector)
  • Formát zpráv: Avro -- používá Confluent Schema Registry:
    • Volba schématu je řízena názvem topicu: Schema Registry subject = {topic-name}-value (Confluent TopicNameStrategy, výchozí). Tester nemusí schéma specifikovat -- connector ho získá automaticky z Registry podle topicu.
    • Producer: KafkaAvroSerializer serializuje GenericRecord / SpecificRecord
    • Consumer: KafkaAvroDeserializer deserializuje na GenericRecord
    • Automatická konverze Avro GenericRecord -> JSON string v ReceivedMessage (transparentní pro testera)
    • Pro odesílání: tester poskytne JSON string, connector stáhne schéma z Registry podle topicu a vytvoří GenericRecord
  • Vstup pro odeslání zprávy: název topicu (= logická destinace), klíč (String), tělo zprávy (JSON String), headery (Map)
  • Podporované Kafka headery -- framework poskytuje typované builder metody pro běžné tracing/correlation headery:
    • traceparent -- W3C Trace Context propagace (např. 00-traceId-spanId-01)
    • requestID -- identifikátor požadavku
    • activityID -- identifikátor aktivity
    • sourceCodebookId -- identifikátor zdrojového číselníku
    • Libovolné další headery přes generický .withHeader(key, value)
  • Izolace testů: Consumer bez consumer group - používá consumer.assign(partitions) + consumer.seekToEnd() místo subscribe(). Díky tomu:
    • Žádný consumer group = žádný rebalancing mezi testy
    • Každý test si čte od aktuální pozice nezávisle
    • Testy neovlivňují offsety ostatních testů
  • Poll cyklus: Metoda receive(topic, Predicate<ReceivedMessage>, Duration timeout):
    1. Vytvoří nový Consumer
    2. assign() na všechny partitions daného topicu
    3. seekToEnd() (nebo seek na uloženou pozici)
    4. Polling smyčka s exponenciálním backoff (100ms -> 500ms -> 1s)
    5. Každý record: Avro GenericRecord -> JSON -> test proti Predicate
    6. Vrátí první matching zprávu, nebo vyhodí MessagingTimeoutException
public class KafkaConnector implements Connector {
    private final Properties baseConfig;
    private KafkaProducer<String, GenericRecord> producer;
    private final String schemaRegistryUrl;
    private final CachedSchemaRegistryClient schemaRegistryClient;

    public KafkaConnector(String bootstrapServers, String apiKey,
                          String apiSecret, String schemaRegistryUrl,
                          String schemaRegistryApiKey,
                          String schemaRegistryApiSecret) { ... }

    /**
     * Odeslání zprávy na topic.
     * Schéma se získá automaticky z Schema Registry podle topicu
     * (subject = "{topic}-value", TopicNameStrategy).
     *
     * @param topic       název Kafka topicu (= logická destinace)
     * @param key         klíč zprávy (String)
     * @param jsonPayload tělo zprávy jako JSON String
     * @param headers     Kafka headery (traceparent, requestID, activityID,
     *                    sourceCodebookId, ...)
     */
    public void send(String topic, String key, String jsonPayload,
                     Map<String, String> headers) {
        Schema schema = getSchemaForTopic(topic);
        GenericRecord record = jsonToAvro(jsonPayload, schema);
        ProducerRecord<String, GenericRecord> producerRecord =
            new ProducerRecord<>(topic, key, record);
        headers.forEach((k, v) ->
            producerRecord.headers().add(k, v.getBytes(StandardCharsets.UTF_8)));
        producer.send(producerRecord).get();
    }

    // Příjem: Avro GenericRecord se konvertuje na JSON v ReceivedMessage
    public List<ReceivedMessage> receive(String topic,
                                         Predicate<ReceivedMessage> filter,
                                         Duration timeout) { ... }

    public Map<TopicPartition, Long> saveOffsets(String topic) { ... }

    // Stáhne latest schéma z Registry podle topicu (subject = {topic}-value)
    private Schema getSchemaForTopic(String topic) {
        String subject = topic + "-value";
        SchemaMetadata meta = schemaRegistryClient.getLatestSchemaMetadata(subject);
        return new Schema.Parser().parse(meta.getSchema());
    }

    private String avroToJson(GenericRecord record) { ... }
    private GenericRecord jsonToAvro(String json, Schema schema) { ... }
}

IbmMqConnector

Umístění: cz.moneta.test.harness.connectors.messaging.IbmMqConnector

  • Používá com.ibm.mq.allclient JMS klient
  • JmsConnectionFactory pro připojení s user + password
  • Multi-instance Queue Manager: Podpora connection name list ve formátu host1(port1),host2(port2) (např. mq9multitst5x(1414),mq9multitst6x(1414)). Nastavuje se přes MQConnectionFactory.setConnectionNameList() místo samostatných setHostName() / setPort(). IBM MQ klient automaticky provádí failover na další instanci při nedostupnosti.
  • SSL/TLS s keystore a truststore
  • Podporované formáty zpráv:
    • JSON (výchozí): JMS TextMessage s plain JSON string
    • XML: JMS TextMessage s XML string; příjem konvertuje XML na JSON v ReceivedMessage (přes Jackson XmlMapper) pro jednotné API
    • UTF-8 / CCSID 1208 (výchozí pro binární): JMS BytesMessage s payload kódovaným v UTF-8 (IBM CCSID 1208). MQMD CCSID = 1208. Vhodné pro systémy vyžadující explicitní UTF-8 BytesMessage.
    • EBCDIC / CCSID 870: JMS BytesMessage s payload kódovaným v EBCDIC code page 870 (český/slovenský mainframe kódování). MQMD CCSID = 870. Při odesílání se Java String konvertuje na byte[] přes Charset.forName("IBM870"). Při příjmu se byte[] dekóduje zpět na Java String.
  • Logika formátu:
    • Formát se volí explicitně ve fluent API (.asXml(), .asEbcdic(), .asUtf8()); výchozí je JSON
    • Při příjmu: connector detekuje typ JMS zprávy (TextMessage vs BytesMessage) a dekóduje odpovídajícím způsobem podle nastaveného formátu
  • Connection pooling: Drží JMSContext per endpoint instance, recykluje sessions
  • Izolace testů: Používá QueueBrowser pro nedestruktivní čtení (browse), nebo createConsumer s message selektorem
  • send(), receive(), browse() metody
public enum MqMessageFormat {
    JSON,         // TextMessage, plain JSON (UTF-8 default)
    XML,          // TextMessage, XML string (UTF-8 default)
    EBCDIC_870,   // BytesMessage, EBCDIC IBM-870 (česky/slovensky)
    UTF8_1208     // BytesMessage, UTF-8 IBM CCSID 1208 (výchozí pro binární)
}

public class IbmMqConnector implements Connector {
    private static final Charset EBCDIC_870 = Charset.forName("IBM870");
    private static final Charset UTF_8 = StandardCharsets.UTF_8;

    private final JmsConnectionFactory connectionFactory;
    private JMSContext jmsContext;

    // connectionNameList: "host1(port1),host2(port2)" pro multi-instance QMGR
    public IbmMqConnector(String connectionNameList, String channel,
                          String queueManager, String user, String password,
                          String keystorePath, String keystorePassword) {
        JmsConnectionFactory cf = ...;
        cf.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, connectionNameList);
        cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
        cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager);
        cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        // ...
    }

    // Odeslání zprávy s daným formátem
    public void send(String queueName, String payload,
                     MqMessageFormat format,
                     Map<String, String> properties) {
        switch (format) {
            case JSON, XML     -> sendTextMessage(queueName, payload, properties);
            case EBCDIC_870    -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
            case UTF8_1208     -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
        }
    }

    // TextMessage pro JSON a XML (JVM default UTF-8)
    private void sendTextMessage(String queueName, String payload,
                                  Map<String, String> properties) { ... }

    // BytesMessage s konverzí String -> byte[] v daném kódování + MQMD CCSID
    private void sendBytesMessage(String queueName, String payload,
                                   Charset charset, int ccsid,
                                   Map<String, String> properties) {
        byte[] bytes = payload.getBytes(charset);
        // Vytvoří BytesMessage, nastaví MQMD CCSID, zapíše bytes
    }

    // Příjem: auto-detekce TextMessage vs BytesMessage
    public ReceivedMessage receive(String queueName, String messageSelector,
                                    MqMessageFormat expectedFormat,
                                    Duration timeout) { ... }

    /**
     * Browse (ne-destruktivní čtení) -- zprávy zůstávají ve frontě.
     * Vrací seznam zpráv odpovídajících messageSelector, bez jejich odstranění.
     * Použití: inspekce obsahu fronty před/po testu, ověření počtu zpráv.
     */
    public List<ReceivedMessage> browse(String queueName, String messageSelector,
                                        MqMessageFormat expectedFormat,
                                        int maxMessages) { ... }

    // Dekódování přijaté zprávy podle formátu
    private String decodeMessage(Message jmsMessage,
                                  MqMessageFormat format) {
        if (jmsMessage instanceof TextMessage txt) {
            return txt.getText(); // JSON nebo XML string (UTF-8)
        } else if (jmsMessage instanceof BytesMessage bytesMsg) {
            byte[] data = new byte[(int) bytesMsg.getBodyLength()];
            bytesMsg.readBytes(data);
            Charset charset = switch (format) {
                case EBCDIC_870 -> EBCDIC_870;
                case UTF8_1208  -> UTF_8;
                default         -> UTF_8;
            };
            return new String(data, charset);
        }
    }
}

2. Model třídy (test-harness-master)

ReceivedMessage

Umístění: cz.moneta.test.harness.support.messaging.ReceivedMessage

Body je vždy String normalizovaný do čitelné podoby bez ohledu na zdroj a wire formát:

  • Z Kafka: Avro GenericRecord je automaticky konvertován na JSON v KafkaConnector
  • Z IBM MQ (JSON): JSON string z JMS TextMessage je předán přímo
  • Z IBM MQ (XML): XML string z JMS TextMessage je předán jako XML; extract() podporuje jak JSON dotpath, tak XPath
  • Z IBM MQ (EBCDIC): byte[] z JMS BytesMessage je dekódován z IBM-870 na Java String
public enum MessageContentType {
    JSON, XML, RAW_TEXT
}

public class ReceivedMessage {
    private final String body;                    // dekódovaný textový obsah
    private final MessageContentType contentType; // JSON, XML, nebo RAW_TEXT
    private final Map<String, String> headers;    // Kafka headers nebo JMS properties
    private final long timestamp;
    private final String source;                  // topic nebo queue name
    private final String key;                     // Kafka message key (null pro IBM MQ)

    // JSON navigace (dot/bracket notace: "items[0].sku")
    public JsonNode extractJson(String path) { ... }

    // XPath navigace (pro XML zprávy: "/response/balance")
    public String extractXml(String xpathExpression) { ... }

    // Univerzální extract - auto-detekce podle contentType
    public String extract(String expression) {
        return switch (contentType) {
            case JSON     -> extractJson(expression).asText();
            case XML      -> extractXml(expression);
            case RAW_TEXT -> body;
        };
    }

    public String getKey() { ... }                // Kafka klíč, null pro IBM MQ
    public String getHeader(String name) { ... }  // Kafka header nebo JMS property
    public String getBody() { ... }               // surový textový obsah
    public long getTimestamp() { ... }
    public MessageContentType getContentType() { ... }

    /**
     * Deserializace těla zprávy do Java objektu.
     * Pro JSON: Jackson ObjectMapper.readValue(body, type)
     * Pro XML: JAXB Unmarshaller nebo Jackson XmlMapper
     */
    public <T> T mapTo(Class<T> type) { ... }
}

3. Endpoints (test-harness-master)

KafkaEndpoint

Umístění: cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint

  • Vzor: analogie s Wso2GatewayEndpoint -- konstruktor přijímá StoreAccessor, čte endpoints.kafka.* konfiguraci
  • Čte konfiguraci z StoreAccessor:
    • endpoints.kafka.bootstrap-servers
    • endpoints.kafka.security-protocol
    • endpoints.kafka.sasl-mechanism
    • endpoints.kafka.schema-registry-url
    • endpoints.kafka.value-serializer
  • Credentials z Vaultu (path: vault.kafka.secrets.path):
    • Kafka API key + secret
    • Schema Registry API key + secret
  • Lazy init KafkaConnectoru
public class KafkaEndpoint implements Endpoint {
    private final KafkaConnector connector;
    private final StoreAccessor store;

    public KafkaEndpoint(StoreAccessor store) {
        this.store = store;
        String bootstrapServers = Optional.ofNullable(store.getConfig("endpoints.kafka.bootstrap-servers"))
            .orElseThrow(() -> new IllegalStateException(
                "You need to configure endpoints.kafka.bootstrap-servers"));
        String schemaRegistryUrl = store.getConfig("endpoints.kafka.schema-registry-url");
        // API key/secret z Vaultu
        // ...
        this.connector = new KafkaConnector(bootstrapServers, apiKey, apiSecret,
            schemaRegistryUrl, srApiKey, srApiSecret);
    }

    public void send(String topic, String key, String jsonPayload,
                     Map<String, String> headers) {
        connector.send(topic, key, jsonPayload, headers);
    }

    public List<ReceivedMessage> receive(String topic,
                                          Predicate<ReceivedMessage> filter,
                                          Duration timeout) {
        return connector.receive(topic, filter, timeout);
    }

    @Override
    public void close() { connector.close(); }
}

ImqFirstVisionEndpoint

Umístění: cz.moneta.test.harness.endpoints.imq.ImqFirstVisionEndpoint

  • Vzor: analogie s Wso2GatewayEndpoint
  • Čte konfiguraci z StoreAccessor:
    • endpoints.imq-first-vision.connection-name-list -- formát host1(port1),host2(port2) pro multi-instance QMGR
    • endpoints.imq-first-vision.channel
    • endpoints.imq-first-vision.queue-manager
    • endpoints.imq-first-vision.ssl-cipher-suite
  • User i heslo z Vaultu (path: vault.imq-first-vision.secrets.path)
  • Lazy init IbmMqConnectoru
public class ImqFirstVisionEndpoint implements Endpoint {
    private final IbmMqConnector connector;
    private final StoreAccessor store;

    public ImqFirstVisionEndpoint(StoreAccessor store) {
        this.store = store;
        // connection-name-list: "mq9multitst5x(1414),mq9multitst6x(1414)"
        String connectionNameList = Optional.ofNullable(
                store.getConfig("endpoints.imq-first-vision.connection-name-list"))
            .orElseThrow(() -> new IllegalStateException(
                "You need to configure endpoints.imq-first-vision.connection-name-list"));
        String channel = store.getConfig("endpoints.imq-first-vision.channel");
        String queueManager = store.getConfig("endpoints.imq-first-vision.queue-manager");
        // user + password z Vaultu
        String user = // z Vaultu (vault.imq-first-vision.secrets.path -> "user")
        String password = // z Vaultu (vault.imq-first-vision.secrets.path -> "password")
        // ...
        this.connector = new IbmMqConnector(connectionNameList, channel,
            queueManager, user, password, keystorePath, keystorePassword);
    }

    public void send(String queueName, String payload,
                     MqMessageFormat format, Map<String, String> properties) {
        connector.send(queueName, payload, format, properties);
    }

    public ReceivedMessage receive(String queueName, String messageSelector,
                                    MqMessageFormat format, Duration timeout) {
        return connector.receive(queueName, messageSelector, format, timeout);
    }

    public List<ReceivedMessage> browse(String queueName, String messageSelector,
                                         MqMessageFormat format, int maxMessages) {
        return connector.browse(queueName, messageSelector, format, maxMessages);
    }

    // Resolve logického názvu fronty z konfigurace
    public String resolveQueue(String logicalName) {
        return Optional.ofNullable(
                store.getConfig("endpoints.imq-first-vision." + logicalName + ".queue"))
            .orElseThrow(() -> new IllegalStateException(
                "Queue '" + logicalName + "' is not configured in " +
                "endpoints.imq-first-vision." + logicalName + ".queue"));
    }

    @Override
    public void close() { connector.close(); }
}

4. DSL Builder (test-master)

Kafka DSL

Umístění: cz.moneta.test.dsl.kafka.Kafka

Vstupní bod přes Harness:

public Kafka withKafka() {
    return new Kafka(this);
}

DSL třída uvnitř získá endpoint přes existující mechanismus:

public class Kafka {
    private final Harness harness;

    public Kafka(Harness harness) { this.harness = harness; }

    public KafkaRequest.TopicPhase prepareRequest() {
        KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class);
        return KafkaRequest.builder(endpoint);
    }
}

ImqFirstVision DSL

Umístění: cz.moneta.test.dsl.imq.ImqFirstVision

Vstupní bod přes Harness:

public ImqFirstVision withImqFirstVision() {
    return new ImqFirstVision(this);
}

DSL třída uvnitř získá endpoint přes existující mechanismus:

public class ImqFirstVision {
    private final Harness harness;

    public ImqFirstVision(Harness harness) { this.harness = harness; }

    public ImqRequest.QueuePhase prepareRequest() {
        ImqFirstVisionEndpoint endpoint = harness.getEndpoint(ImqFirstVisionEndpoint.class);
        return ImqRequest.builder(endpoint);
    }
}

Registrace endpointů v BaseStoreAccessor

KafkaEndpoint a ImqFirstVisionEndpoint nevyžadují explicitní registraci. Existující mechanismus v BaseStoreAccessor.getEndpoint(Class) je automaticky instanciuje přes reflexi (konstruktor (StoreAccessor)), cache-uje a spravuje lifecycle (zavírá přes close()). Podmínkou je, aby nové endpointy měly veřejný konstruktor public XxxEndpoint(StoreAccessor store) a implementovaly Endpoint interface.

ImqFirstVisionQueue enum

Umístění: cz.moneta.test.harness.endpoints.imq.ImqFirstVisionQueue

Enum definuje logické názvy front. Fyzický název fronty se resolvuje z konfigurace (endpoints.imq-first-vision.{logicalName}.queue). Tester používá enum konstantu -- nemusí znát fyzický název fronty ani konfigurační klíč.

public enum ImqFirstVisionQueue {
    PAYMENT_NOTIFICATIONS("payment-notifications"),
    PAYMENT_REQUEST("payment-request"),
    MF_REQUESTS("mf-requests"),
    MF_RESPONSES("mf-responses"),
    MF_EBCDIC("mf-ebcdic"),
    MF_UTF8("mf-utf8");

    private final String configKey;

    ImqFirstVisionQueue(String configKey) {
        this.configKey = configKey;
    }

    public String getConfigKey() { return configKey; }
}

Resoluce v ImqFirstVisionEndpoint.resolveQueue():

  • ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS -> čte endpoints.imq-first-vision.payment-notifications.queue -> vrátí "MVSW2TST3.DELIVERY.NOTIFICATION"

KafkaRequest fluent builder

Umístění: cz.moneta.test.harness.support.messaging.KafkaRequest

ImqRequest fluent builder

Umístění: cz.moneta.test.harness.support.messaging.ImqRequest

Definice fluent API pro realizaci

Následující rozhraní a metody definují kompletní fluent API. DSL třídy (Kafka, ImqFirstVision) delegují na request buildery (KafkaRequest, ImqRequest).

Fáze builderu: Každá metoda vrací rozhraní, které umožňuje další volání až do terminálu (send() nebo MessageResponse).

Legenda k implementaci:

  • [EXISTUJE] -- obdobná logika již je v frameworku (RawRestRequest), lze znovu využít nebo adaptovat
  • [NOVÉ] -- je potřeba implementovat
KafkaRequest fázová rozhraní
// Fáze 1: Po withKafka() -- výběr směru (odeslání vs. příjem) -- [NOVÉ]
public interface KafkaSendPhase {
    KafkaPayloadPhase toTopic(String topic);
}
public interface KafkaReceivePhase {
    KafkaReceiveFilterPhase fromTopic(String topic);
}

// Fáze 2a: Odeslání - payload a headery
// withPayload, withPayloadFromFile, addField, appendToArray, withHeader [EXISTUJE] v RawRestRequest
// withKey, withTraceparent/RequestID/ActivityID/SourceCodebookId, send [NOVÉ]
public interface KafkaPayloadPhase {
    KafkaPayloadPhase withKey(String key);                    // [NOVÉ]
    KafkaPayloadPhase withPayload(String json);                // [EXISTUJE]
    KafkaPayloadPhase withPayloadFromFile(String path);        // [EXISTUJE]
    KafkaPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables);  // [EXISTUJE] nice to have
    KafkaPayloadPhase addField(String fieldName, Object value);
    KafkaPayloadPhase addField(String path, String fieldName, Object value);
    KafkaPayloadPhase appendToArray(String path, Object value);
    KafkaPayloadPhase withTraceparent(String value);           // [NOVÉ]
    KafkaPayloadPhase withRequestID(String value);             // [NOVÉ]
    KafkaPayloadPhase withActivityID(String value);            // [NOVÉ]
    KafkaPayloadPhase withSourceCodebookId(String value);      // [NOVÉ]
    KafkaPayloadPhase withHeader(String key, String value);   // [EXISTUJE]
    void send();                                               // [NOVÉ]
}

// Fáze 2b: Příjem - filtr a timeout -- [NOVÉ]
public interface KafkaReceiveFilterPhase {
    KafkaAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter);
}
public interface KafkaAwaitingPhase {
    MessageResponse withTimeout(long duration, TimeUnit unit);
}

// MessageResponse -- obdobné RawRestRequest.Response
// andAssertFieldValue, andAssertPresent, andAssertNotPresent, extract, mapTo [EXISTUJE]
// andAssertHeaderValue, andAssertBodyContains, andAssertWithAssertJ [NOVÉ]
public interface MessageResponse {
    MessageResponse andAssertFieldValue(String path, String value);   // [EXISTUJE]
    MessageResponse andAssertPresent(String path);                     // [EXISTUJE]
    MessageResponse andAssertNotPresent(String path);                  // [EXISTUJE]
    MessageResponse andAssertHeaderValue(String headerName, String value);  // [NOVÉ]
    MessageResponse andAssertBodyContains(String substring);           // [NOVÉ] primárně IBM MQ
    ObjectAssert<?> andAssertWithAssertJ();                            // [NOVÉ] nice to have
    JsonPathValue extract(String path);                                // [EXISTUJE]
    <T> T mapTo(Class<T> type);                                        // [EXISTUJE] jako post(Class)
    ReceivedMessage getMessage();
    String getBody();
    String getHeader(String name);
}
ImqRequest fázová rozhraní
// Fáze 1: Po withImqFirstVision() -- [NOVÉ]
public interface ImqSendPhase {
    ImqPayloadPhase toQueue(ImqFirstVisionQueue queue);
}
public interface ImqReceivePhase {
    ImqReceiveFilterPhase fromQueue(ImqFirstVisionQueue queue);
}

// Fáze 2a: Odeslání - asXml/asEbcdic/asUtf8 [NOVÉ], payload/addField/appendToArray [EXISTUJE]
public interface ImqPayloadPhase {
    ImqPayloadPhase asXml();           // [NOVÉ]
    ImqPayloadPhase asEbcdic();         // [NOVÉ]
    ImqPayloadPhase asUtf8();           // [NOVÉ]
    ImqPayloadPhase withPayload(String payload);
    ImqPayloadPhase withPayloadFromFile(String path);
    ImqPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables);
    ImqPayloadPhase addField(String fieldName, Object value);
    ImqPayloadPhase addField(String path, String fieldName, Object value);
    ImqPayloadPhase appendToArray(String path, Object value);
    void send();
}

// Fáze 2b: Příjem - withSelector, receiveWhere, browse [NOVÉ]
public interface ImqReceiveFilterPhase {
    ImqReceiveFilterPhase withSelector(String jmsMessageSelector);   // [NOVÉ]
    ImqAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter);
    List<ReceivedMessage> browse(int maxMessages);                   // [NOVÉ]
}
public interface ImqAwaitingPhase {
    MessageResponse withTimeout(long duration, TimeUnit unit);
}
Přehledná tabulka metod
Kontext Metoda Návrat Poznámka
Harness withKafka() Kafka [NOVÉ] vstupní bod
Harness withImqFirstVision() ImqFirstVision [NOVÉ] vstupní bod
Kafka toTopic(String) KafkaPayloadPhase [NOVÉ] odeslání
Kafka fromTopic(String) KafkaReceiveFilterPhase [NOVÉ] příjem
KafkaPayloadPhase withKey(String) KafkaPayloadPhase [NOVÉ] Kafka specifické
KafkaPayloadPhase withPayload(String) KafkaPayloadPhase [EXISTUJE] RawRestRequest.withPayload
KafkaPayloadPhase withPayloadFromFile(String) KafkaPayloadPhase [EXISTUJE] RawRestRequest.withPayloadFromFile
KafkaPayloadPhase withPayloadFromTemplate(String, Map) KafkaPayloadPhase [EXISTUJE] RawRestRequest.withPayloadFromTemplate (Template API) -- nice to have
KafkaPayloadPhase addField(...) KafkaPayloadPhase [EXISTUJE] RawRestRequest.addField, JSON pouze
KafkaPayloadPhase appendToArray(...) KafkaPayloadPhase [EXISTUJE] RawRestRequest.appendToArray, JSON pouze
KafkaPayloadPhase withTraceparent/RequestID/ActivityID/SourceCodebookId(String) KafkaPayloadPhase [NOVÉ] Kafka headery
KafkaPayloadPhase withHeader(String, String) KafkaPayloadPhase [EXISTUJE] RawRestRequest.withHeader
KafkaPayloadPhase send() void [NOVÉ] terminál (messaging specifické)
KafkaReceiveFilterPhase receiveWhere(Predicate) KafkaAwaitingPhase [NOVÉ]
ImqFirstVision toQueue(ImqFirstVisionQueue) ImqPayloadPhase [NOVÉ]
ImqFirstVision fromQueue(ImqFirstVisionQueue) ImqReceiveFilterPhase [NOVÉ]
ImqPayloadPhase asXml(), asEbcdic(), asUtf8() ImqPayloadPhase [NOVÉ] IBM MQ formát
ImqPayloadPhase withPayload, withPayloadFromFile, ... ImqPayloadPhase [EXISTUJE] jako Kafka
ImqReceiveFilterPhase withSelector(String) ImqReceiveFilterPhase [NOVÉ] JMS selector
KafkaAwaitingPhase withTimeout(long, TimeUnit) MessageResponse [NOVÉ]
ImqReceiveFilterPhase receiveWhere(Predicate) ImqAwaitingPhase [NOVÉ]
ImqAwaitingPhase withTimeout(long, TimeUnit) MessageResponse [NOVÉ]
ImqReceiveFilterPhase browse(int maxMessages) List<ReceivedMessage> [NOVÉ] terminál, nedestruktivní
MessageResponse andAssertFieldValue(path, value) MessageResponse [EXISTUJE] RawRestRequest.Response.andAssertFieldValue
MessageResponse andAssertPresent(path) MessageResponse [EXISTUJE] RawRestRequest.Response.andAssertPresent
MessageResponse andAssertNotPresent(path) MessageResponse [EXISTUJE] RawRestRequest.Response.andAssertNotPresent
MessageResponse andAssertHeaderValue(name, value) MessageResponse [NOVÉ] Kafka header / JMS property
MessageResponse andAssertBodyContains(substring) MessageResponse [NOVÉ] EBCDIC/UTF-8
MessageResponse andAssertWithAssertJ() ObjectAssert [NOVÉ] nice to have
MessageResponse extract(path) JsonPathValue [EXISTUJE] RawRestRequest.Response.extract, .asText()
MessageResponse mapTo(Class) T [EXISTUJE] RawRestRequest.post(Class) -- deserializace
MessageResponse getBody(), getHeader(name) String [EXISTUJE] Response přístup k datům

Poznámka k řetězci receiveWhere: U Kafky receiveWhere(filter) vrací objekt s withTimeout() timeout je povinný před assert. U IBM MQ může být timeout defaultní nebo explicitní dle implementace.

Vzor podobný RawRestRequest - fluent builder s fázovými interfaces:

// =============================================
// === harness.withKafka() -- Kafka (Avro) =====
// =============================================
// Vstup: topic (schéma se resolví automaticky z Registry), klíč, tělo (JSON), headery

// Odeslání zprávy na Kafka topic se standardními headery
harness.withKafka()
    .toTopic("order-events")
    .withKey("order-123")
    .withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}")
    .withTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
    .withRequestID("req-001")
    .withActivityID("act-555")
    .withSourceCodebookId("CB-01")
    .send();

// Libovolné vlastní headery
harness.withKafka()
    .toTopic("order-events")
    .withKey("order-456")
    .withPayload("{\"orderId\": \"456\", \"status\": \"SHIPPED\"}")
    .withHeader("customHeader", "customValue")
    .send();

// Odeslání z JSON souboru
harness.withKafka()
    .toTopic("order-events")
    .withKey("order-789")
    .withPayloadFromFile("messaging/order_event.json")
    .withRequestID("req-002")
    .send();

// Příjem zprávy z Kafka s filtrem (Avro -> JSON konverze automaticky)
harness.withKafka()
    .fromTopic("order-events")
    .receiveWhere(msg -> msg.extract("orderId").equals("123"))
    .withTimeout(30, TimeUnit.SECONDS)
    .andAssertFieldValue("status", "CREATED");

// =============================================
// === harness.withImqFirstVision() -- IBM MQ ==
// =============================================

// --- JSON (výchozí formát) ---
// Tester používá enum ImqFirstVisionQueue -- fyzický název fronty se resolvuje z konfigurace

// Odeslání JSON zprávy do IBM MQ fronty
harness.withImqFirstVision()
    .toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
    .withPayload("{\"paymentId\": \"PAY-456\", \"result\": \"OK\"}")
    .send();

// Příjem JSON zprávy z IBM MQ fronty
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
    .receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
    .withTimeout(10, TimeUnit.SECONDS)
    .andAssertFieldValue("result", "OK");

// --- XML ---

// Odeslání XML zprávy
harness.withImqFirstVision()
    .toQueue(ImqFirstVisionQueue.MF_REQUESTS)
    .asXml()
    .withPayload("<request><accountId>12345</accountId><action>BALANCE</action></request>")
    .send();

// Odeslání XML ze souboru
harness.withImqFirstVision()
    .toQueue(ImqFirstVisionQueue.MF_REQUESTS)
    .asXml()
    .withPayloadFromFile("messaging/mf_request.xml")
    .send();

// Příjem XML zprávy s XPath filtrací
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
    .asXml()
    .receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
    .withTimeout(15, TimeUnit.SECONDS)
    .andAssertFieldValue("/response/balance", "50000");

// --- UTF-8 / CCSID 1208 (BytesMessage) ---

// Odeslání zprávy v UTF-8 jako BytesMessage (CCSID 1208)
harness.withImqFirstVision()
    .toQueue(ImqFirstVisionQueue.MF_UTF8)
    .asUtf8()
    .withPayload("DATA|12345|ÚČET|CZK")
    .send();

// Příjem UTF-8 BytesMessage
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.MF_UTF8)
    .asUtf8()
    .receiveWhere(msg -> msg.getBody().contains("12345"))
    .withTimeout(15, TimeUnit.SECONDS);

// --- EBCDIC / CCSID 870 ---

// Odeslání zprávy v EBCDIC kódování (mainframe, CZ/SK znaky)
harness.withImqFirstVision()
    .toQueue(ImqFirstVisionQueue.MF_EBCDIC)
    .asEbcdic()
    .withPayload("PŘIKAZ|12345|ZŮSTATEK|CZK")
    .send();

// Příjem EBCDIC zprávy (automaticky dekódováno z IBM-870 na String)
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
    .asEbcdic()
    .receiveWhere(msg -> msg.getBody().contains("12345"))
    .withTimeout(15, TimeUnit.SECONDS);

Inkrementální stavba payload pomocí addField / appendToArray

Stejný vzor jako v existující třídě RawRestRequest.Builder (viz addField(), appendToArray() na řádcích 253-342 v RawRestRequest.java). Funguje pro JSON payloady (Kafka i IBM MQ JSON formát). Používá Jackson ObjectMapper pro manipulaci s JSON stromem.

// === Kafka: Sestavení payload přes addField ===

// Začátek s prázdným JSON objektem
harness.withKafka()
    .toTopic("order-events")
    .withKey("order-123")
    .withPayload("{}")
    .addField("orderId", "123")
    .addField("status", "CREATED")
    .addField("items", Arrays.asList(
        Map.of("sku", "A001", "qty", 2),
        Map.of("sku", "B002", "qty", 1)))
    .addField("items[1]", "note", "fragile")
    .withRequestID("req-001")
    .send();
// Výsledný payload: {"orderId":"123","status":"CREATED",
//   "items":[{"sku":"A001","qty":2},{"sku":"B002","qty":1,"note":"fragile"}]}

// Přidání do existujícího payloadu ze souboru
harness.withKafka()
    .toTopic("order-events")
    .withKey("order-456")
    .withPayloadFromFile("messaging/order_event.json")
    .addField("metadata", new Object())
    .addField("metadata", "source", "test-harness")
    .addField("metadata", "timestamp", System.currentTimeMillis())
    .send();

// Přidání prvku do existujícího pole
harness.withKafka()
    .toTopic("order-events")
    .withKey("order-789")
    .withPayloadFromFile("messaging/order_base.json")
    .appendToArray("items", Map.of("sku", "C003", "qty", 5))
    .send();

// === IBM MQ: Sestavení JSON payload přes addField ===

harness.withImqFirstVision()
    .toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
    .withPayload("{}")
    .addField("paymentId", "PAY-456")
    .addField("amount", 15000)
    .addField("currency", "CZK")
    .addField("beneficiary", new Object())
    .addField("beneficiary", "name", "Jan Novák")
    .addField("beneficiary", "accountNumber", "1234567890/0100")
    .send();

Podporované metody na builder fázi po withPayload() / withPayloadFromFile():

  • addField(String fieldName, Object value) -- přidá pole do kořene JSON
  • addField(String path, String fieldName, Object value) -- přidá pole do vnořeného uzlu; path používá tečkovou notaci a hranatou závorku pro indexy polí (např. "items[2].details")
  • appendToArray(String path, Object value) -- přidá prvek na konec existujícího JSON pole
  • withPayloadFromTemplate(String templatePath, Map<String, Object> variables) -- (nice to have) načte šablonu ze souboru a nahradí placeholdery (${varName}) hodnotami z mapy; hodnoty mohou pocházet z harness.store(). Pro složitější payloady bez nutnosti programově sestavovat JSON.

Poznámka: addField a appendToArray fungují pouze pro JSON formát. Pro XML a EBCDIC/UTF8 formáty v IBM MQ se payload sestavuje jako celý String.

Varianty výběru zprávy v receiveWhere

Kafka -- receiveWhere

Kafka receiveWhere přijímá Predicate<ReceivedMessage> -- libovolný Java predikát na deserializovanou zprávu (Avro -> JSON). Zprávy se prohledávají sekvenčně v polling smyčce. Vrátí se první zpráva, která vyhovuje predikátu.

// --- Filtrace podle obsahu JSON pole ---
harness.withKafka()
    .fromTopic("order-events")
    .receiveWhere(msg -> msg.extract("orderId").equals("123"))
    .withTimeout(30, TimeUnit.SECONDS);

// --- Filtrace podle více polí (AND) ---
harness.withKafka()
    .fromTopic("order-events")
    .receiveWhere(msg ->
        msg.extract("orderId").equals("123")
        && msg.extract("status").equals("CREATED"))
    .withTimeout(30, TimeUnit.SECONDS);

// --- Filtrace podle Kafka headeru ---
harness.withKafka()
    .fromTopic("order-events")
    .receiveWhere(msg -> "req-001".equals(msg.getHeader("requestID")))
    .withTimeout(30, TimeUnit.SECONDS);

// --- Filtrace podle klíče zprávy ---
harness.withKafka()
    .fromTopic("order-events")
    .receiveWhere(msg -> "order-123".equals(msg.getKey()))
    .withTimeout(30, TimeUnit.SECONDS);

// --- Filtrace kombinací headeru a obsahu ---
harness.withKafka()
    .fromTopic("order-events")
    .receiveWhere(msg ->
        "act-555".equals(msg.getHeader("activityID"))
        && msg.extract("status").equals("SHIPPED"))
    .withTimeout(30, TimeUnit.SECONDS);

// --- Příjem první dostupné zprávy (bez filtru) ---
harness.withKafka()
    .fromTopic("order-events")
    .receiveWhere(msg -> true)
    .withTimeout(10, TimeUnit.SECONDS);

// --- Příjem s vlastním timeoutem ---
harness.withKafka()
    .fromTopic("order-events")
    .receiveWhere(msg -> msg.extract("orderId").equals("123"))
    .withTimeout(60, TimeUnit.SECONDS);  // delší timeout pro pomalé systémy

ReceivedMessage metody dostupné v predikátu (Kafka):

  • msg.extract("dotPath") -- hodnota JSON pole (dot/bracket notace)
  • msg.getHeader("headerName") -- hodnota Kafka headeru
  • msg.getKey() -- klíč zprávy
  • msg.getBody() -- celé tělo jako JSON String
  • msg.getTimestamp() -- timestamp zprávy
IBM MQ -- receiveWhere

IBM MQ podporuje dvě úrovně filtrace:

  1. JMS Message Selector (server-side) -- efektivní filtr na JMS properties/headery, vyhodnocuje se na straně MQ serveru
  2. Predicate na obsahu (client-side) -- filtr na tělo zprávy po deserializaci, vyhodnocuje se v klientu
// --- Filtrace podle obsahu JSON pole ---
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
    .receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
    .withTimeout(10, TimeUnit.SECONDS);

// --- Filtrace podle JMS CorrelationID (server-side selector) ---
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
    .withSelector("JMSCorrelationID = 'corr-789'")
    .receiveWhere(msg -> true)
    .withTimeout(10, TimeUnit.SECONDS);

// --- Kombinace JMS selectoru a content filtru ---
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
    .withSelector("JMSType = 'payment'")
    .receiveWhere(msg -> msg.extract("amount").equals("15000"))
    .withTimeout(10, TimeUnit.SECONDS);

// --- XML zprávy s XPath filtrací ---
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
    .asXml()
    .receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
    .withTimeout(15, TimeUnit.SECONDS);

// --- EBCDIC / UTF-8 zprávy -- filtrace přes raw body ---
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
    .asEbcdic()
    .receiveWhere(msg -> msg.getBody().contains("12345"))
    .withTimeout(15, TimeUnit.SECONDS);

// --- Příjem první dostupné zprávy ---
harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
    .receiveWhere(msg -> true)
    .withTimeout(10, TimeUnit.SECONDS);

// --- Browse (ne-destruktivní čtení, zprávy zůstávají ve frontě) ---
List<ReceivedMessage> peeked = harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
    .browse(10);  // max 10 zpráv z fronty
// s JMS selektorem: .withSelector("JMSType = 'payment'").browse(10)

ReceivedMessage metody dostupné v predikátu (IBM MQ):

  • msg.extract("expression") -- hodnota pole (JSON dot-path pro JSON, XPath pro XML)
  • msg.getBody() -- celé tělo zprávy jako String
  • msg.getHeader("jmsProperty") -- hodnota JMS property

Klíčový rozdíl: .withSelector() je server-side filtr (SQL-92 subset, filtruje JMS properties) -- použijte ho pro efektivní filtraci podle korelačních ID. .receiveWhere() je client-side filtr na obsahu zprávy.

Chybové stavy -- výjimky a assert metody

Výjimky (propagovány okamžitě, test failuje)

Tyto chyby signalizují infrastrukturní problém nebo chybu konfigurace -- test nemůže pokračovat:

// --- Chyba připojení (Kafka i IBM MQ) ---
// KafkaConnectionException / JMSException obalené do RuntimeException
// Příčiny: špatný bootstrap-servers, connection-name-list, credentials, síťový problém
try {
    harness.withKafka()
        .toTopic("order-events")
        .withKey("order-123")
        .withPayload("{\"orderId\": \"123\"}")
        .send();
} catch (MessagingConnectionException e) {
    // "Failed to connect to Kafka cluster at pkc-xxxxx:9092: Authentication failed"
    // "Failed to connect to IBM MQ: mq9multitst5x(1414),mq9multitst6x(1414) - MQRC 2035 (NOT_AUTHORIZED)"
}

// --- Chyba schématu (Kafka) ---
// Schema Registry vrátí 404 nebo payload neodpovídá schématu
try {
    harness.withKafka()
        .toTopic("nonexistent-topic")
        .withKey("key")
        .withPayload("{\"unknownField\": \"value\"}")
        .send();
} catch (MessagingSchemaException e) {
    // "Schema not found for subject 'nonexistent-topic-value' in Schema Registry"
    // "Payload does not conform to Avro schema for topic 'order-events': field 'orderId' is required"
}

// --- Queue neexistuje (IBM MQ) ---
// Výjimka nastane i při chybné konfiguraci logického názvu v properties
try {
    harness.withImqFirstVision()
        .toQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS) // resolvuje se z konfigurace
        .withPayload("{}")
        .send();
} catch (MessagingDestinationException e) {
    // "Queue 'MVSW2TST3.DELIVERY.NOTIFICATION' not found: MQRC 2085 (UNKNOWN_OBJECT_NAME)"
}

// --- Timeout při příjmu (Kafka i IBM MQ) ---
// Žádná zpráva nevyhověla predikátu v daném čase
try {
    harness.withKafka()
        .fromTopic("order-events")
        .receiveWhere(msg -> msg.extract("orderId").equals("nonexistent"))
        .withTimeout(5, TimeUnit.SECONDS);
} catch (MessagingTimeoutException e) {
    // "No message matching filter found on topic 'order-events' within 5 seconds"
    // "No message matching filter found on queue 'PAYMENT.NOTIFICATIONS' within 10 seconds"
}

// --- Chyba konfigurace (endpoint není nakonfigurován) ---
try {
    harness.withKafka()
        .toTopic("order-events")
        .withKey("key")
        .withPayload("{}")
        .send();
} catch (IllegalStateException e) {
    // "You need to configure endpoints.kafka.bootstrap-servers"
    // Stejný vzor jako u Wso2GatewayEndpoint
}

Hierarchie výjimek:

MessagingException (abstract, extends RuntimeException)
├── MessagingConnectionException    -- selhání připojení, autentizace
├── MessagingSchemaException        -- Avro schema mismatch, schema not found
├── MessagingDestinationException   -- topic/queue neexistuje, přístupová práva
└── MessagingTimeoutException       -- receive timeout, žádná matching zpráva
Assert metody (fluent, na přijaté zprávě)

Tyto metody se volají na výsledku receiveWhere -- zpráva byla nalezena a nyní se ověřuje její obsah. Assert selhání = AssertionError (standardní JUnit pattern).

// --- JSON assert metody (Kafka i IBM MQ JSON) ---

harness.withKafka()
    .fromTopic("order-events")
    .receiveWhere(msg -> msg.extract("orderId").equals("123"))
    .withTimeout(30, TimeUnit.SECONDS)
    // Assert na hodnotu pole (dot/bracket notace)
    .andAssertFieldValue("status", "CREATED")
    .andAssertFieldValue("items[0].sku", "A001")
    // Assert na přítomnost/nepřítomnost pole
    .andAssertPresent("items[0].qty")
    .andAssertNotPresent("deletedField")
    // Assert na Kafka header
    .andAssertHeaderValue("requestID", "req-001")
    // Extrakce hodnoty pro další použití v testu
    .extract("internalId").asText();  // -> String pro uložení do harness.store()

// --- XML assert metody (IBM MQ XML) ---

harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.MF_RESPONSES)
    .asXml()
    .receiveWhere(msg -> msg.extract("/response/accountId").equals("12345"))
    .withTimeout(15, TimeUnit.SECONDS)
    // XPath assert
    .andAssertFieldValue("/response/balance", "50000")
    .andAssertFieldValue("/response/currency", "CZK")
    .andAssertPresent("/response/timestamp")
    .andAssertNotPresent("/response/error");

// --- Raw body assert (EBCDIC/UTF-8) ---

harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.MF_EBCDIC)
    .asEbcdic()
    .receiveWhere(msg -> msg.getBody().contains("12345"))
    .withTimeout(15, TimeUnit.SECONDS)
    .andAssertBodyContains("ZŮSTATEK");

// --- Deserializace do Java objektu (mapTo) ---

PaymentNotification notif = harness.withImqFirstVision()
    .fromQueue(ImqFirstVisionQueue.PAYMENT_NOTIFICATIONS)
    .receiveWhere(msg -> msg.extract("paymentId").equals("PAY-456"))
    .withTimeout(10, TimeUnit.SECONDS)
    .mapTo(PaymentNotification.class);

Kompletní tabulka assert metod na MessageResponse:

  • andAssertFieldValue(String path, String value) -- JSON dot-path nebo XPath
  • andAssertPresent(String path) -- ověří existenci pole
  • andAssertNotPresent(String path) -- ověří neexistenci pole
  • andAssertHeaderValue(String headerName, String value) -- Kafka header nebo JMS property
  • andAssertBodyContains(String substring) -- raw body contains (pro EBCDIC/UTF-8/raw text)
  • andAssertWithAssertJ() -- (nice to have) vrací AssertJ AbstractObjectAssert pro komplexní fluent assertions .
  • extract(String path) -- extrahuje hodnotu pro další použití
  • mapTo(Class<T> type) -- deserializace těla zprávy do Java objektu (Jackson/JAXB)
  • getBody() -- raw body String
  • getHeader(String name) -- raw header/property value

5. Konfigurace

Konfigurační klíče dodržují konvenci existujících endpointů v Harness: prefix endpoints.{system-name}.{property} (viz endpoints.wso2.gw.url, endpoints.udebs.url atd.).

Properties soubory (envs/tst1):

# Kafka Confluent Cloud (Avro + Schema Registry)
endpoints.kafka.bootstrap-servers=pkc-xxxxx.eu-central-1.aws.confluent.cloud:9092
endpoints.kafka.security-protocol=SASL_SSL
endpoints.kafka.sasl-mechanism=PLAIN
endpoints.kafka.schema-registry-url=https://psrc-xxxxx.eu-central-1.aws.confluent.cloud
endpoints.kafka.value-serializer=avro
# API key + secret a Schema Registry credentials se načítají z Vaultu

# IBM MQ - First Vision (multi-instance QMGR)
endpoints.imq-first-vision.connection-name-list=mq9multitst5x(1414),mq9multitst6x(1414)
endpoints.imq-first-vision.channel=CLIENT.CHANNEL
endpoints.imq-first-vision.queue-manager=MVSW2TST3
endpoints.imq-first-vision.ssl-cipher-suite=TLS_RSA_WITH_AES_256_CBC_SHA256
# user + heslo se načítají z Vaultu

# Logické destinace (musí odpovídat enum ImqFirstVisionQueue)
endpoints.imq-first-vision.payment-notifications.queue=MVSW2TST3.DELIVERY.NOTIFICATION
endpoints.imq-first-vision.payment-request.queue=MVSW2TST3.DELIVERY.REQUEST
endpoints.imq-first-vision.mf-requests.queue=MVSW2TST3.MF.REQUESTS
endpoints.imq-first-vision.mf-responses.queue=MVSW2TST3.MF.RESPONSES
endpoints.imq-first-vision.mf-ebcdic.queue=MVSW2TST3.MF.EBCDIC
endpoints.imq-first-vision.mf-utf8.queue=MVSW2TST3.MF.UTF8

Vault paths:

vault.kafka.secrets.path=/kv/autotesty/tst1/kafka
vault.imq-first-vision.secrets.path=/kv/autotesty/tst1/imq-first-vision

6. Maven závislosti

V test-harness-master/pom.xml přidat:

  • org.apache.kafka:kafka-clients (Kafka producer/consumer)
  • io.confluent:kafka-avro-serializer (Avro + Schema Registry)
  • com.ibm.mq:com.ibm.mq.allclient (IBM MQ JMS klient)
  • jakarta.jms:jakarta.jms-api (JMS API)
  • org.assertj:assertj-core (pro andAssertWithAssertJ(), nice to have )

7. Řešení klíčových problémů

Poll cyklus a hledání zprávy v Kafce

flowchart TD
    Start["receive(topic, filter, timeout)"] --> CreateConsumer["Vytvořit KafkaConsumer bez group.id"]
    CreateConsumer --> Assign["assign() na všechny partitions"]
    Assign --> SeekEnd["seekToEnd() pro všechny partitions"]
    SeekEnd --> SavePos["Uložit aktuální pozice"]
    SavePos --> Poll["poll(pollInterval)"]
    Poll --> Check{"Jsou nějaké záznamy?"}
    Check -->|Ano| Filter{"Odpovídá filter?"}
    Check -->|Ne| Timeout{"Vypršel timeout?"}
    Filter -->|Ano| Return["Vrátit ReceivedMessage"]
    Filter -->|Ne| Timeout
    Timeout -->|Ne| Backoff["Backoff wait"]
    Backoff --> Poll
    Timeout -->|Ano| Throw["Throw MessagingTimeoutException"]

Klíčové body:

  • seekToEnd() se volá před triggerem akce v testu (tester typicky: 1. připraví listener, 2. provede akci, 3. čeká na zprávu)
  • Alternativně: seek na pozici uloženou před testem
  • Polling interval: 100ms, exponential backoff do max 1s
  • Default timeout: 30s (konfigurovatelný)

Izolace testů (consumer groups)

Kafka:

  • Consumer bez group.id s manuálním assign() + seek()
  • Každý příjem zprávy vytvoří nový Consumer, přečte data a zavře ho
  • Žádné sdílení offsetů mezi testy ani vlákny
  • Alternativa pro komplexní scénáře: unikátní group.id per test (UUID)

IBM MQ:

  • Pro nedestruktivní čtení: QueueBrowser (browse bez odstranění zprávy)
  • JMS message selector (JMSCorrelationID = 'xxx') pro cílený příjem
  • Každý test si filtruje zprávy podle korelačních údajů

8. Souhrnný seznam souborů k vytvoření/úpravě

Nové soubory v test-harness-master:

  • connectors/messaging/KafkaConnector.java
  • connectors/messaging/IbmMqConnector.java
  • endpoints/kafka/KafkaEndpoint.java
  • endpoints/imq/ImqFirstVisionEndpoint.java
  • endpoints/imq/ImqFirstVisionQueue.java (enum: logické názvy front)
  • support/messaging/ReceivedMessage.java
  • support/messaging/MessageContentType.java (enum: JSON, XML, RAW_TEXT)
  • support/messaging/MqMessageFormat.java (enum: JSON, XML, EBCDIC_870, UTF8_1208)
  • support/messaging/MessageResponse.java (interface: assert, extract, mapTo -- sdílený pro Kafka i IBM MQ)
  • support/messaging/KafkaRequest.java (fluent builder pro Kafka)
  • support/messaging/ImqRequest.java (fluent builder pro IBM MQ)
  • messaging/exception/MessagingException.java (abstract, extends RuntimeException)
  • messaging/exception/MessagingConnectionException.java
  • messaging/exception/MessagingSchemaException.java
  • messaging/exception/MessagingDestinationException.java
  • messaging/exception/MessagingTimeoutException.java

Nové soubory v test-master:

  • dsl/kafka/Kafka.java (DSL třída pro withKafka())
  • dsl/imq/ImqFirstVision.java (DSL třída pro withImqFirstVision())

Úpravy existujících souborů:

  • test-harness-master/pom.xml - přidat závislosti Kafka + IBM MQ
  • test-master/pom.xml - případné závislosti
  • test-master/.../Harness.java - přidat withKafka() a withImqFirstVision() metody
  • test-master/src/test/resources/envs/tst1 - přidat endpoints.kafka.* a endpoints.imq-first-vision.*
  • test-master/src/test/resources/envs/ppe - přidat endpoints.kafka.* a endpoints.imq-first-vision.*