From 70c4540c4acddb83abc75d9d81158626e4c8b1ab Mon Sep 17 00:00:00 2001 From: Radek Davidek Date: Thu, 26 Mar 2026 10:28:48 +0100 Subject: [PATCH] added kafka to tests --- .../main/java/cz/moneta/test/dsl/Harness.java | 5 + .../java/cz/moneta/test/dsl/kafka/Kafka.java | 42 ++++ .../test/system/messaging/KafkaTest.java | 235 ++++++++++++++++++ 3 files changed, 282 insertions(+) create mode 100644 tests/src/main/java/cz/moneta/test/dsl/kafka/Kafka.java create mode 100644 tests/src/test/java/cz/moneta/test/system/messaging/KafkaTest.java diff --git a/tests/src/main/java/cz/moneta/test/dsl/Harness.java b/tests/src/main/java/cz/moneta/test/dsl/Harness.java index 8e331c2..55c13ab 100644 --- a/tests/src/main/java/cz/moneta/test/dsl/Harness.java +++ b/tests/src/main/java/cz/moneta/test/dsl/Harness.java @@ -24,6 +24,7 @@ import cz.moneta.test.dsl.hypos.Hypos; import cz.moneta.test.dsl.ib.Ib; import cz.moneta.test.dsl.ilods.Ilods; import cz.moneta.test.dsl.imq.ImqFirstVision; +import cz.moneta.test.dsl.kafka.Kafka; import cz.moneta.test.dsl.kasanova.Kasanova; import cz.moneta.test.dsl.mobile.smartbanking.home.Sb; import cz.moneta.test.dsl.monetaapiportal.MonetaApiPortal; @@ -287,6 +288,10 @@ public class Harness extends BaseStoreAccessor { return new ImqFirstVision(this); } + public Kafka withKafka() { + return new Kafka(this); + } + private void initGenerators() { addGenerator(RC, new RcGenerator()); addGenerator(FIRST_NAME, new FirstNameGenerator()); diff --git a/tests/src/main/java/cz/moneta/test/dsl/kafka/Kafka.java b/tests/src/main/java/cz/moneta/test/dsl/kafka/Kafka.java new file mode 100644 index 0000000..076eb81 --- /dev/null +++ b/tests/src/main/java/cz/moneta/test/dsl/kafka/Kafka.java @@ -0,0 +1,42 @@ +package cz.moneta.test.dsl.kafka; + +import cz.moneta.test.dsl.Harness; +import cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint; +import cz.moneta.test.harness.support.messaging.kafka.KafkaRequest; + +/** + * Kafka DSL that provides fluent API for Kafka messaging. + *

+ * Usage: + *

{@code
+ * harness.withKafka()
+ *     .toTopic("order-events")
+ *     .withKey("order-123")
+ *     .withPayload("{\"orderId\": \"123\"}")
+ *     .send();
+ * }
+ */ +public class Kafka { + + private final Harness harness; + + public Kafka(Harness harness) { + this.harness = harness; + } + + /** + * Starts a Kafka send operation. + */ + public KafkaRequest.KafkaPayloadPhase toTopic(String topic) { + KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class); + return KafkaRequest.builder(endpoint).toTopic(topic); + } + + /** + * Starts a Kafka receive operation. + */ + public KafkaRequest.KafkaReceiveFilterPhase fromTopic(String topic) { + KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class); + return KafkaRequest.builder(endpoint).fromTopic(topic); + } +} diff --git a/tests/src/test/java/cz/moneta/test/system/messaging/KafkaTest.java b/tests/src/test/java/cz/moneta/test/system/messaging/KafkaTest.java new file mode 100644 index 0000000..ac65a17 --- /dev/null +++ b/tests/src/test/java/cz/moneta/test/system/messaging/KafkaTest.java @@ -0,0 +1,235 @@ +package cz.moneta.test.system.messaging; + +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import cz.moneta.test.dsl.Harness; +import cz.moneta.test.harness.annotations.TestCase; +import cz.moneta.test.harness.annotations.TestScenario; +import cz.moneta.test.harness.messaging.exception.MessagingSchemaException; +import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.TimeUnit; + + +@TestScenario(name = "Test Kafka functions") +public class KafkaTest { + + @TestCase(name = "Send message with standard headers") + public void sendWithStandardHeaders(Harness harness) { + harness.withKafka() + .toTopic("order-events") + .withKey("order-123") + .withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}") + .withTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + .withRequestID("req-001") + .withActivityID("act-555") + .withSourceCodebookId("CB-01") + .send(); + } + + @TestCase(name = "Send message with additional headers") + public void sendWithAdditionalHeaders(Harness harness) { + harness.withKafka() + .toTopic("order-events") + .withKey("order-456") + .withPayload("{\"orderId\": \"456\", \"status\": \"SHIPPED\"}") + .withHeader("customHeader", "customValue") + .send(); + } + + @TestCase(name = "Send message loaded from file") + public void sendLoadedMessage(Harness harness) { + harness.withKafka() + .toTopic("order-events") + .withKey("order-789") + .withPayloadFromFile("system/messaging/order_event.json") + .withRequestID("req-002") + .send(); + } + + @TestCase(name = "Receive message with filter") + public void receiveMessageWithFilter(Harness harness) { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("123")) + .withTimeout(30, TimeUnit.SECONDS) + .andAssertFieldValue("status", "CREATED"); + } + + @TestCase(name = "Send message with added paramaters") + public void sendWithAddedParamaters(Harness harness) { + harness.withKafka() + .toTopic("order-events") + .withKey("order-123") + .withPayload("{}") + .addField("orderId", "555") + .addField("status", "CREATED") + .addField("items", Arrays.asList( + Map.of("sku", "A001", "qty", 2), + Map.of("sku", "B002", "qty", 1))) + .addField("items[1]", "note", "fragile") + .withRequestID("req-001") + .send(); + } + + @TestCase(name = "Send from file with added parameters") + public void sendFromFileWithAddedParamaters(Harness harness) { + harness.withKafka() + .toTopic("order-events") + .withKey("order-456") + .withPayloadFromFile("system/messaging/order_event.json") + .addField("metadata", JsonNodeFactory.instance.objectNode()) + .addField("metadata", "source", "test-harness") + .addField("metadata", "timestamp", System.currentTimeMillis()) + .send(); + } + + @TestCase(name = "Send with added object in array") + public void sendWithAddedObjectsInArray(Harness harness) { + harness.withKafka() + .toTopic("order-events") + .withKey("order-789") + .withPayloadFromFile("system/messaging/order_base.json") + .appendToArray("items", Map.of("sku", "C003", "qty", 5)) + .send(); + } + + @TestCase(name = "Filtering by array value") + public void filteringByArrayValue(Harness harness) { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("123")) + .withTimeout(30, TimeUnit.SECONDS); + } + + @TestCase(name = "Filtering by more values 'AND'") + public void filteringByMoreVlaues(Harness harness) { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> + msg.extract("orderId").equals("123") + && msg.extract("status").equals("CREATED")) + .withTimeout(30, TimeUnit.SECONDS); + } + + @TestCase(name = "Filtering by headers") + public void filteringByHeaders(Harness harness) { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> "req-001".equals(msg.getHeader("requestID"))) + .withTimeout(30, TimeUnit.SECONDS); + } + + @TestCase(name = "Filtering by key") + public void filteringBykey(Harness harness) { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> "order-123".equals(msg.getKey())) + .withTimeout(30, TimeUnit.SECONDS); + } + + @TestCase(name = "Filtering by header and attribute") + public void filteringByHeaderAndAttribute(Harness harness) { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> + "act-555".equals(msg.getHeader("activityID")) + && msg.extract("status").equals("CREATED")) + .withTimeout(30, TimeUnit.SECONDS); + } + + @TestCase(name = "First Message") + public void getFirstMessage(Harness harness) { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> true) + .withTimeout(10, TimeUnit.SECONDS); + } + + @TestCase(name = "Receive message with timeout") + public void receiveMEssageWithTimeout(Harness harness) { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("123")) + .withTimeout(60, TimeUnit.SECONDS); + } + + @Disabled + @TestCase(name = "Failed to connect") + public void failedToConnect(Harness harness) { + try { + harness.withKafka() + .toTopic("order-events") + .withKey("order-123") + .withPayload("{\"orderId\": \"123\"}") + .send(); + } catch (MessagingSchemaException e) { + // "Failed to connect to Kafka cluster at pkc-xxxxx:9092: Authentication failed" + } + } + + @TestCase(name = "Wrong Schema") + public void wrongSchema(Harness harness) { + try { + harness.withKafka() + .toTopic("nonexistent-topic") + .withKey("key") + .withPayload("{\"unknownField\": \"value\"}") + .send(); + } catch (MessagingSchemaException e) { + // "Schema not found for subject 'nonexistent-topic-value' in Schema Registry" + // "Payload does not conform to Avro schema for topic 'order-events': field 'orderId' is required" + } + } + + @TestCase(name = "Messaging timeout") + public void messageTimeout(Harness harness) { + try { + harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("nonexistent")) + .withTimeout(5, TimeUnit.SECONDS); + } catch (MessagingTimeoutException e) { + // "No message matching filter found on topic 'order-events' within 5 seconds" + // "No message matching filter found on queue 'PAYMENT.NOTIFICATIONS' within 10 seconds" + } + } + + @Disabled + @TestCase(name = "Ilegal state") + public void ilegalState(Harness harness) { + try { + harness.withKafka() + .toTopic("order-events") + .withKey("key") + .withPayload("{}") + .send(); + } catch (IllegalStateException e) { + // "You need to configure endpoints.kafka.bootstrap-servers" + // Stejný vzor jako u Wso2GatewayEndpoint + } + } + + @TestCase(name = "Assert methods test") + public void assertMethods(Harness harness) { + String text = harness.withKafka() + .fromTopic("order-events") + .receiveWhere(msg -> msg.extract("orderId").equals("555")) + .withTimeout(30, TimeUnit.SECONDS) + // Assert na hodnotu pole (dot/bracket notace) + .andAssertFieldValue("status", "CREATED") + .andAssertFieldValue("items[0].sku", "A001") + // Assert na přítomnost/nepřítomnost pole + .andAssertPresent("items[0].qty") + .andAssertNotPresent("deletedField") + // Assert na Kafka header + .andAssertHeaderValue("requestID", "req-001") + // Extrakce hodnoty pro další použití v testu + .extract("items[1].note").asText(); + Assertions.assertEquals("fragile", text); + } + +}