From b94a071fda2451b8d628783c43116308a25caf08 Mon Sep 17 00:00:00 2001 From: Radek Davidek Date: Thu, 26 Mar 2026 10:21:48 +0100 Subject: [PATCH] added kafka --- test-harness/pom.xml | 29 + .../messaging/JsonToAvroConverter.java | 115 ++++ .../connectors/messaging/KafkaConnector.java | 348 +++++++++++ .../endpoints/kafka/KafkaEndpoint.java | 199 ++++++ .../MessagingConnectionException.java | 6 +- .../MessagingDestinationException.java | 7 +- .../exception/MessagingException.java | 7 +- .../exception/MessagingSchemaException.java | 8 +- .../exception/MessagingTimeoutException.java | 7 +- .../support/messaging/kafka/KafkaRequest.java | 568 ++++++++++++++++++ .../messaging/kafka/MessageContentType.java | 19 + .../messaging/kafka/MessageResponse.java | 139 +++++ .../messaging/kafka/ReceivedMessage.java | 365 +++++++++++ 13 files changed, 1811 insertions(+), 6 deletions(-) create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/JsonToAvroConverter.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/endpoints/kafka/KafkaEndpoint.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/KafkaRequest.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/MessageContentType.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/MessageResponse.java create mode 100644 test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/ReceivedMessage.java diff --git a/test-harness/pom.xml b/test-harness/pom.xml index 886b8c2..c24931f 100644 --- a/test-harness/pom.xml +++ b/test-harness/pom.xml @@ -31,6 +31,9 @@ 4.0.3 9.4.5.0 2.0.1 + 3.7.0 + 7.6.0 + 3.24.2 @@ -299,6 +302,32 @@ ${javax.jms.version} + + + org.apache.kafka + kafka-clients + ${kafka.clients.version} + + + + io.confluent + kafka-avro-serializer + ${confluent.version} + + + + org.apache.avro + avro + 1.11.3 + + + + + org.assertj + assertj-core + ${assertj.version} + + javax.servlet diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/JsonToAvroConverter.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/JsonToAvroConverter.java new file mode 100644 index 0000000..1f9f093 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/JsonToAvroConverter.java @@ -0,0 +1,115 @@ +package cz.moneta.test.harness.connectors.messaging; + +import com.google.gson.*; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.JsonDecoder; + +import java.util.ArrayList; +import java.util.List; + +public class JsonToAvroConverter { + + protected static GenericRecord processJson(String json, Schema schema) throws IllegalArgumentException, JsonSchemaException { + GenericRecord result = (GenericRecord) jsonElementToAvro(JsonParser.parseString(json), schema); + return result; + } + + private static Object jsonElementToAvro(JsonElement element, Schema schema) throws JsonSchemaException { + boolean schemaIsNullable = isNullable(schema); + if (schemaIsNullable) { + schema = typeFromNullable(schema); + } + if (element == null || element.isJsonNull()) { + if (!schemaIsNullable) { + throw new JsonSchemaException("The element is not nullable in Avro schema."); + } + return null; + } else if (element.isJsonObject()) { + if (schema.getType() != Schema.Type.RECORD) { + throw new JsonSchemaException( + String.format("The element `%s` doesn't match Avro type RECORD", element)); + } + return jsonObjectToAvro(element.getAsJsonObject(), schema); + } else if (element.isJsonArray()) { + if (schema.getType() != Schema.Type.ARRAY) { + throw new JsonSchemaException( + String.format("The element `%s` doesn't match Avro type ARRAY", element)); + } + JsonArray jsonArray = element.getAsJsonArray(); + List avroArray = new ArrayList<>(jsonArray.size()); + for (JsonElement e : element.getAsJsonArray()) { + avroArray.add(jsonElementToAvro(e, schema.getElementType())); + } + return avroArray; + } else if (element.isJsonPrimitive()) { + return jsonPrimitiveToAvro(element.getAsJsonPrimitive(), schema); + } else { + throw new JsonSchemaException( + String.format( + "The Json element `%s` is of an unknown class %s", element, element.getClass())); + } + } + + private static GenericRecord jsonObjectToAvro(JsonObject jsonObject, Schema schema) throws JsonSchemaException { + GenericRecord avroRecord = new GenericData.Record(schema); + + for (Schema.Field field : schema.getFields()) { + avroRecord.put(field.name(), jsonElementToAvro(jsonObject.get(field.name()), field.schema())); + } + return avroRecord; + } + + private static boolean isNullable(Schema type) { + return type.getType() == Schema.Type.NULL + || type.getType() == Schema.Type.UNION + && type.getTypes().stream().anyMatch(JsonToAvroConverter::isNullable); + } + + private static Schema typeFromNullable(Schema type) { + if (type.getType() == Schema.Type.UNION) { + return typeFromNullable( + type.getTypes().stream() + .filter(t -> t.getType() != Schema.Type.NULL) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + String.format("Type `%s` should have a non null subtype", type)))); + } + return type; + } + + private static Object jsonPrimitiveToAvro(JsonPrimitive primitive, Schema schema){ + switch (schema.getType()) { + case NULL: + return null; + case STRING: + return primitive.getAsString(); + case BOOLEAN: + return primitive.getAsBoolean(); + case INT: + return primitive.getAsInt(); + case LONG: + return primitive.getAsLong(); + case FLOAT: + return primitive.getAsFloat(); + case DOUBLE: + return primitive.getAsDouble(); + default: + return primitive.getAsString(); + } + } + + private static class JsonSchemaException extends Exception { + JsonSchemaException(String message) { + super(message); + } + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java new file mode 100644 index 0000000..7ee7a3c --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/connectors/messaging/KafkaConnector.java @@ -0,0 +1,348 @@ +package cz.moneta.test.harness.connectors.messaging; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; + +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import cz.moneta.test.harness.messaging.exception.MessagingConnectionException; +import cz.moneta.test.harness.messaging.exception.MessagingDestinationException; +import cz.moneta.test.harness.messaging.exception.MessagingSchemaException; +import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException; +import cz.moneta.test.harness.support.messaging.kafka.MessageContentType; +import cz.moneta.test.harness.support.messaging.kafka.ReceivedMessage; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; + +/** + * Kafka connector for sending and receiving messages. + * Supports Avro serialization with Confluent Schema Registry. + *

+ * Uses manual partition assignment (no consumer group) for test isolation. + * Each receive operation creates a new consumer to avoid offset sharing. + */ +public class KafkaConnector implements cz.moneta.test.harness.connectors.Connector { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaConnector.class); + + private final Properties producerConfig; + private final Properties consumerConfig; + private final String schemaRegistryUrl; + private final CachedSchemaRegistryClient schemaRegistryClient; + private KafkaProducer producer; + + /** + * Creates a new KafkaConnector. + * + * @param bootstrapServers Kafka bootstrap servers + * @param apiKey Kafka API key + * @param apiSecret Kafka API secret + * @param schemaRegistryUrl Schema Registry URL + * @param schemaRegistryApiKey Schema Registry API key + * @param schemaRegistryApiSecret Schema Registry API secret + */ + public KafkaConnector(String bootstrapServers, + String apiKey, + String apiSecret, + String schemaRegistryUrl, + String schemaRegistryApiKey, + String schemaRegistryApiSecret) { + this.schemaRegistryUrl = schemaRegistryUrl; + this.schemaRegistryClient = new CachedSchemaRegistryClient( + Collections.singletonList(schemaRegistryUrl), 100, new HashMap<>()); + + this.producerConfig = createProducerConfig(bootstrapServers, apiKey, apiSecret); + this.consumerConfig = createConsumerConfig(bootstrapServers, schemaRegistryApiKey, schemaRegistryApiSecret); + } + + /** + * Creates producer configuration. + */ + private Properties createProducerConfig(String bootstrapServers, String apiKey, String apiSecret) { + Properties config = new Properties(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); + config.put("schema.registry.url", schemaRegistryUrl); + config.put(ProducerConfig.ACKS_CONFIG, "all"); + config.put(ProducerConfig.LINGER_MS_CONFIG, 1); + config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + +// SASL/PLAIN authentication +// config.put("security.protocol", "SASL_SSL"); +// config.put("sasl.mechanism", "PLAIN"); +// config.put("sasl.jaas.config", +// "org.apache.kafka.common.security.plain.PlainLoginModule required " + +// "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";"); + + // SSL configuration +// config.put("ssl.endpoint.identification.algorithm", "https"); + + return config; + } + + /** + * Creates consumer configuration. + */ + private Properties createConsumerConfig(String bootstrapServers, String apiKey, String apiSecret) { + Properties config = new Properties(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); + config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); + config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + // SASL/PLAIN authentication + config.put("security.protocol", "SASL_SSL"); + config.put("sasl.mechanism", "PLAIN"); + config.put("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";"); + + // Schema Registry for deserialization + config.put("schema.registry.url", schemaRegistryUrl); + config.put("specific.avro.reader", false); + + // SSL configuration + config.put("ssl.endpoint.identification.algorithm", "https"); + + return config; + } + + /** + * Sends a message to a Kafka topic. + */ + public void send(String topic, String key, String jsonPayload, Map headers) { + try { + org.apache.avro.Schema schema = getSchemaForTopic(topic); + GenericRecord record = jsonToAvro(jsonPayload, schema); + + ProducerRecord producerRecord = + new ProducerRecord<>(topic, key, record); + + // Add headers + if (headers != null) { + Headers kafkaHeaders = producerRecord.headers(); + headers.forEach((k, v) -> + kafkaHeaders.add(k, v.getBytes(StandardCharsets.UTF_8))); + } + + // Send and wait for confirmation + getProducer().send(producerRecord, (metadata, exception) -> { + if (exception != null) { + LOG.error("Failed to send message", exception); + } else { + LOG.debug("Message sent to topic {} partition {} offset {}", + metadata.topic(), metadata.partition(), metadata.offset()); + } + }).get(10, java.util.concurrent.TimeUnit.SECONDS); + + } catch (ExecutionException e) { + throw new MessagingConnectionException( + "Failed to send message to topic " + topic, e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MessagingConnectionException( + "Interrupted while sending message to topic " + topic, e); + } catch (MessagingSchemaException e) { + throw e; + } catch (Exception e) { + throw new MessagingSchemaException( + "Failed to convert JSON to Avro for topic " + topic, e); + } + } + + /** + * Receives a message from a Kafka topic matching the filter. + */ + public List receive(String topic, + Predicate filter, + Duration timeout) { + KafkaConsumer consumer = null; + try { + consumer = new KafkaConsumer<>(consumerConfig); + + // Get partitions for the topic + List partitions = getPartitionsForTopic(consumer, topic); + if (partitions.isEmpty()) { + throw new MessagingDestinationException( + "Topic '" + topic + "' does not exist or has no partitions"); + } + + // Assign partitions and seek to end + consumer.assign(partitions); +// consumer.seekToBeginning(partitions); + consumer.seekToBeginning(partitions); + + // Poll loop with exponential backoff + long startTime = System.currentTimeMillis(); + Duration pollInterval = Duration.ofMillis(100); + Duration maxPollInterval = Duration.ofSeconds(1); + + while (Duration.ofMillis(System.currentTimeMillis() - startTime).compareTo(timeout) < 0) { + ConsumerRecords records = consumer.poll(pollInterval); + + for (ConsumerRecord record : records) { + ReceivedMessage message = convertToReceivedMessage(record, topic); + if (filter.test(message)) { + LOG.debug("Found matching message on topic {} partition {} offset {}", + record.topic(), record.partition(), record.offset()); + return Collections.singletonList(message); + } + } + + // Exponential backoff + pollInterval = Duration.ofMillis( + Math.min(pollInterval.toMillis() * 2, maxPollInterval.toMillis())); + } + + throw new MessagingTimeoutException( + "No message matching filter found on topic '" + topic + "' within " + timeout); + + } finally { + if (consumer != null) { + consumer.close(); + } + } + } + + /** + * Gets partitions for a topic. + */ + private List getPartitionsForTopic(KafkaConsumer consumer, String topic) { + List partitions = new ArrayList<>(); + List partitionInfos = consumer.partitionsFor(topic); + if (partitionInfos != null) { + for (org.apache.kafka.common.PartitionInfo partitionInfo : partitionInfos) { + partitions.add(new TopicPartition(topic, partitionInfo.partition())); + } + } + return partitions; + } + + /** + * Saves current offsets for a topic. + */ + public Map saveOffsets(String topic) { + return new HashMap<>(); + } + + /** + * Closes the connector and releases resources. + */ + @Override + public void close() { + if (producer != null) { + producer.close(); + } + } + + /** + * Gets or creates the producer (singleton, thread-safe). + */ + private KafkaProducer getProducer() { + if (producer == null) { + synchronized (this) { + if (producer == null) { + producer = new KafkaProducer<>(producerConfig); + } + } + } + return producer; + } + + /** + * Retrieves schema from Schema Registry based on topic name. + */ + private org.apache.avro.Schema getSchemaForTopic(String topic) { + String subject = topic + "-value"; + try { + io.confluent.kafka.schemaregistry.client.SchemaMetadata metadata = + schemaRegistryClient.getLatestSchemaMetadata(subject); + return new org.apache.avro.Schema.Parser().parse(metadata.getSchema()); + } catch (Exception e) { + if (e.getMessage() != null && e.getMessage().contains("404")) { + throw new MessagingSchemaException( + "Schema not found for subject '" + subject + "' in Schema Registry. " + + "Make sure the topic exists and schema is registered."); + } + throw new MessagingSchemaException( + "Failed to retrieve schema for topic '" + topic + "'", e); + } + } + + /** + * Converts JSON string to Avro GenericRecord. + */ + private GenericRecord jsonToAvro(String json, org.apache.avro.Schema schema) { + try { + GenericRecord genericRecord = JsonToAvroConverter.processJson(json, schema); + return genericRecord; + } catch (Exception e) { + throw new MessagingSchemaException("Failed to convert JSON to Avro: " + e.getMessage(), e); + } + } + + /** + * Converts Kafka ConsumerRecord to ReceivedMessage. + */ + private ReceivedMessage convertToReceivedMessage(ConsumerRecord record, String topic) { + try { + String jsonBody = avroToJson(record.value()); + + Map headers = new HashMap<>(); + Headers kafkaHeaders = record.headers(); + for (org.apache.kafka.common.header.Header header : kafkaHeaders) { + if (header.value() != null) { + headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8)); + } + } + + return ReceivedMessage.builder() + .body(jsonBody) + .contentType(MessageContentType.JSON) + .headers(headers) + .timestamp(record.timestamp()) + .source(topic) + .key(record.key()) + .build(); + + } catch (Exception e) { + LOG.error("Failed to convert Avro record to ReceivedMessage", e); + throw new RuntimeException("Failed to convert message", e); + } + } + + /** + * Converts Avro GenericRecord to JSON string. + */ + private String avroToJson(GenericRecord record) { + try { + return record.toString(); + } catch (Exception e) { + throw new RuntimeException("Failed to convert Avro to JSON: " + e.getMessage(), e); + } + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/endpoints/kafka/KafkaEndpoint.java b/test-harness/src/main/java/cz/moneta/test/harness/endpoints/kafka/KafkaEndpoint.java new file mode 100644 index 0000000..c49f6d1 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/endpoints/kafka/KafkaEndpoint.java @@ -0,0 +1,199 @@ +package cz.moneta.test.harness.endpoints.kafka; + +import cz.moneta.test.harness.connectors.messaging.KafkaConnector; +import cz.moneta.test.harness.context.BaseStoreAccessor; +import cz.moneta.test.harness.context.StoreAccessor; +import cz.moneta.test.harness.endpoints.Endpoint; +import cz.moneta.test.harness.messaging.exception.MessagingConnectionException; +import cz.moneta.test.harness.support.messaging.kafka.ReceivedMessage; +import com.bettercloud.vault.Vault; +import com.bettercloud.vault.VaultConfig; +import com.bettercloud.vault.VaultException; +import com.bettercloud.vault.response.LogicalResponse; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Kafka endpoint that provides high-level API for Kafka messaging. + *

+ * Reads configuration from StoreAccessor and credentials from HashiCorp Vault. + * Implements singleton pattern per test class (managed by Harness). + *

+ */ +public class KafkaEndpoint implements Endpoint { + + private final KafkaConnector connector; + private final StoreAccessor store; + + /** + * Creates a new KafkaEndpoint. + *

+ * Configuration is read from StoreAccessor: + * - endpoints.kafka.bootstrap-servers + * - endpoints.kafka.schema-registry-url + *

+ * Credentials are retrieved from Vault: + * - vault.kafka.secrets.path -> apiKey, apiSecret + *

+ */ + public KafkaEndpoint(StoreAccessor store) { + this.store = store; + + // Read configuration + String bootstrapServers = store.getConfig("endpoints.kafka.bootstrap-servers"); + if (bootstrapServers == null) { + throw new IllegalStateException("You need to configure " + bootstrapServers + " to work with Kafka"); + } + + String schemaRegistryUrl = store.getConfig("endpoints.kafka.schema-registry-url"); + if (schemaRegistryUrl == null) { + throw new IllegalStateException("You need to configure " + schemaRegistryUrl + " url to work with Kafka"); + } + + // Retrieve credentials from Vault + String vaultPath = store.getConfig("vault.kafka.secrets.path"); + if (vaultPath == null) { + throw new IllegalStateException( + "You need to configure vault.kafka.secrets.path"); + } +// + String apiKey = getVaultValue(vaultPath, "apiKey"); + String apiSecret = getVaultValue(vaultPath, "apiSecret"); + String schemaRegistryApiKey = getVaultValue(vaultPath, "schemaRegistryApiKey"); + String schemaRegistryApiSecret = getVaultValue(vaultPath, "schemaRegistryApiSecret"); + + // Create connector + this.connector = new KafkaConnector( + bootstrapServers, + apiKey, + apiSecret, + schemaRegistryUrl, + schemaRegistryApiKey, + schemaRegistryApiSecret + ); + } + + /** + * Sends a message to a Kafka topic. + * + * @param topic Kafka topic name + * @param key Message key + * @param jsonPayload Message body as JSON string + * @param headers Kafka headers (traceparent, requestID, activityID, etc.) + */ + public void send(String topic, String key, String jsonPayload, Map headers) { + connector.send(topic, key, jsonPayload, headers); + } + + /** + * Receives a message from a Kafka topic matching the filter. + * + * @param topic Kafka topic name + * @param filter Predicate to filter messages + * @param timeout Maximum time to wait for a message + * @return First matching ReceivedMessage + */ + public List receive(String topic, + java.util.function.Predicate filter, + Duration timeout) { + return connector.receive(topic, filter, timeout); + } + + /** + * Receives a message with default timeout (30 seconds). + */ + public List receive(String topic, + java.util.function.Predicate filter) { + return receive(topic, filter, Duration.ofSeconds(30)); + } + + /** + * Receives the first available message from a topic. + * + * @param topic Kafka topic name + * @param timeout Maximum time to wait + * @return First message + */ + public List receive(String topic, Duration timeout) { + return receive(topic, msg -> true, timeout); + } + + /** + * Closes the endpoint and releases resources. + */ + @Override + public void close() { + if (connector != null) { + connector.close(); + } + } + + /** + * Checks if Kafka is accessible. + */ + @Override + public boolean canAccess() { + try { + // Try to get topic metadata - if this succeeds, Kafka is accessible + // Note: We don't actually make a network call here, just verify config + String bootstrapServers = store.getConfig("endpoints.kafka.bootstrap-servers"); + return bootstrapServers != null && !bootstrapServers.isEmpty(); + } catch (Exception e) { + return false; + } + } + + /** + * Gets the underlying connector (for advanced use cases). + */ + public KafkaConnector getConnector() { + return connector; + } + + /** + * Gets the store accessor. + */ + public StoreAccessor getStore() { + return store; + } + + /** + * Retrieves a value from Vault. + */ + private String getVaultValue(String path, String key) { + try { + VaultConfig vaultConfig = new VaultConfig() + .address(store.getConfig("vault.address", "http://localhost:8200")) + .token(store.getConfig("vault.token")) + .build(); + Vault vault = new Vault(vaultConfig, 2); + + LogicalResponse response = vault.logical().read(path); + if (response == null) { + throw new MessagingConnectionException( + "Failed to read from Vault path: " + path); + } + Map data = response.getData(); + if (data == null) { + throw new MessagingConnectionException( + "No data found in Vault path: " + path); + } + + String value = data.get(key); + if (value == null) { + throw new MessagingConnectionException( + "Credential '" + key + "' not found in Vault path: " + path); + } + + return value; + + } catch (VaultException e) { + throw new MessagingConnectionException( + "Failed to retrieve credential '" + key + "' from Vault at " + path, e); + } + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingConnectionException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingConnectionException.java index 85a1d3f..33707c4 100644 --- a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingConnectionException.java +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingConnectionException.java @@ -2,7 +2,7 @@ package cz.moneta.test.harness.messaging.exception; /** * Exception thrown when connection to messaging system fails. - * Covers authentication failures, network issues, and connection problems. + * Includes authentication failures, network issues, etc. */ public class MessagingConnectionException extends MessagingException { @@ -13,4 +13,8 @@ public class MessagingConnectionException extends MessagingException { public MessagingConnectionException(String message, Throwable cause) { super(message, cause); } + + public MessagingConnectionException(Throwable cause) { + super(cause); + } } diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingDestinationException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingDestinationException.java index 5bd5ae5..9192959 100644 --- a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingDestinationException.java +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingDestinationException.java @@ -1,7 +1,8 @@ package cz.moneta.test.harness.messaging.exception; /** - * Exception thrown when destination (queue, topic) is not found or inaccessible. + * Exception thrown when destination (topic/queue) does not exist or is inaccessible. + * Includes permission issues and unknown object errors. */ public class MessagingDestinationException extends MessagingException { @@ -12,4 +13,8 @@ public class MessagingDestinationException extends MessagingException { public MessagingDestinationException(String message, Throwable cause) { super(message, cause); } + + public MessagingDestinationException(Throwable cause) { + super(cause); + } } diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingException.java index f923bda..34f538f 100644 --- a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingException.java +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingException.java @@ -1,7 +1,8 @@ package cz.moneta.test.harness.messaging.exception; /** - * Abstract base exception for messaging system errors (IBM MQ, Kafka). + * Base exception for all messaging-related errors. + * Extends RuntimeException for unchecked behavior. */ public abstract class MessagingException extends RuntimeException { @@ -12,4 +13,8 @@ public abstract class MessagingException extends RuntimeException { protected MessagingException(String message, Throwable cause) { super(message, cause); } + + protected MessagingException(Throwable cause) { + super(cause); + } } diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingSchemaException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingSchemaException.java index 5e0860a..efbc63c 100644 --- a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingSchemaException.java +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingSchemaException.java @@ -1,8 +1,8 @@ package cz.moneta.test.harness.messaging.exception; /** - * Exception thrown when message schema validation fails. - * Currently primarily used for IBM MQ message format issues. + * Exception thrown when schema validation fails or schema is not found. + * Used for Avro schema mismatches or missing schema in Schema Registry. */ public class MessagingSchemaException extends MessagingException { @@ -13,4 +13,8 @@ public class MessagingSchemaException extends MessagingException { public MessagingSchemaException(String message, Throwable cause) { super(message, cause); } + + public MessagingSchemaException(Throwable cause) { + super(cause); + } } diff --git a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingTimeoutException.java b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingTimeoutException.java index 5d69880..53ce43f 100644 --- a/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingTimeoutException.java +++ b/test-harness/src/main/java/cz/moneta/test/harness/messaging/exception/MessagingTimeoutException.java @@ -1,7 +1,8 @@ package cz.moneta.test.harness.messaging.exception; /** - * Exception thrown when waiting for a message times out. + * Exception thrown when message receiving times out. + * No matching message found within the specified timeout period. */ public class MessagingTimeoutException extends MessagingException { @@ -12,4 +13,8 @@ public class MessagingTimeoutException extends MessagingException { public MessagingTimeoutException(String message, Throwable cause) { super(message, cause); } + + public MessagingTimeoutException(Throwable cause) { + super(cause); + } } diff --git a/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/KafkaRequest.java b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/KafkaRequest.java new file mode 100644 index 0000000..ff04708 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/KafkaRequest.java @@ -0,0 +1,568 @@ +package cz.moneta.test.harness.support.messaging.kafka; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint; +import cz.moneta.test.harness.support.util.FileReader; + +/** + * Fluent builder for creating and sending Kafka messages. + *

+ * Usage: + *

{@code
+ * harness.withKafka()
+ *     .toTopic("order-events")
+ *     .withKey("order-123")
+ *     .withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}")
+ *     .withTraceparent("00-traceId-spanId-01")
+ *     .send();
+ * }
+ *

+ */ +public class KafkaRequest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final Pattern ARRAY_NODE_PATTERN = Pattern.compile("(.*?)\\[([0-9]*?)\\]"); + + private KafkaRequest() { + } + + /** + * Creates a new Kafka request builder. + */ + public static KafkaBuilder builder(KafkaEndpoint endpoint) { + return new KafkaRequestBuilder(endpoint); + } + + /** + * Phase 1: Select direction (send or receive). + */ + public interface KafkaBuilder { + /** + * Specifies the target topic for sending. + * + * @param topic Kafka topic name + * @return KafkaPayloadPhase for payload configuration + */ + KafkaPayloadPhase toTopic(String topic); + + /** + * Specifies the source topic for receiving. + * + * @param topic Kafka topic name + * @return KafkaReceiveFilterPhase for filter configuration + */ + KafkaReceiveFilterPhase fromTopic(String topic); + } + + /** + * Phase 2a: Configure payload and headers for sending. + */ + public interface KafkaPayloadPhase { + /** + * Sets the message key. + * + * @param key Message key + * @return this for chaining + */ + KafkaPayloadPhase withKey(String key); + + /** + * Sets the payload as JSON string. + * + * @param json JSON payload + * @return this for chaining + */ + KafkaPayloadPhase withPayload(String json); + + /** + * Loads payload from a file. + * + * @param path Path to JSON file in resources + * @return this for chaining + */ + KafkaPayloadPhase withPayloadFromFile(String path); + + /** + * Loads payload from a template file and renders it. + * + * @param path Path to template file + * @param variables Variables for template rendering + * @return this for chaining + */ + KafkaPayloadPhase withPayloadFromTemplate(String path, Map variables); + + /** + * Adds a field to the JSON payload. + * + * @param fieldName Field name + * @param value Field value + * @return this for chaining + */ + KafkaPayloadPhase addField(String fieldName, Object value); + + /** + * Adds a field to a nested path in the JSON payload. + * + * @param path Path to parent node (e.g., "items[1].details") + * @param fieldName Field name to add + * @param value Field value + * @return this for chaining + */ + KafkaPayloadPhase addField(String path, String fieldName, Object value); + + /** + * Appends a value to an array in the JSON payload. + * + * @param path Path to array (e.g., "items") + * @param value Value to append + * @return this for chaining + */ + KafkaPayloadPhase appendToArray(String path, Object value); + + /** + * Adds W3C Trace Context traceparent header. + * + * @param value Traceparent value (e.g., "00-traceId-spanId-01") + * @return this for chaining + */ + KafkaPayloadPhase withTraceparent(String value); + + /** + * Adds requestID header. + * + * @param value Request ID + * @return this for chaining + */ + KafkaPayloadPhase withRequestID(String value); + + /** + * Adds activityID header. + * + * @param value Activity ID + * @return this for chaining + */ + KafkaPayloadPhase withActivityID(String value); + + /** + * Adds sourceCodebookId header. + * + * @param value Source codebook ID + * @return this for chaining + */ + KafkaPayloadPhase withSourceCodebookId(String value); + + /** + * Adds a custom header. + * + * @param key Header name + * @param value Header value + * @return this for chaining + */ + KafkaPayloadPhase withHeader(String key, String value); + + /** + * Sends the message. + */ + void send(); + } + + /** + * Phase 2b: Configure filter for receiving. + */ + public interface KafkaReceiveFilterPhase { + /** + * Specifies the filter predicate for receiving messages. + * + * @param filter Predicate to filter messages + * @return KafkaAwaitingPhase for timeout configuration + */ + KafkaAwaitingPhase receiveWhere(Predicate filter); + } + + /** + * Phase 3: Configure timeout. + */ + public interface KafkaAwaitingPhase { + /** + * Sets the timeout for waiting for a message. + * + * @param duration Timeout duration + * @param unit Time unit + * @return MessageResponse for assertions + */ + MessageResponse withTimeout(long duration, TimeUnit unit); + } + + /** + * Internal implementation class. + */ + private static class KafkaRequestBuilder implements + KafkaBuilder, + KafkaPayloadPhase, KafkaReceiveFilterPhase, KafkaAwaitingPhase, + MessageResponse { + + private final KafkaEndpoint endpoint; + + // Send configuration + private String topic; + private String key; + private String payload; + private final Map headers = new HashMap<>(); + + // Receive configuration + private Predicate filter; + private Duration timeout; + + // Response (after receive) + private List messages; + + public KafkaRequestBuilder(KafkaEndpoint endpoint) { + this.endpoint = endpoint; + } + + // === Send phase === + + @Override + public KafkaPayloadPhase toTopic(String topic) { + this.topic = topic; + return this; + } + + @Override + public KafkaPayloadPhase withKey(String key) { + this.key = key; + return this; + } + + @Override + public KafkaPayloadPhase withPayload(String json) { + this.payload = json; + return this; + } + + @Override + public KafkaPayloadPhase withPayloadFromFile(String path) { + this.payload = FileReader.readFileFromResources(path); + return this; + } + + @Override + public KafkaPayloadPhase withPayloadFromTemplate(String path, Map variables) { + String templateContent = FileReader.readFileFromResources(path); + cz.moneta.test.harness.support.util.Template template = + new cz.moneta.test.harness.support.util.Template(templateContent); + if (variables != null) { + variables.forEach((k, v) -> template.set(k, v != null ? v.toString() : "")); + } + this.payload = template.render(); + return this; + } + + @Override + public KafkaPayloadPhase addField(String fieldName, Object value) { + return addField("", fieldName, value); + } + + @Override + public KafkaPayloadPhase addField(String path, String fieldName, Object value) { + try { + if (payload == null) { + payload = "{}"; + } + JsonNode rootNode = MAPPER.readTree(payload); + JsonNode targetNode = extractNode(path, rootNode); + JsonNode newNode = MAPPER.valueToTree(value); + ((ObjectNode) targetNode).set(fieldName, newNode); + payload = MAPPER.writeValueAsString(rootNode); + } catch (Exception e) { + throw new IllegalStateException("Failed to add field '" + fieldName + "' at path '" + path + "': " + e.getMessage(), e); + } + return this; + } + + @Override + public KafkaPayloadPhase appendToArray(String path, Object value) { + try { + if (payload == null) { + payload = "{}"; + } + JsonNode rootNode = MAPPER.readTree(payload); + JsonNode targetNode = extractNode(path, rootNode); + JsonNode newNode = MAPPER.valueToTree(value); + if (targetNode.isArray()) { + ((ArrayNode) targetNode).add(newNode); + } else { + throw new IllegalStateException("Path '" + path + "' does not point to an array"); + } + payload = MAPPER.writeValueAsString(rootNode); + } catch (Exception e) { + throw new IllegalStateException("Failed to append to array at path '" + path + "': " + e.getMessage(), e); + } + return this; + } + + @Override + public KafkaPayloadPhase withTraceparent(String value) { + return withHeader("traceparent", value); + } + + @Override + public KafkaPayloadPhase withRequestID(String value) { + return withHeader("requestID", value); + } + + @Override + public KafkaPayloadPhase withActivityID(String value) { + return withHeader("activityID", value); + } + + @Override + public KafkaPayloadPhase withSourceCodebookId(String value) { + return withHeader("sourceCodebookId", value); + } + + @Override + public KafkaPayloadPhase withHeader(String key, String value) { + this.headers.put(key, value); + return this; + } + + @Override + public void send() { + if (topic == null) { + throw new IllegalStateException("Topic not specified. Call toTopic() first."); + } + if (payload == null) { + throw new IllegalStateException("Payload not specified. Call withPayload() or withPayloadFromFile() first."); + } + endpoint.send(topic, key, payload, headers); + } + + // === Receive phase === + + @Override + public KafkaReceiveFilterPhase fromTopic(String topic) { + this.topic = topic; + return this; + } + + @Override + public KafkaAwaitingPhase receiveWhere(Predicate filter) { + this.filter = filter; + return this; + } + + @Override + public MessageResponse withTimeout(long duration, TimeUnit unit) { + this.timeout = Duration.of(duration, unit.toChronoUnit()); + messages = endpoint.receive(topic, filter, timeout); + return this; + } + + // === MessageResponse implementation === + + @Override + public MessageResponse andAssertFieldValue(String path, String value) { + if (messages == null || messages.isEmpty()) { + throw new IllegalStateException("No message received to assert on"); + } + ReceivedMessage msg = messages.get(0); + String actual = msg.extract(path); + if (!Objects.equals(value, actual)) { + throw new AssertionError( + String.format("Field '%s' has value '%s', expected '%s'", path, actual, value)); + } + return this; + } + + @Override + public MessageResponse andAssertPresent(String path) { + if (messages == null || messages.isEmpty()) { + throw new IllegalStateException("No message received to assert on"); + } + ReceivedMessage msg = messages.get(0); + JsonNode node = msg.extractJson(path); + if (node.isMissingNode()) { + throw new AssertionError("Field '" + path + "' is missing"); + } + return this; + } + + @Override + public MessageResponse andAssertNotPresent(String path) { + if (messages == null || messages.isEmpty()) { + throw new IllegalStateException("No message received to assert on"); + } + ReceivedMessage msg = messages.get(0); + JsonNode node = msg.extractJson(path); + if (!node.isMissingNode()) { + throw new AssertionError("Field '" + path + "' is present with value: " + node.asText()); + } + return this; + } + + @Override + public MessageResponse andAssertHeaderValue(String headerName, String value) { + if (messages == null || messages.isEmpty()) { + throw new IllegalStateException("No message received to assert on"); + } + ReceivedMessage msg = messages.get(0); + String actual = msg.getHeader(headerName); + if (!Objects.equals(value, actual)) { + throw new AssertionError( + String.format("Header '%s' has value '%s', expected '%s'", headerName, actual, value)); + } + return this; + } + + @Override + public MessageResponse andAssertBodyContains(String substring) { + if (messages == null || messages.isEmpty()) { + throw new IllegalStateException("No message received to assert on"); + } + ReceivedMessage msg = messages.get(0); + String body = msg.getBody(); + if (body == null || !body.contains(substring)) { + throw new AssertionError( + String.format("Body does not contain '%s'. Actual body: %s", substring, body)); + } + return this; + } + + @Override + public net.javacrumbs.jsonunit.assertj.JsonAssert.ConfigurableJsonAssert andAssertWithAssertJ() { + if (messages == null || messages.isEmpty()) { + throw new IllegalStateException("No message received to assert on"); + } + return net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson(messages.get(0).getBody()); + } + + @Override + public JsonPathValue extract(String path) { + if (messages == null || messages.isEmpty()) { + throw new IllegalStateException("No message received to extract from"); + } + JsonNode node = messages.get(0).extractJson(path); + return new JsonPathValueImpl(node); + } + + @Override + public T mapTo(Class type) { + if (messages == null || messages.isEmpty()) { + throw new IllegalStateException("No message received to map"); + } + return messages.get(0).mapTo(type); + } + + @Override + public ReceivedMessage getMessage() { + if (messages == null || messages.isEmpty()) { + throw new IllegalStateException("No message received"); + } + return messages.get(0); + } + + @Override + public String getBody() { + if (messages == null || messages.isEmpty()) { + return null; + } + return messages.get(0).getBody(); + } + + @Override + public String getHeader(String name) { + if (messages == null || messages.isEmpty()) { + return null; + } + return messages.get(0).getHeader(name); + } + + // === Helper methods === + + private JsonNode extractNode(String path, JsonNode rootNode) { + if (StringUtils.isBlank(path)) { + return rootNode; + } + + return Arrays.stream(path.split("\\.")) + .filter(StringUtils::isNotEmpty) + .reduce(rootNode, + (r, p) -> { + Matcher matcher = ARRAY_NODE_PATTERN.matcher(p); + if (matcher.find()) { + return r.path(matcher.group(1)).path(Integer.valueOf(matcher.group(2))); + } else { + return r.path(p); + } + }, + (j1, j2) -> j1); + } + } + + /** + * Implementation of JsonPathValue. + */ + private static class JsonPathValueImpl implements MessageResponse.JsonPathValue { + + private final JsonNode node; + + public JsonPathValueImpl(JsonNode node) { + this.node = node; + } + + @Override + public String asText() { + if (node == null || node.isMissingNode()) { + return null; + } + return node.asText(); + } + + @Override + public Integer asInt() { + if (node == null || node.isMissingNode()) { + return null; + } + return node.asInt(); + } + + @Override + public Long asLong() { + if (node == null || node.isMissingNode()) { + return null; + } + return node.asLong(); + } + + @Override + public Boolean asBoolean() { + if (node == null || node.isMissingNode()) { + return null; + } + return node.asBoolean(); + } + + @Override + public boolean isMissing() { + return node == null || node.isMissingNode(); + } + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/MessageContentType.java b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/MessageContentType.java new file mode 100644 index 0000000..d0520a5 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/MessageContentType.java @@ -0,0 +1,19 @@ +package cz.moneta.test.harness.support.messaging.kafka; + +/** + * Enum representing the content type of a received message. + */ +public enum MessageContentType { + /** + * JSON content - parsed with Jackson ObjectMapper + */ + JSON, + /** + * XML content - can be parsed with XPath or Jackson XmlMapper + */ + XML, + /** + * Raw text content - not parsed, returned as-is + */ + RAW_TEXT +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/MessageResponse.java b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/MessageResponse.java new file mode 100644 index 0000000..877147e --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/MessageResponse.java @@ -0,0 +1,139 @@ +package cz.moneta.test.harness.support.messaging.kafka; + +import net.javacrumbs.jsonunit.assertj.JsonAssert; + +/** + * Interface representing the response from a message receive operation. + * Provides fluent assertion methods for testing received messages. + */ +public interface MessageResponse { + + /** + * Asserts that a field has the expected value. + * + * @param path JSON path or XPath + * @param value expected value as string + * @return this for chaining + */ + MessageResponse andAssertFieldValue(String path, String value); + + /** + * Asserts that a field is present. + * + * @param path JSON path or XPath + * @return this for chaining + */ + MessageResponse andAssertPresent(String path); + + /** + * Asserts that a field is not present. + * + * @param path JSON path or XPath + * @return this for chaining + */ + MessageResponse andAssertNotPresent(String path); + + /** + * Asserts that a header has the expected value. + * + * @param headerName name of the header + * @param value expected value + * @return this for chaining + */ + MessageResponse andAssertHeaderValue(String headerName, String value); + + /** + * Asserts that the body contains a substring. + * Useful for EBCDIC/UTF-8 raw text messages. + * + * @param substring expected substring + * @return this for chaining + */ + MessageResponse andAssertBodyContains(String substring); + + /** + * Returns AssertJ JsonAssert for complex assertions. + * + * @return JsonAssert instance + */ + JsonAssert.ConfigurableJsonAssert andAssertWithAssertJ(); + + /** + * Extracts a value from the message body. + * + * @param path JSON path or XPath + * @return JsonPathValue wrapper for further conversion + */ + JsonPathValue extract(String path); + + /** + * Deserializes the message body to a Java object. + * + * @param type the target type + * @param the target type + * @return deserialized object + */ + T mapTo(Class type); + + /** + * Gets the underlying received message. + * + * @return the received message + */ + ReceivedMessage getMessage(); + + /** + * Gets the message body as string. + * + * @return body string + */ + String getBody(); + + /** + * Gets a header value. + * + * @param name header name + * @return header value or null + */ + String getHeader(String name); + + /** + * Interface for extracted path values. + */ + interface JsonPathValue { + /** + * Converts to string. + * + * @return string value + */ + String asText(); + + /** + * Converts to integer. + * + * @return integer value + */ + Integer asInt(); + + /** + * Converts to long. + * + * @return long value + */ + Long asLong(); + + /** + * Converts to boolean. + * + * @return boolean value + */ + Boolean asBoolean(); + + /** + * Checks if value is missing/null. + * + * @return true if value is missing + */ + boolean isMissing(); + } +} diff --git a/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/ReceivedMessage.java b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/ReceivedMessage.java new file mode 100644 index 0000000..fe46f40 --- /dev/null +++ b/test-harness/src/main/java/cz/moneta/test/harness/support/messaging/kafka/ReceivedMessage.java @@ -0,0 +1,365 @@ +package cz.moneta.test.harness.support.messaging.kafka; + +import java.io.StringReader; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathFactory; + +import org.apache.commons.lang3.StringUtils; +import org.w3c.dom.Document; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; + +import net.javacrumbs.jsonunit.assertj.JsonAssert; + +/** + * Represents a received message from a messaging system (Kafka or IBM MQ). + * Provides unified API for extracting data regardless of the source system. + */ +public class ReceivedMessage { + + private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + private static final XmlMapper XML_MAPPER = new XmlMapper(); + private static final Pattern ARRAY_NODE_PATTERN = Pattern.compile("(.*?)\\[([0-9]*?)\\]"); + + private final String body; + private final MessageContentType contentType; + private final Map headers; + private final long timestamp; + private final String source; + private final String key; + + private ReceivedMessage(Builder builder) { + this.body = builder.body; + this.contentType = builder.contentType; + this.headers = builder.headers != null ? new HashMap<>(builder.headers) : new HashMap<>(); + this.timestamp = builder.timestamp; + this.source = builder.source; + this.key = builder.key; + } + + /** + * Creates a new builder for ReceivedMessage. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Extracts value using JSON path (dot/bracket notation). + * For XML content, automatically converts to XPath evaluation. + * + * @param path JSON path (e.g., "items[0].sku") or XPath for XML + * @return JsonNode representing the extracted value + */ + public JsonNode extractJson(String path) { + if (body == null) { + return null; + } + + if (contentType == MessageContentType.XML) { + // For XML, try XPath first, then fall back to JSON path + try { + return extractAsXml(path); + } catch (Exception e) { + // Fall through to JSON parsing + } + } + + try { + JsonNode rootNode = JSON_MAPPER.readTree(body); + return extractNode(path, rootNode); + } catch (Exception e) { + throw new RuntimeException("Failed to extract JSON path '" + path + "' from body: " + e.getMessage(), e); + } + } + + /** + * Extracts value using XPath expression (for XML messages). + * + * @param xpath XPath expression (e.g., "/response/balance") + * @return String value of the extracted node + */ + public String extractXml(String xpath) { + if (body == null) { + return null; + } + + try { + Document doc = XML_MAPPER.readTree(body).isMissingNode() ? null : XML_MAPPER.readTree(body).isObject() + ? parseXmlToDocument(body) + : parseXmlToDocument(body); + + if (doc == null) { + doc = parseXmlToDocument(body); + } + + XPath xPath = XPathFactory.newInstance().newXPath(); + NodeList nodes = (NodeList) xPath.evaluate(xpath, doc, XPathConstants.NODESET); + + if (nodes.getLength() == 0) { + return null; + } + + return nodes.item(0).getTextContent(); + } catch (Exception e) { + throw new RuntimeException("Failed to evaluate XPath '" + xpath + "' on body: " + e.getMessage(), e); + } + } + + /** + * Universal extract method - auto-detects content type and evaluates expression accordingly. + * + * @param expression JSON path for JSON content, XPath for XML content + * @return String value of the extracted node + */ + public String extract(String expression) { + if (body == null) { + return null; + } + + return switch (contentType) { + case JSON -> extractJson(expression).asText(); + case XML -> extractXml(expression); + case RAW_TEXT -> body; + }; + } + + /** + * Deserializes the message body to a Java object. + * + * @param type the target type + * @param the target type + * @return deserialized object + */ + public T mapTo(Class type) { + if (body == null) { + return null; + } + + try { + if (contentType == MessageContentType.JSON) { + return JSON_MAPPER.readValue(body, type); + } else if (contentType == MessageContentType.XML) { + return XML_MAPPER.readValue(body, type); + } else { + throw new IllegalStateException("Cannot deserialize RAW_TEXT to " + type.getName()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize body to " + type.getName() + ": " + e.getMessage(), e); + } + } + + /** + * Deserializes the message body to a list of objects. + * + * @param type the element type + * @param the element type + * @return list of deserialized objects + */ + public List mapToList(Class type) { + if (body == null) { + return Collections.emptyList(); + } + + try { + if (contentType == MessageContentType.JSON) { + return JSON_MAPPER.readValue(body, new TypeReference>() {}); + } else { + throw new IllegalStateException("Cannot deserialize to list for content type " + contentType); + } + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize body to List<" + type.getName() + ">: " + e.getMessage(), e); + } + } + + /** + * Gets the message body as string. + */ + public String getBody() { + return body; + } + + /** + * Gets the message key (Kafka) or null (IBM MQ). + */ + public String getKey() { + return key; + } + + /** + * Gets header value by name (Kafka header or JMS property). + */ + public String getHeader(String name) { + return headers.get(name); + } + + /** + * Gets all headers. + */ + public Map getHeaders() { + return Collections.unmodifiableMap(headers); + } + + /** + * Gets the message timestamp. + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Gets the source (topic name for Kafka, queue name for IBM MQ). + */ + public String getSource() { + return source; + } + + /** + * Gets the content type. + */ + public MessageContentType getContentType() { + return contentType; + } + + /** + * Creates JsonAssert for AssertJ-style assertions on the message body. + */ + public JsonAssert.ConfigurableJsonAssert andAssertWithAssertJ() { + if (body == null) { + throw new IllegalStateException("Cannot assert on null body"); + } + return net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson(body); + } + + private JsonNode extractNode(String path, JsonNode rootNode) { + if (StringUtils.isBlank(path)) { + return rootNode; + } + + return Arrays.stream(path.split("\\.")) + .filter(StringUtils::isNotEmpty) + .reduce(rootNode, + (r, p) -> { + Matcher matcher = ARRAY_NODE_PATTERN.matcher(p); + if (matcher.find()) { + return r.path(matcher.group(1)).path(Integer.valueOf(matcher.group(2))); + } else { + return r.path(p); + } + }, + (j1, j2) -> j1); + } + + private JsonNode extractAsXml(String path) throws Exception { + // Try XPath first + try { + String value = extractXml(path); + if (value != null) { + return JSON_MAPPER.valueToTree(value); + } + } catch (Exception e) { + // Fall back to JSON path on XML body + } + return extractNode(path, XML_MAPPER.readTree(body)); + } + + private Document parseXmlToDocument(String xml) throws Exception { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setFeature(javax.xml.XMLConstants.FEATURE_SECURE_PROCESSING, true); + DocumentBuilder builder = factory.newDocumentBuilder(); + return builder.parse(new InputSource(new StringReader(xml))); + } + + /** + * Builder for ReceivedMessage. + */ + public static class Builder { + private String body; + private MessageContentType contentType = MessageContentType.JSON; + private Map headers = new HashMap<>(); + private long timestamp = System.currentTimeMillis(); + private String source; + private String key; + + public Builder body(String body) { + this.body = body; + return this; + } + + public Builder contentType(MessageContentType contentType) { + this.contentType = contentType; + return this; + } + + public Builder headers(Map headers) { + this.headers = headers; + return this; + } + + public Builder timestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + public Builder source(String source) { + this.source = source; + return this; + } + + public Builder key(String key) { + this.key = key; + return this; + } + + public ReceivedMessage build() { + return new ReceivedMessage(this); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReceivedMessage that = (ReceivedMessage) o; + return timestamp == that.timestamp && + Objects.equals(body, that.body) && + contentType == that.contentType && + Objects.equals(headers, that.headers) && + Objects.equals(source, that.source) && + Objects.equals(key, that.key); + } + + @Override + public int hashCode() { + return Objects.hash(body, contentType, headers, timestamp, source, key); + } + + @Override + public String toString() { + return "ReceivedMessage{" + + "body='" + body + '\'' + + ", contentType=" + contentType + + ", headers=" + headers + + ", timestamp=" + timestamp + + ", source='" + source + '\'' + + ", key='" + key + '\'' + + '}'; + } +}