diff --git a/tests/src/main/java/cz/moneta/test/dsl/Harness.java b/tests/src/main/java/cz/moneta/test/dsl/Harness.java index 8e331c2..55c13ab 100644 --- a/tests/src/main/java/cz/moneta/test/dsl/Harness.java +++ b/tests/src/main/java/cz/moneta/test/dsl/Harness.java @@ -24,6 +24,7 @@ import cz.moneta.test.dsl.hypos.Hypos; import cz.moneta.test.dsl.ib.Ib; import cz.moneta.test.dsl.ilods.Ilods; import cz.moneta.test.dsl.imq.ImqFirstVision; +import cz.moneta.test.dsl.kafka.Kafka; import cz.moneta.test.dsl.kasanova.Kasanova; import cz.moneta.test.dsl.mobile.smartbanking.home.Sb; import cz.moneta.test.dsl.monetaapiportal.MonetaApiPortal; @@ -287,6 +288,10 @@ public class Harness extends BaseStoreAccessor { return new ImqFirstVision(this); } + public Kafka withKafka() { + return new Kafka(this); + } + private void initGenerators() { addGenerator(RC, new RcGenerator()); addGenerator(FIRST_NAME, new FirstNameGenerator()); diff --git a/tests/src/main/java/cz/moneta/test/dsl/kafka/Kafka.java b/tests/src/main/java/cz/moneta/test/dsl/kafka/Kafka.java new file mode 100644 index 0000000..076eb81 --- /dev/null +++ b/tests/src/main/java/cz/moneta/test/dsl/kafka/Kafka.java @@ -0,0 +1,42 @@ +package cz.moneta.test.dsl.kafka; + +import cz.moneta.test.dsl.Harness; +import cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint; +import cz.moneta.test.harness.support.messaging.kafka.KafkaRequest; + +/** + * Kafka DSL that provides fluent API for Kafka messaging. + *
+ * Usage: + *
{@code
+ * harness.withKafka()
+ * .toTopic("order-events")
+ * .withKey("order-123")
+ * .withPayload("{\"orderId\": \"123\"}")
+ * .send();
+ * }
+ */
+public class Kafka {
+
+ private final Harness harness;
+
+ public Kafka(Harness harness) {
+ this.harness = harness;
+ }
+
+ /**
+ * Starts a Kafka send operation.
+ */
+ public KafkaRequest.KafkaPayloadPhase toTopic(String topic) {
+ KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class);
+ return KafkaRequest.builder(endpoint).toTopic(topic);
+ }
+
+ /**
+ * Starts a Kafka receive operation.
+ */
+ public KafkaRequest.KafkaReceiveFilterPhase fromTopic(String topic) {
+ KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class);
+ return KafkaRequest.builder(endpoint).fromTopic(topic);
+ }
+}
diff --git a/tests/src/test/java/cz/moneta/test/system/messaging/KafkaTest.java b/tests/src/test/java/cz/moneta/test/system/messaging/KafkaTest.java
new file mode 100644
index 0000000..ac65a17
--- /dev/null
+++ b/tests/src/test/java/cz/moneta/test/system/messaging/KafkaTest.java
@@ -0,0 +1,235 @@
+package cz.moneta.test.system.messaging;
+
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import cz.moneta.test.dsl.Harness;
+import cz.moneta.test.harness.annotations.TestCase;
+import cz.moneta.test.harness.annotations.TestScenario;
+import cz.moneta.test.harness.messaging.exception.MessagingSchemaException;
+import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+@TestScenario(name = "Test Kafka functions")
+public class KafkaTest {
+
+ @TestCase(name = "Send message with standard headers")
+ public void sendWithStandardHeaders(Harness harness) {
+ harness.withKafka()
+ .toTopic("order-events")
+ .withKey("order-123")
+ .withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}")
+ .withTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
+ .withRequestID("req-001")
+ .withActivityID("act-555")
+ .withSourceCodebookId("CB-01")
+ .send();
+ }
+
+ @TestCase(name = "Send message with additional headers")
+ public void sendWithAdditionalHeaders(Harness harness) {
+ harness.withKafka()
+ .toTopic("order-events")
+ .withKey("order-456")
+ .withPayload("{\"orderId\": \"456\", \"status\": \"SHIPPED\"}")
+ .withHeader("customHeader", "customValue")
+ .send();
+ }
+
+ @TestCase(name = "Send message loaded from file")
+ public void sendLoadedMessage(Harness harness) {
+ harness.withKafka()
+ .toTopic("order-events")
+ .withKey("order-789")
+ .withPayloadFromFile("system/messaging/order_event.json")
+ .withRequestID("req-002")
+ .send();
+ }
+
+ @TestCase(name = "Receive message with filter")
+ public void receiveMessageWithFilter(Harness harness) {
+ harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg -> msg.extract("orderId").equals("123"))
+ .withTimeout(30, TimeUnit.SECONDS)
+ .andAssertFieldValue("status", "CREATED");
+ }
+
+ @TestCase(name = "Send message with added paramaters")
+ public void sendWithAddedParamaters(Harness harness) {
+ harness.withKafka()
+ .toTopic("order-events")
+ .withKey("order-123")
+ .withPayload("{}")
+ .addField("orderId", "555")
+ .addField("status", "CREATED")
+ .addField("items", Arrays.asList(
+ Map.of("sku", "A001", "qty", 2),
+ Map.of("sku", "B002", "qty", 1)))
+ .addField("items[1]", "note", "fragile")
+ .withRequestID("req-001")
+ .send();
+ }
+
+ @TestCase(name = "Send from file with added parameters")
+ public void sendFromFileWithAddedParamaters(Harness harness) {
+ harness.withKafka()
+ .toTopic("order-events")
+ .withKey("order-456")
+ .withPayloadFromFile("system/messaging/order_event.json")
+ .addField("metadata", JsonNodeFactory.instance.objectNode())
+ .addField("metadata", "source", "test-harness")
+ .addField("metadata", "timestamp", System.currentTimeMillis())
+ .send();
+ }
+
+ @TestCase(name = "Send with added object in array")
+ public void sendWithAddedObjectsInArray(Harness harness) {
+ harness.withKafka()
+ .toTopic("order-events")
+ .withKey("order-789")
+ .withPayloadFromFile("system/messaging/order_base.json")
+ .appendToArray("items", Map.of("sku", "C003", "qty", 5))
+ .send();
+ }
+
+ @TestCase(name = "Filtering by array value")
+ public void filteringByArrayValue(Harness harness) {
+ harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg -> msg.extract("orderId").equals("123"))
+ .withTimeout(30, TimeUnit.SECONDS);
+ }
+
+ @TestCase(name = "Filtering by more values 'AND'")
+ public void filteringByMoreVlaues(Harness harness) {
+ harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg ->
+ msg.extract("orderId").equals("123")
+ && msg.extract("status").equals("CREATED"))
+ .withTimeout(30, TimeUnit.SECONDS);
+ }
+
+ @TestCase(name = "Filtering by headers")
+ public void filteringByHeaders(Harness harness) {
+ harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg -> "req-001".equals(msg.getHeader("requestID")))
+ .withTimeout(30, TimeUnit.SECONDS);
+ }
+
+ @TestCase(name = "Filtering by key")
+ public void filteringBykey(Harness harness) {
+ harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg -> "order-123".equals(msg.getKey()))
+ .withTimeout(30, TimeUnit.SECONDS);
+ }
+
+ @TestCase(name = "Filtering by header and attribute")
+ public void filteringByHeaderAndAttribute(Harness harness) {
+ harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg ->
+ "act-555".equals(msg.getHeader("activityID"))
+ && msg.extract("status").equals("CREATED"))
+ .withTimeout(30, TimeUnit.SECONDS);
+ }
+
+ @TestCase(name = "First Message")
+ public void getFirstMessage(Harness harness) {
+ harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg -> true)
+ .withTimeout(10, TimeUnit.SECONDS);
+ }
+
+ @TestCase(name = "Receive message with timeout")
+ public void receiveMEssageWithTimeout(Harness harness) {
+ harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg -> msg.extract("orderId").equals("123"))
+ .withTimeout(60, TimeUnit.SECONDS);
+ }
+
+ @Disabled
+ @TestCase(name = "Failed to connect")
+ public void failedToConnect(Harness harness) {
+ try {
+ harness.withKafka()
+ .toTopic("order-events")
+ .withKey("order-123")
+ .withPayload("{\"orderId\": \"123\"}")
+ .send();
+ } catch (MessagingSchemaException e) {
+ // "Failed to connect to Kafka cluster at pkc-xxxxx:9092: Authentication failed"
+ }
+ }
+
+ @TestCase(name = "Wrong Schema")
+ public void wrongSchema(Harness harness) {
+ try {
+ harness.withKafka()
+ .toTopic("nonexistent-topic")
+ .withKey("key")
+ .withPayload("{\"unknownField\": \"value\"}")
+ .send();
+ } catch (MessagingSchemaException e) {
+ // "Schema not found for subject 'nonexistent-topic-value' in Schema Registry"
+ // "Payload does not conform to Avro schema for topic 'order-events': field 'orderId' is required"
+ }
+ }
+
+ @TestCase(name = "Messaging timeout")
+ public void messageTimeout(Harness harness) {
+ try {
+ harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg -> msg.extract("orderId").equals("nonexistent"))
+ .withTimeout(5, TimeUnit.SECONDS);
+ } catch (MessagingTimeoutException e) {
+ // "No message matching filter found on topic 'order-events' within 5 seconds"
+ // "No message matching filter found on queue 'PAYMENT.NOTIFICATIONS' within 10 seconds"
+ }
+ }
+
+ @Disabled
+ @TestCase(name = "Ilegal state")
+ public void ilegalState(Harness harness) {
+ try {
+ harness.withKafka()
+ .toTopic("order-events")
+ .withKey("key")
+ .withPayload("{}")
+ .send();
+ } catch (IllegalStateException e) {
+ // "You need to configure endpoints.kafka.bootstrap-servers"
+ // Stejný vzor jako u Wso2GatewayEndpoint
+ }
+ }
+
+ @TestCase(name = "Assert methods test")
+ public void assertMethods(Harness harness) {
+ String text = harness.withKafka()
+ .fromTopic("order-events")
+ .receiveWhere(msg -> msg.extract("orderId").equals("555"))
+ .withTimeout(30, TimeUnit.SECONDS)
+ // Assert na hodnotu pole (dot/bracket notace)
+ .andAssertFieldValue("status", "CREATED")
+ .andAssertFieldValue("items[0].sku", "A001")
+ // Assert na přítomnost/nepřítomnost pole
+ .andAssertPresent("items[0].qty")
+ .andAssertNotPresent("deletedField")
+ // Assert na Kafka header
+ .andAssertHeaderValue("requestID", "req-001")
+ // Extrakce hodnoty pro další použití v testu
+ .extract("items[1].note").asText();
+ Assertions.assertEquals("fragile", text);
+ }
+
+}