added kafka to tests

This commit is contained in:
Radek Davidek 2026-03-26 10:28:48 +01:00
parent b94a071fda
commit 70c4540c4a
3 changed files with 282 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import cz.moneta.test.dsl.hypos.Hypos;
import cz.moneta.test.dsl.ib.Ib;
import cz.moneta.test.dsl.ilods.Ilods;
import cz.moneta.test.dsl.imq.ImqFirstVision;
import cz.moneta.test.dsl.kafka.Kafka;
import cz.moneta.test.dsl.kasanova.Kasanova;
import cz.moneta.test.dsl.mobile.smartbanking.home.Sb;
import cz.moneta.test.dsl.monetaapiportal.MonetaApiPortal;
@ -287,6 +288,10 @@ public class Harness extends BaseStoreAccessor {
return new ImqFirstVision(this);
}
public Kafka withKafka() {
return new Kafka(this);
}
private void initGenerators() {
addGenerator(RC, new RcGenerator());
addGenerator(FIRST_NAME, new FirstNameGenerator());

View File

@ -0,0 +1,42 @@
package cz.moneta.test.dsl.kafka;
import cz.moneta.test.dsl.Harness;
import cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint;
import cz.moneta.test.harness.support.messaging.kafka.KafkaRequest;
/**
* Kafka DSL that provides fluent API for Kafka messaging.
* <p>
* Usage:
* <pre>{@code
* harness.withKafka()
* .toTopic("order-events")
* .withKey("order-123")
* .withPayload("{\"orderId\": \"123\"}")
* .send();
* }</pre>
*/
public class Kafka {
private final Harness harness;
public Kafka(Harness harness) {
this.harness = harness;
}
/**
* Starts a Kafka send operation.
*/
public KafkaRequest.KafkaPayloadPhase toTopic(String topic) {
KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class);
return KafkaRequest.builder(endpoint).toTopic(topic);
}
/**
* Starts a Kafka receive operation.
*/
public KafkaRequest.KafkaReceiveFilterPhase fromTopic(String topic) {
KafkaEndpoint endpoint = harness.getEndpoint(KafkaEndpoint.class);
return KafkaRequest.builder(endpoint).fromTopic(topic);
}
}

View File

@ -0,0 +1,235 @@
package cz.moneta.test.system.messaging;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import cz.moneta.test.dsl.Harness;
import cz.moneta.test.harness.annotations.TestCase;
import cz.moneta.test.harness.annotations.TestScenario;
import cz.moneta.test.harness.messaging.exception.MessagingSchemaException;
import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@TestScenario(name = "Test Kafka functions")
public class KafkaTest {
@TestCase(name = "Send message with standard headers")
public void sendWithStandardHeaders(Harness harness) {
harness.withKafka()
.toTopic("order-events")
.withKey("order-123")
.withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}")
.withTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
.withRequestID("req-001")
.withActivityID("act-555")
.withSourceCodebookId("CB-01")
.send();
}
@TestCase(name = "Send message with additional headers")
public void sendWithAdditionalHeaders(Harness harness) {
harness.withKafka()
.toTopic("order-events")
.withKey("order-456")
.withPayload("{\"orderId\": \"456\", \"status\": \"SHIPPED\"}")
.withHeader("customHeader", "customValue")
.send();
}
@TestCase(name = "Send message loaded from file")
public void sendLoadedMessage(Harness harness) {
harness.withKafka()
.toTopic("order-events")
.withKey("order-789")
.withPayloadFromFile("system/messaging/order_event.json")
.withRequestID("req-002")
.send();
}
@TestCase(name = "Receive message with filter")
public void receiveMessageWithFilter(Harness harness) {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(30, TimeUnit.SECONDS)
.andAssertFieldValue("status", "CREATED");
}
@TestCase(name = "Send message with added paramaters")
public void sendWithAddedParamaters(Harness harness) {
harness.withKafka()
.toTopic("order-events")
.withKey("order-123")
.withPayload("{}")
.addField("orderId", "555")
.addField("status", "CREATED")
.addField("items", Arrays.asList(
Map.of("sku", "A001", "qty", 2),
Map.of("sku", "B002", "qty", 1)))
.addField("items[1]", "note", "fragile")
.withRequestID("req-001")
.send();
}
@TestCase(name = "Send from file with added parameters")
public void sendFromFileWithAddedParamaters(Harness harness) {
harness.withKafka()
.toTopic("order-events")
.withKey("order-456")
.withPayloadFromFile("system/messaging/order_event.json")
.addField("metadata", JsonNodeFactory.instance.objectNode())
.addField("metadata", "source", "test-harness")
.addField("metadata", "timestamp", System.currentTimeMillis())
.send();
}
@TestCase(name = "Send with added object in array")
public void sendWithAddedObjectsInArray(Harness harness) {
harness.withKafka()
.toTopic("order-events")
.withKey("order-789")
.withPayloadFromFile("system/messaging/order_base.json")
.appendToArray("items", Map.of("sku", "C003", "qty", 5))
.send();
}
@TestCase(name = "Filtering by array value")
public void filteringByArrayValue(Harness harness) {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(30, TimeUnit.SECONDS);
}
@TestCase(name = "Filtering by more values 'AND'")
public void filteringByMoreVlaues(Harness harness) {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg ->
msg.extract("orderId").equals("123")
&& msg.extract("status").equals("CREATED"))
.withTimeout(30, TimeUnit.SECONDS);
}
@TestCase(name = "Filtering by headers")
public void filteringByHeaders(Harness harness) {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> "req-001".equals(msg.getHeader("requestID")))
.withTimeout(30, TimeUnit.SECONDS);
}
@TestCase(name = "Filtering by key")
public void filteringBykey(Harness harness) {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> "order-123".equals(msg.getKey()))
.withTimeout(30, TimeUnit.SECONDS);
}
@TestCase(name = "Filtering by header and attribute")
public void filteringByHeaderAndAttribute(Harness harness) {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg ->
"act-555".equals(msg.getHeader("activityID"))
&& msg.extract("status").equals("CREATED"))
.withTimeout(30, TimeUnit.SECONDS);
}
@TestCase(name = "First Message")
public void getFirstMessage(Harness harness) {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> true)
.withTimeout(10, TimeUnit.SECONDS);
}
@TestCase(name = "Receive message with timeout")
public void receiveMEssageWithTimeout(Harness harness) {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("123"))
.withTimeout(60, TimeUnit.SECONDS);
}
@Disabled
@TestCase(name = "Failed to connect")
public void failedToConnect(Harness harness) {
try {
harness.withKafka()
.toTopic("order-events")
.withKey("order-123")
.withPayload("{\"orderId\": \"123\"}")
.send();
} catch (MessagingSchemaException e) {
// "Failed to connect to Kafka cluster at pkc-xxxxx:9092: Authentication failed"
}
}
@TestCase(name = "Wrong Schema")
public void wrongSchema(Harness harness) {
try {
harness.withKafka()
.toTopic("nonexistent-topic")
.withKey("key")
.withPayload("{\"unknownField\": \"value\"}")
.send();
} catch (MessagingSchemaException e) {
// "Schema not found for subject 'nonexistent-topic-value' in Schema Registry"
// "Payload does not conform to Avro schema for topic 'order-events': field 'orderId' is required"
}
}
@TestCase(name = "Messaging timeout")
public void messageTimeout(Harness harness) {
try {
harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("nonexistent"))
.withTimeout(5, TimeUnit.SECONDS);
} catch (MessagingTimeoutException e) {
// "No message matching filter found on topic 'order-events' within 5 seconds"
// "No message matching filter found on queue 'PAYMENT.NOTIFICATIONS' within 10 seconds"
}
}
@Disabled
@TestCase(name = "Ilegal state")
public void ilegalState(Harness harness) {
try {
harness.withKafka()
.toTopic("order-events")
.withKey("key")
.withPayload("{}")
.send();
} catch (IllegalStateException e) {
// "You need to configure endpoints.kafka.bootstrap-servers"
// Stejný vzor jako u Wso2GatewayEndpoint
}
}
@TestCase(name = "Assert methods test")
public void assertMethods(Harness harness) {
String text = harness.withKafka()
.fromTopic("order-events")
.receiveWhere(msg -> msg.extract("orderId").equals("555"))
.withTimeout(30, TimeUnit.SECONDS)
// Assert na hodnotu pole (dot/bracket notace)
.andAssertFieldValue("status", "CREATED")
.andAssertFieldValue("items[0].sku", "A001")
// Assert na přítomnost/nepřítomnost pole
.andAssertPresent("items[0].qty")
.andAssertNotPresent("deletedField")
// Assert na Kafka header
.andAssertHeaderValue("requestID", "req-001")
// Extrakce hodnoty pro další použití v testu
.extract("items[1].note").asText();
Assertions.assertEquals("fragile", text);
}
}