added kafka

This commit is contained in:
Radek Davidek 2026-03-26 10:21:48 +01:00
parent 51ee61e328
commit b94a071fda
13 changed files with 1811 additions and 6 deletions

View File

@ -31,6 +31,9 @@
<cxf.version>4.0.3</cxf.version>
<ibm.mq.version>9.4.5.0</ibm.mq.version>
<javax.jms.version>2.0.1</javax.jms.version>
<kafka.clients.version>3.7.0</kafka.clients.version>
<confluent.version>7.6.0</confluent.version>
<assertj.version>3.24.2</assertj.version>
</properties>
<dependencies>
@ -299,6 +302,32 @@
<version>${javax.jms.version}</version>
</dependency>
<!-- Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.clients.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- AssertJ for advanced assertions -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
<!-- Used in Wso2ConnectorServlet -->
<dependency>
<groupId>javax.servlet</groupId>

View File

@ -0,0 +1,115 @@
package cz.moneta.test.harness.connectors.messaging;

import com.google.gson.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.ArrayList;
import java.util.List;

/**
 * Schema-driven conversion of a JSON document into an Avro {@link GenericRecord}.
 * <p>
 * Supported Avro types: RECORD, ARRAY, nullable unions ({@code ["null", T]}), ENUM
 * and the primitive types. MAP, BYTES and FIXED are not supported by this converter.
 */
public class JsonToAvroConverter {

    /**
     * Parses {@code json} and converts it into a record conforming to {@code schema}.
     *
     * @param json   JSON document; its top level must map to an Avro RECORD
     * @param schema target Avro schema
     * @return populated generic record
     * @throws IllegalArgumentException if {@code json} is not valid JSON
     * @throws JsonSchemaException      if the JSON shape does not match the schema
     */
    protected static GenericRecord processJson(String json, Schema schema)
            throws IllegalArgumentException, JsonSchemaException {
        return (GenericRecord) jsonElementToAvro(JsonParser.parseString(json), schema);
    }

    /** Recursively converts a single JSON element according to {@code schema}. */
    private static Object jsonElementToAvro(JsonElement element, Schema schema) throws JsonSchemaException {
        boolean schemaIsNullable = isNullable(schema);
        if (schemaIsNullable) {
            // Unwrap ["null", T] so the type checks below see the concrete branch T.
            schema = typeFromNullable(schema);
        }
        if (element == null || element.isJsonNull()) {
            if (!schemaIsNullable) {
                throw new JsonSchemaException("The element is not nullable in Avro schema.");
            }
            return null;
        } else if (element.isJsonObject()) {
            if (schema.getType() != Schema.Type.RECORD) {
                throw new JsonSchemaException(
                        String.format("The element `%s` doesn't match Avro type RECORD", element));
            }
            return jsonObjectToAvro(element.getAsJsonObject(), schema);
        } else if (element.isJsonArray()) {
            if (schema.getType() != Schema.Type.ARRAY) {
                throw new JsonSchemaException(
                        String.format("The element `%s` doesn't match Avro type ARRAY", element));
            }
            JsonArray jsonArray = element.getAsJsonArray();
            List<Object> avroArray = new ArrayList<>(jsonArray.size());
            // Fix: iterate the already-extracted array instead of calling getAsJsonArray() again.
            for (JsonElement e : jsonArray) {
                avroArray.add(jsonElementToAvro(e, schema.getElementType()));
            }
            return avroArray;
        } else if (element.isJsonPrimitive()) {
            return jsonPrimitiveToAvro(element.getAsJsonPrimitive(), schema);
        } else {
            throw new JsonSchemaException(
                    String.format(
                            "The Json element `%s` is of an unknown class %s", element, element.getClass()));
        }
    }

    /**
     * Converts a JSON object into a record; every schema field is looked up by name.
     * Fields missing from the JSON are passed through as null and fail here unless
     * the schema declares them nullable.
     */
    private static GenericRecord jsonObjectToAvro(JsonObject jsonObject, Schema schema) throws JsonSchemaException {
        GenericRecord avroRecord = new GenericData.Record(schema);
        for (Schema.Field field : schema.getFields()) {
            avroRecord.put(field.name(), jsonElementToAvro(jsonObject.get(field.name()), field.schema()));
        }
        return avroRecord;
    }

    /** True for NULL or for a UNION that contains a null branch (any depth). */
    private static boolean isNullable(Schema type) {
        return type.getType() == Schema.Type.NULL
                || (type.getType() == Schema.Type.UNION
                        && type.getTypes().stream().anyMatch(JsonToAvroConverter::isNullable));
    }

    /** Unwraps a nullable union to its first non-null branch. */
    private static Schema typeFromNullable(Schema type) {
        if (type.getType() == Schema.Type.UNION) {
            return typeFromNullable(
                    type.getTypes().stream()
                            .filter(t -> t.getType() != Schema.Type.NULL)
                            .findFirst()
                            .orElseThrow(
                                    () ->
                                            new IllegalStateException(
                                                    String.format("Type `%s` should have a non null subtype", type))));
        }
        return type;
    }

    /** Maps a JSON primitive to the Java value Avro expects for {@code schema}. */
    private static Object jsonPrimitiveToAvro(JsonPrimitive primitive, Schema schema) {
        switch (schema.getType()) {
            case NULL:
                return null;
            case STRING:
                return primitive.getAsString();
            case BOOLEAN:
                return primitive.getAsBoolean();
            case INT:
                return primitive.getAsInt();
            case LONG:
                return primitive.getAsLong();
            case FLOAT:
                return primitive.getAsFloat();
            case DOUBLE:
                return primitive.getAsDouble();
            case ENUM:
                // Fix: GenericDatumWriter rejects a plain String for enum fields;
                // it requires a GenericData.EnumSymbol bound to the enum schema.
                return new GenericData.EnumSymbol(schema, primitive.getAsString());
            default:
                // Best effort for remaining types (e.g. BYTES/FIXED are unsupported).
                return primitive.getAsString();
        }
    }

    /** Thrown when the JSON document does not fit the target Avro schema. */
    private static class JsonSchemaException extends Exception {
        JsonSchemaException(String message) {
            super(message);
        }
    }
}

View File

@ -0,0 +1,348 @@
package cz.moneta.test.harness.connectors.messaging;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.moneta.test.harness.messaging.exception.MessagingConnectionException;
import cz.moneta.test.harness.messaging.exception.MessagingDestinationException;
import cz.moneta.test.harness.messaging.exception.MessagingSchemaException;
import cz.moneta.test.harness.messaging.exception.MessagingTimeoutException;
import cz.moneta.test.harness.support.messaging.kafka.MessageContentType;
import cz.moneta.test.harness.support.messaging.kafka.ReceivedMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

/**
 * Kafka connector for sending and receiving messages.
 * Supports Avro serialization with Confluent Schema Registry.
 * <p>
 * Uses manual partition assignment (no consumer group) for test isolation.
 * Each receive operation creates a new consumer to avoid offset sharing.
 */
public class KafkaConnector implements cz.moneta.test.harness.connectors.Connector {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnector.class);

    /** How long {@link #send} blocks waiting for the broker acknowledgement. */
    private static final long SEND_ACK_TIMEOUT_SECONDS = 10;

    private final Properties producerConfig;
    private final Properties consumerConfig;
    private final String schemaRegistryUrl;
    private final CachedSchemaRegistryClient schemaRegistryClient;

    // volatile is required for the double-checked locking in getProducer();
    // without it another thread could observe a partially constructed producer.
    private volatile KafkaProducer<String, GenericRecord> producer;

    /**
     * Creates a new KafkaConnector.
     *
     * @param bootstrapServers Kafka bootstrap servers
     * @param apiKey Kafka API key
     * @param apiSecret Kafka API secret
     * @param schemaRegistryUrl Schema Registry URL
     * @param schemaRegistryApiKey Schema Registry API key
     * @param schemaRegistryApiSecret Schema Registry API secret
     */
    public KafkaConnector(String bootstrapServers,
                          String apiKey,
                          String apiSecret,
                          String schemaRegistryUrl,
                          String schemaRegistryApiKey,
                          String schemaRegistryApiSecret) {
        this.schemaRegistryUrl = schemaRegistryUrl;
        // NOTE(review): the SR client is created without auth configs (empty map) even
        // though SR API keys are supplied — confirm whether the registry is unsecured.
        this.schemaRegistryClient = new CachedSchemaRegistryClient(
                Collections.singletonList(schemaRegistryUrl), 100, new HashMap<>());
        this.producerConfig = createProducerConfig(bootstrapServers, apiKey, apiSecret);
        // NOTE(review): the consumer's SASL login uses the SCHEMA REGISTRY credentials,
        // while the producer's SASL block is commented out — verify this is intended.
        this.consumerConfig = createConsumerConfig(bootstrapServers, schemaRegistryApiKey, schemaRegistryApiSecret);
    }

    /**
     * Creates producer configuration (String key, Avro value via Schema Registry).
     */
    private Properties createProducerConfig(String bootstrapServers, String apiKey, String apiSecret) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        config.put("schema.registry.url", schemaRegistryUrl);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // SASL/PLAIN authentication — intentionally disabled here (see class ctor note).
        // config.put("security.protocol", "SASL_SSL");
        // config.put("sasl.mechanism", "PLAIN");
        // config.put("sasl.jaas.config",
        //     "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        //     "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        // SSL configuration
        // config.put("ssl.endpoint.identification.algorithm", "https");
        return config;
    }

    /**
     * Creates consumer configuration (manual assignment, no auto-commit).
     */
    private Properties createConsumerConfig(String bootstrapServers, String apiKey, String apiSecret) {
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        // "none" is safe here because receive() always assigns and seeks explicitly.
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // SASL/PLAIN authentication
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.mechanism", "PLAIN");
        config.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        // Schema Registry for deserialization (generic records, not generated classes)
        config.put("schema.registry.url", schemaRegistryUrl);
        config.put("specific.avro.reader", false);
        // SSL configuration
        config.put("ssl.endpoint.identification.algorithm", "https");
        return config;
    }

    /**
     * Sends a JSON payload to a Kafka topic, converting it to Avro using the
     * latest schema registered for {@code topic}-value.
     *
     * @param topic       destination topic
     * @param key         record key (may be null)
     * @param jsonPayload message body as JSON
     * @param headers     optional headers, copied UTF-8 encoded (may be null)
     * @throws MessagingSchemaException     if schema lookup or JSON→Avro conversion fails
     * @throws MessagingConnectionException on broker failure, ack timeout or interruption
     */
    public void send(String topic, String key, String jsonPayload, Map<String, String> headers) {
        try {
            org.apache.avro.Schema schema = getSchemaForTopic(topic);
            GenericRecord record = jsonToAvro(jsonPayload, schema);
            ProducerRecord<String, GenericRecord> producerRecord =
                    new ProducerRecord<>(topic, key, record);
            // Add headers
            if (headers != null) {
                Headers kafkaHeaders = producerRecord.headers();
                headers.forEach((k, v) ->
                        kafkaHeaders.add(k, v.getBytes(StandardCharsets.UTF_8)));
            }
            // Send and block until the broker acknowledges (acks=all) or the timeout hits.
            getProducer().send(producerRecord, (metadata, exception) -> {
                if (exception != null) {
                    LOG.error("Failed to send message", exception);
                } else {
                    LOG.debug("Message sent to topic {} partition {} offset {}",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            }).get(SEND_ACK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            throw new MessagingConnectionException(
                    "Failed to send message to topic " + topic, e.getCause());
        } catch (TimeoutException e) {
            // Fix: previously fell into the generic catch and was mislabelled a schema error.
            throw new MessagingConnectionException(
                    "No broker acknowledgement within " + SEND_ACK_TIMEOUT_SECONDS
                            + "s when sending to topic " + topic, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MessagingConnectionException(
                    "Interrupted while sending message to topic " + topic, e);
        } catch (MessagingSchemaException e) {
            throw e;
        } catch (Exception e) {
            throw new MessagingSchemaException(
                    "Failed to convert JSON to Avro for topic " + topic, e);
        }
    }

    /**
     * Receives the first message from {@code topic} matching {@code filter}.
     * Replays every partition from the beginning; a fresh consumer is created
     * and closed per call so offsets are never shared between tests.
     *
     * @throws MessagingDestinationException if the topic has no partitions
     * @throws MessagingTimeoutException     if no match is found within {@code timeout}
     */
    public List<ReceivedMessage> receive(String topic,
                                         Predicate<ReceivedMessage> filter,
                                         Duration timeout) {
        KafkaConsumer<String, GenericRecord> consumer = null;
        try {
            consumer = new KafkaConsumer<>(consumerConfig);
            // Get partitions for the topic
            List<TopicPartition> partitions = getPartitionsForTopic(consumer, topic);
            if (partitions.isEmpty()) {
                throw new MessagingDestinationException(
                        "Topic '" + topic + "' does not exist or has no partitions");
            }
            // Manual assignment (no consumer group), replay from the beginning.
            // NOTE(review): seekToEnd would restrict the scan to messages produced after
            // this call — confirm which semantics the tests rely on.
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            // Poll loop with exponential backoff
            long startTime = System.currentTimeMillis();
            Duration pollInterval = Duration.ofMillis(100);
            Duration maxPollInterval = Duration.ofSeconds(1);
            while (Duration.ofMillis(System.currentTimeMillis() - startTime).compareTo(timeout) < 0) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(pollInterval);
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    ReceivedMessage message = convertToReceivedMessage(record, topic);
                    if (filter.test(message)) {
                        LOG.debug("Found matching message on topic {} partition {} offset {}",
                                record.topic(), record.partition(), record.offset());
                        return Collections.singletonList(message);
                    }
                }
                // Exponential backoff, capped at maxPollInterval
                pollInterval = Duration.ofMillis(
                        Math.min(pollInterval.toMillis() * 2, maxPollInterval.toMillis()));
            }
            throw new MessagingTimeoutException(
                    "No message matching filter found on topic '" + topic + "' within " + timeout);
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Gets partitions for a topic (empty list when the topic is unknown).
     */
    private List<TopicPartition> getPartitionsForTopic(KafkaConsumer<?, ?> consumer, String topic) {
        List<TopicPartition> partitions = new ArrayList<>();
        List<org.apache.kafka.common.PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        if (partitionInfos != null) {
            for (org.apache.kafka.common.PartitionInfo partitionInfo : partitionInfos) {
                partitions.add(new TopicPartition(topic, partitionInfo.partition()));
            }
        }
        return partitions;
    }

    /**
     * Saves current offsets for a topic.
     * <p>
     * TODO: not implemented yet — always returns an empty map. Callers must not
     * rely on this for offset restoration.
     */
    public Map<TopicPartition, Long> saveOffsets(String topic) {
        return new HashMap<>();
    }

    /**
     * Closes the connector and releases the (lazily created) producer.
     */
    @Override
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }

    /**
     * Gets or creates the producer. Lazy, thread-safe double-checked locking;
     * the {@code producer} field is volatile, and the local variable avoids a
     * second volatile read on the fast path.
     */
    private KafkaProducer<String, GenericRecord> getProducer() {
        KafkaProducer<String, GenericRecord> local = producer;
        if (local == null) {
            synchronized (this) {
                local = producer;
                if (local == null) {
                    local = new KafkaProducer<>(producerConfig);
                    producer = local;
                }
            }
        }
        return local;
    }

    /**
     * Retrieves the latest value schema for {@code topic} from Schema Registry
     * (subject naming strategy: {@code <topic>-value}).
     */
    private org.apache.avro.Schema getSchemaForTopic(String topic) {
        String subject = topic + "-value";
        try {
            io.confluent.kafka.schemaregistry.client.SchemaMetadata metadata =
                    schemaRegistryClient.getLatestSchemaMetadata(subject);
            return new org.apache.avro.Schema.Parser().parse(metadata.getSchema());
        } catch (Exception e) {
            if (e.getMessage() != null && e.getMessage().contains("404")) {
                // Fix: keep the cause so the registry error is not lost.
                throw new MessagingSchemaException(
                        "Schema not found for subject '" + subject + "' in Schema Registry. " +
                                "Make sure the topic exists and schema is registered.", e);
            }
            throw new MessagingSchemaException(
                    "Failed to retrieve schema for topic '" + topic + "'", e);
        }
    }

    /**
     * Converts a JSON string to an Avro GenericRecord using the given schema.
     */
    private GenericRecord jsonToAvro(String json, org.apache.avro.Schema schema) {
        try {
            return JsonToAvroConverter.processJson(json, schema);
        } catch (Exception e) {
            throw new MessagingSchemaException("Failed to convert JSON to Avro: " + e.getMessage(), e);
        }
    }

    /**
     * Converts a Kafka ConsumerRecord to the harness ReceivedMessage
     * (body rendered as JSON, headers decoded as UTF-8).
     */
    private ReceivedMessage convertToReceivedMessage(ConsumerRecord<String, GenericRecord> record, String topic) {
        try {
            String jsonBody = avroToJson(record.value());
            Map<String, String> headers = new HashMap<>();
            Headers kafkaHeaders = record.headers();
            for (org.apache.kafka.common.header.Header header : kafkaHeaders) {
                if (header.value() != null) {
                    headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
                }
            }
            return ReceivedMessage.builder()
                    .body(jsonBody)
                    .contentType(MessageContentType.JSON)
                    .headers(headers)
                    .timestamp(record.timestamp())
                    .source(topic)
                    .key(record.key())
                    .build();
        } catch (Exception e) {
            LOG.error("Failed to convert Avro record to ReceivedMessage", e);
            throw new RuntimeException("Failed to convert message", e);
        }
    }

    /**
     * Converts an Avro GenericRecord to a JSON string.
     * Relies on GenericData's toString rendering; NOTE(review): this is JSON-like
     * but may differ from canonical Avro JSON for unions/bytes — confirm acceptable.
     */
    private String avroToJson(GenericRecord record) {
        try {
            return record.toString();
        } catch (Exception e) {
            throw new RuntimeException("Failed to convert Avro to JSON: " + e.getMessage(), e);
        }
    }
}

View File

@ -0,0 +1,199 @@
package cz.moneta.test.harness.endpoints.kafka;

import cz.moneta.test.harness.connectors.messaging.KafkaConnector;
import cz.moneta.test.harness.context.StoreAccessor;
import cz.moneta.test.harness.endpoints.Endpoint;
import cz.moneta.test.harness.messaging.exception.MessagingConnectionException;
import cz.moneta.test.harness.support.messaging.kafka.ReceivedMessage;
import com.bettercloud.vault.Vault;
import com.bettercloud.vault.VaultConfig;
import com.bettercloud.vault.VaultException;
import com.bettercloud.vault.response.LogicalResponse;
import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Kafka endpoint that provides high-level API for Kafka messaging.
 * <p>
 * Reads configuration from StoreAccessor and credentials from HashiCorp Vault.
 * Implements singleton pattern per test class (managed by Harness).
 * </p>
 */
public class KafkaEndpoint implements Endpoint {

    /** Config key: Kafka bootstrap servers. */
    private static final String CFG_BOOTSTRAP_SERVERS = "endpoints.kafka.bootstrap-servers";
    /** Config key: Confluent Schema Registry URL. */
    private static final String CFG_SCHEMA_REGISTRY_URL = "endpoints.kafka.schema-registry-url";
    /** Config key: Vault path holding the Kafka credentials. */
    private static final String CFG_VAULT_PATH = "vault.kafka.secrets.path";

    private final KafkaConnector connector;
    private final StoreAccessor store;

    /**
     * Creates a new KafkaEndpoint.
     * <p>
     * Configuration is read from StoreAccessor:
     * - endpoints.kafka.bootstrap-servers
     * - endpoints.kafka.schema-registry-url
     * <p>
     * Credentials are retrieved from Vault:
     * - vault.kafka.secrets.path -&gt; apiKey, apiSecret
     * </p>
     *
     * @throws IllegalStateException        if a mandatory configuration key is missing
     * @throws MessagingConnectionException if credentials cannot be read from Vault
     */
    public KafkaEndpoint(StoreAccessor store) {
        this.store = store;
        // Read configuration. BUG FIX: the original error messages concatenated the
        // (null) config VALUE instead of the config KEY, producing
        // "You need to configure null to work with Kafka".
        String bootstrapServers = store.getConfig(CFG_BOOTSTRAP_SERVERS);
        if (bootstrapServers == null) {
            throw new IllegalStateException(
                    "You need to configure " + CFG_BOOTSTRAP_SERVERS + " to work with Kafka");
        }
        String schemaRegistryUrl = store.getConfig(CFG_SCHEMA_REGISTRY_URL);
        if (schemaRegistryUrl == null) {
            throw new IllegalStateException(
                    "You need to configure " + CFG_SCHEMA_REGISTRY_URL + " to work with Kafka");
        }
        // Retrieve credentials from Vault
        String vaultPath = store.getConfig(CFG_VAULT_PATH);
        if (vaultPath == null) {
            throw new IllegalStateException(
                    "You need to configure " + CFG_VAULT_PATH);
        }
        String apiKey = getVaultValue(vaultPath, "apiKey");
        String apiSecret = getVaultValue(vaultPath, "apiSecret");
        String schemaRegistryApiKey = getVaultValue(vaultPath, "schemaRegistryApiKey");
        String schemaRegistryApiSecret = getVaultValue(vaultPath, "schemaRegistryApiSecret");
        // Create connector
        this.connector = new KafkaConnector(
                bootstrapServers,
                apiKey,
                apiSecret,
                schemaRegistryUrl,
                schemaRegistryApiKey,
                schemaRegistryApiSecret
        );
    }

    /**
     * Sends a message to a Kafka topic.
     *
     * @param topic Kafka topic name
     * @param key Message key
     * @param jsonPayload Message body as JSON string
     * @param headers Kafka headers (traceparent, requestID, activityID, etc.)
     */
    public void send(String topic, String key, String jsonPayload, Map<String, String> headers) {
        connector.send(topic, key, jsonPayload, headers);
    }

    /**
     * Receives a message from a Kafka topic matching the filter.
     *
     * @param topic Kafka topic name
     * @param filter Predicate to filter messages
     * @param timeout Maximum time to wait for a message
     * @return First matching ReceivedMessage
     */
    public List<ReceivedMessage> receive(String topic,
                                         java.util.function.Predicate<ReceivedMessage> filter,
                                         Duration timeout) {
        return connector.receive(topic, filter, timeout);
    }

    /**
     * Receives a message with default timeout (30 seconds).
     */
    public List<ReceivedMessage> receive(String topic,
                                         java.util.function.Predicate<ReceivedMessage> filter) {
        return receive(topic, filter, Duration.ofSeconds(30));
    }

    /**
     * Receives the first available message from a topic.
     *
     * @param topic Kafka topic name
     * @param timeout Maximum time to wait
     * @return First message
     */
    public List<ReceivedMessage> receive(String topic, Duration timeout) {
        return receive(topic, msg -> true, timeout);
    }

    /**
     * Closes the endpoint and releases resources.
     */
    @Override
    public void close() {
        if (connector != null) {
            connector.close();
        }
    }

    /**
     * Checks if Kafka is accessible.
     * Only verifies that configuration is present — no network call is made.
     */
    @Override
    public boolean canAccess() {
        try {
            String bootstrapServers = store.getConfig(CFG_BOOTSTRAP_SERVERS);
            return bootstrapServers != null && !bootstrapServers.isEmpty();
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Gets the underlying connector (for advanced use cases).
     */
    public KafkaConnector getConnector() {
        return connector;
    }

    /**
     * Gets the store accessor.
     */
    public StoreAccessor getStore() {
        return store;
    }

    /**
     * Retrieves a single value from Vault.
     * Note: a new Vault client is built per lookup; acceptable for the four
     * constructor-time reads, but avoid calling this in hot paths.
     *
     * @throws MessagingConnectionException if the path or key cannot be read
     */
    private String getVaultValue(String path, String key) {
        try {
            VaultConfig vaultConfig = new VaultConfig()
                    .address(store.getConfig("vault.address", "http://localhost:8200"))
                    .token(store.getConfig("vault.token"))
                    .build();
            Vault vault = new Vault(vaultConfig, 2);
            LogicalResponse response = vault.logical().read(path);
            if (response == null) {
                throw new MessagingConnectionException(
                        "Failed to read from Vault path: " + path);
            }
            Map<String, String> data = response.getData();
            if (data == null) {
                throw new MessagingConnectionException(
                        "No data found in Vault path: " + path);
            }
            String value = data.get(key);
            if (value == null) {
                throw new MessagingConnectionException(
                        "Credential '" + key + "' not found in Vault path: " + path);
            }
            return value;
        } catch (VaultException e) {
            throw new MessagingConnectionException(
                    "Failed to retrieve credential '" + key + "' from Vault at " + path, e);
        }
    }
}

View File

@ -2,7 +2,7 @@ package cz.moneta.test.harness.messaging.exception;
/**
* Exception thrown when connection to messaging system fails.
* Covers authentication failures, network issues, and connection problems.
* Includes authentication failures, network issues, etc.
*/
public class MessagingConnectionException extends MessagingException {
@ -13,4 +13,8 @@ public class MessagingConnectionException extends MessagingException {
/**
 * Creates the exception with a message and the underlying cause.
 *
 * @param message description of the connection failure
 * @param cause   underlying failure
 */
public MessagingConnectionException(String message, Throwable cause) {
    super(message, cause);
}
/**
 * Creates the exception from a cause only; the detail message is derived from the cause.
 *
 * @param cause underlying failure
 */
public MessagingConnectionException(Throwable cause) {
    super(cause);
}
}

View File

@ -1,7 +1,8 @@
package cz.moneta.test.harness.messaging.exception;
/**
* Exception thrown when destination (queue, topic) is not found or inaccessible.
* Exception thrown when destination (topic/queue) does not exist or is inaccessible.
* Includes permission issues and unknown object errors.
*/
public class MessagingDestinationException extends MessagingException {
@ -12,4 +13,8 @@ public class MessagingDestinationException extends MessagingException {
/**
 * Creates the exception with a message and the underlying cause.
 *
 * @param message description of the destination failure
 * @param cause   underlying failure
 */
public MessagingDestinationException(String message, Throwable cause) {
    super(message, cause);
}
/**
 * Creates the exception from a cause only; the detail message is derived from the cause.
 *
 * @param cause underlying failure
 */
public MessagingDestinationException(Throwable cause) {
    super(cause);
}
}

View File

@ -1,7 +1,8 @@
package cz.moneta.test.harness.messaging.exception;
/**
* Abstract base exception for messaging system errors (IBM MQ, Kafka).
* Base exception for all messaging-related errors.
* Extends RuntimeException for unchecked behavior.
*/
public abstract class MessagingException extends RuntimeException {
@ -12,4 +13,8 @@ public abstract class MessagingException extends RuntimeException {
/**
 * Creates the exception with a message and the underlying cause.
 *
 * @param message description of the failure
 * @param cause   underlying failure
 */
protected MessagingException(String message, Throwable cause) {
    super(message, cause);
}
/**
 * Creates the exception from a cause only; the detail message is derived from the cause.
 *
 * @param cause underlying failure
 */
protected MessagingException(Throwable cause) {
    super(cause);
}
}

View File

@ -1,8 +1,8 @@
package cz.moneta.test.harness.messaging.exception;
/**
* Exception thrown when message schema validation fails.
* Currently primarily used for IBM MQ message format issues.
* Exception thrown when schema validation fails or schema is not found.
* Used for Avro schema mismatches or missing schema in Schema Registry.
*/
public class MessagingSchemaException extends MessagingException {
@ -13,4 +13,8 @@ public class MessagingSchemaException extends MessagingException {
/**
 * Creates the exception with a message and the underlying cause.
 *
 * @param message description of the schema failure
 * @param cause   underlying failure
 */
public MessagingSchemaException(String message, Throwable cause) {
    super(message, cause);
}
/**
 * Creates the exception from a cause only; the detail message is derived from the cause.
 *
 * @param cause underlying failure
 */
public MessagingSchemaException(Throwable cause) {
    super(cause);
}
}

View File

@ -1,7 +1,8 @@
package cz.moneta.test.harness.messaging.exception;
/**
* Exception thrown when waiting for a message times out.
* Exception thrown when message receiving times out.
* No matching message found within the specified timeout period.
*/
public class MessagingTimeoutException extends MessagingException {
@ -12,4 +13,8 @@ public class MessagingTimeoutException extends MessagingException {
/**
 * Creates the exception with a message and the underlying cause.
 *
 * @param message description of the timeout
 * @param cause   underlying failure
 */
public MessagingTimeoutException(String message, Throwable cause) {
    super(message, cause);
}
/**
 * Creates the exception from a cause only; the detail message is derived from the cause.
 *
 * @param cause underlying failure
 */
public MessagingTimeoutException(Throwable cause) {
    super(cause);
}
}

View File

@ -0,0 +1,568 @@
package cz.moneta.test.harness.support.messaging.kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import cz.moneta.test.harness.endpoints.kafka.KafkaEndpoint;
import cz.moneta.test.harness.support.util.FileReader;
/**
* Fluent builder for creating and sending Kafka messages.
* <p>
* Usage:
* <pre>{@code
* harness.withKafka()
* .toTopic("order-events")
* .withKey("order-123")
* .withPayload("{\"orderId\": \"123\", \"status\": \"CREATED\"}")
* .withTraceparent("00-traceId-spanId-01")
* .send();
* }</pre>
* </p>
*/
public class KafkaRequest {
// Shared, thread-safe Jackson mapper used by addField/appendToArray to edit the payload.
private static final ObjectMapper MAPPER = new ObjectMapper();
// Matches an indexed path segment, e.g. "items[2]": group(1)="items", group(2)="2".
private static final Pattern ARRAY_NODE_PATTERN = Pattern.compile("(.*?)\\[([0-9]*?)\\]");

// Not instantiable — use builder(endpoint) to start the fluent chain.
private KafkaRequest() {
}
/**
 * Creates a new Kafka request builder.
 *
 * @param endpoint the Kafka endpoint used to actually send/receive messages
 * @return the first builder phase (choose send or receive direction)
 */
public static KafkaBuilder builder(KafkaEndpoint endpoint) {
    return new KafkaRequestBuilder(endpoint);
}
/**
 * Phase 1: Select direction (send or receive).
 * Exactly one of {@link #toTopic} (producer flow) or {@link #fromTopic}
 * (consumer flow) starts the chain.
 */
public interface KafkaBuilder {

    /**
     * Specifies the target topic for sending.
     *
     * @param topic Kafka topic name
     * @return KafkaPayloadPhase for payload configuration
     */
    KafkaPayloadPhase toTopic(String topic);

    /**
     * Specifies the source topic for receiving.
     *
     * @param topic Kafka topic name
     * @return KafkaReceiveFilterPhase for filter configuration
     */
    KafkaReceiveFilterPhase fromTopic(String topic);
}
/**
 * Phase 2a: Configure payload and headers for sending.
 * All configuration methods return {@code this}; the chain is finished by {@link #send()}.
 */
public interface KafkaPayloadPhase {

    /**
     * Sets the message key.
     *
     * @param key Message key
     * @return this for chaining
     */
    KafkaPayloadPhase withKey(String key);

    /**
     * Sets the payload as JSON string.
     *
     * @param json JSON payload
     * @return this for chaining
     */
    KafkaPayloadPhase withPayload(String json);

    /**
     * Loads payload from a file.
     *
     * @param path Path to JSON file in resources
     * @return this for chaining
     */
    KafkaPayloadPhase withPayloadFromFile(String path);

    /**
     * Loads payload from a template file and renders it.
     *
     * @param path Path to template file
     * @param variables Variables for template rendering
     * @return this for chaining
     */
    KafkaPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables);

    /**
     * Adds a field to the JSON payload.
     * Starts from an empty JSON object when no payload has been set yet.
     *
     * @param fieldName Field name
     * @param value Field value
     * @return this for chaining
     */
    KafkaPayloadPhase addField(String fieldName, Object value);

    /**
     * Adds a field to a nested path in the JSON payload.
     *
     * @param path Path to parent node (e.g., "items[1].details")
     * @param fieldName Field name to add
     * @param value Field value
     * @return this for chaining
     */
    KafkaPayloadPhase addField(String path, String fieldName, Object value);

    /**
     * Appends a value to an array in the JSON payload.
     *
     * @param path Path to array (e.g., "items")
     * @param value Value to append
     * @return this for chaining
     */
    KafkaPayloadPhase appendToArray(String path, Object value);

    /**
     * Adds W3C Trace Context traceparent header.
     *
     * @param value Traceparent value (e.g., "00-traceId-spanId-01")
     * @return this for chaining
     */
    KafkaPayloadPhase withTraceparent(String value);

    /**
     * Adds requestID header.
     *
     * @param value Request ID
     * @return this for chaining
     */
    KafkaPayloadPhase withRequestID(String value);

    /**
     * Adds activityID header.
     *
     * @param value Activity ID
     * @return this for chaining
     */
    KafkaPayloadPhase withActivityID(String value);

    /**
     * Adds sourceCodebookId header.
     *
     * @param value Source codebook ID
     * @return this for chaining
     */
    KafkaPayloadPhase withSourceCodebookId(String value);

    /**
     * Adds a custom header.
     *
     * @param key Header name
     * @param value Header value
     * @return this for chaining
     */
    KafkaPayloadPhase withHeader(String key, String value);

    /**
     * Sends the message.
     * Fails fast with IllegalStateException when no topic or no payload was configured.
     */
    void send();
}
/**
 * Phase 2b: Configure filter for receiving.
 */
public interface KafkaReceiveFilterPhase {

    /**
     * Specifies the filter predicate for receiving messages.
     * No polling happens yet — the actual receive is performed by the
     * subsequent {@link KafkaAwaitingPhase#withTimeout} call.
     *
     * @param filter Predicate to filter messages
     * @return KafkaAwaitingPhase for timeout configuration
     */
    KafkaAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter);
}
/**
 * Phase 3: Configure timeout.
 */
public interface KafkaAwaitingPhase {

    /**
     * Sets the timeout and performs the blocking receive.
     *
     * @param duration Timeout duration
     * @param unit Time unit
     * @return MessageResponse for assertions
     */
    MessageResponse withTimeout(long duration, TimeUnit unit);
}
/**
* Internal implementation class.
*/
private static class KafkaRequestBuilder implements
KafkaBuilder,
KafkaPayloadPhase, KafkaReceiveFilterPhase, KafkaAwaitingPhase,
MessageResponse {
private final KafkaEndpoint endpoint;
// Send configuration
private String topic;   // set by toTopic() for sends and fromTopic() for receives
private String key;
private String payload;
private final Map<String, String> headers = new HashMap<>();
// Receive configuration
private Predicate<ReceivedMessage> filter;
private Duration timeout;
// Response (after receive) — populated by withTimeout()
private List<ReceivedMessage> messages;

// A single mutable instance implements every phase of the fluent chain.
public KafkaRequestBuilder(KafkaEndpoint endpoint) {
    this.endpoint = endpoint;
}
// === Send phase ===

@Override
public KafkaPayloadPhase toTopic(String topic) {
    // Producer flow: remember the destination topic.
    this.topic = topic;
    return this;
}
@Override
public KafkaPayloadPhase withKey(String key) {
    // Optional Kafka record key for the outgoing message.
    this.key = key;
    return this;
}
@Override
public KafkaPayloadPhase withPayload(String json) {
    // Raw JSON payload; later addField/appendToArray calls edit this string.
    this.payload = json;
    return this;
}
@Override
public KafkaPayloadPhase withPayloadFromFile(String path) {
    // Load the JSON payload from a classpath resource.
    this.payload = FileReader.readFileFromResources(path);
    return this;
}
@Override
public KafkaPayloadPhase withPayloadFromTemplate(String path, Map<String, Object> variables) {
    // Read the template from resources and substitute the provided variables.
    String templateContent = FileReader.readFileFromResources(path);
    cz.moneta.test.harness.support.util.Template template =
            new cz.moneta.test.harness.support.util.Template(templateContent);
    if (variables != null) {
        // Null variable values are rendered as empty strings.
        variables.forEach((k, v) -> template.set(k, v != null ? v.toString() : ""));
    }
    this.payload = template.render();
    return this;
}
@Override
public KafkaPayloadPhase addField(String fieldName, Object value) {
    // Empty path = add the field at the document root.
    return addField("", fieldName, value);
}
/**
 * Adds {@code fieldName} under the node addressed by {@code path}.
 * Starts from an empty JSON object when no payload was set yet.
 *
 * @throws IllegalStateException if the path does not resolve to a JSON object
 *                               or the payload cannot be parsed/serialized
 */
@Override
public KafkaPayloadPhase addField(String path, String fieldName, Object value) {
    try {
        if (payload == null) {
            payload = "{}"; // start from an empty JSON object
        }
        JsonNode rootNode = MAPPER.readTree(payload);
        JsonNode targetNode = extractNode(path, rootNode);
        JsonNode newNode = MAPPER.valueToTree(value);
        // Fix: explicit check instead of a bare (ObjectNode) cast, which used to
        // surface as an opaque ClassCastException. Mirrors appendToArray's guard;
        // the throw is re-wrapped by the catch below, like in appendToArray.
        if (!targetNode.isObject()) {
            throw new IllegalStateException("Path '" + path + "' does not point to an object");
        }
        ((ObjectNode) targetNode).set(fieldName, newNode);
        payload = MAPPER.writeValueAsString(rootNode);
    } catch (Exception e) {
        throw new IllegalStateException("Failed to add field '" + fieldName + "' at path '" + path + "': " + e.getMessage(), e);
    }
    return this;
}
@Override
public KafkaPayloadPhase appendToArray(String path, Object value) {
    try {
        if (payload == null) {
            payload = "{}"; // start from an empty JSON object when nothing was set yet
        }
        JsonNode rootNode = MAPPER.readTree(payload);
        JsonNode targetNode = extractNode(path, rootNode);
        JsonNode newNode = MAPPER.valueToTree(value);
        if (targetNode.isArray()) {
            ((ArrayNode) targetNode).add(newNode);
        } else {
            // NOTE(review): this exception is caught by the catch below and re-wrapped
            // with the "Failed to append..." prefix — confirm the nesting is intended.
            throw new IllegalStateException("Path '" + path + "' does not point to an array");
        }
        payload = MAPPER.writeValueAsString(rootNode);
    } catch (Exception e) {
        throw new IllegalStateException("Failed to append to array at path '" + path + "': " + e.getMessage(), e);
    }
    return this;
}
/** Sets the W3C {@code traceparent} header; delegates to {@link #withHeader}. */
@Override
public KafkaPayloadPhase withTraceparent(String value) {
    return withHeader("traceparent", value);
}
/** Sets the {@code requestID} correlation header; delegates to {@link #withHeader}. */
@Override
public KafkaPayloadPhase withRequestID(String value) {
    return withHeader("requestID", value);
}
/** Sets the {@code activityID} header; delegates to {@link #withHeader}. */
@Override
public KafkaPayloadPhase withActivityID(String value) {
    return withHeader("activityID", value);
}
/** Sets the {@code sourceCodebookId} header; delegates to {@link #withHeader}. */
@Override
public KafkaPayloadPhase withSourceCodebookId(String value) {
    return withHeader("sourceCodebookId", value);
}
/**
 * Adds an arbitrary Kafka record header to the outgoing message.
 *
 * @param key   header name
 * @param value header value
 * @return this for chaining
 */
@Override
public KafkaPayloadPhase withHeader(String key, String value) {
    this.headers.put(key, value);
    return this;
}
/**
 * Sends the configured message to Kafka.
 *
 * @throws IllegalStateException when topic or payload has not been configured
 */
@Override
public void send() {
    requireState(topic != null, "Topic not specified. Call toTopic() first.");
    requireState(payload != null, "Payload not specified. Call withPayload() or withPayloadFromFile() first.");
    endpoint.send(topic, key, payload, headers);
}

/** Throws {@link IllegalStateException} with the given message when the condition does not hold. */
private static void requireState(boolean condition, String message) {
    if (!condition) {
        throw new IllegalStateException(message);
    }
}
// === Receive phase ===
/**
 * Selects the topic to consume from.
 *
 * @param topic source Kafka topic name
 * @return this for chaining
 */
@Override
public KafkaReceiveFilterPhase fromTopic(String topic) {
    this.topic = topic;
    return this;
}
/**
 * Defines the predicate a received message must match.
 * The actual polling happens later, in {@code withTimeout()}.
 *
 * @param filter message-matching predicate
 * @return this for chaining
 */
@Override
public KafkaAwaitingPhase receiveWhere(Predicate<ReceivedMessage> filter) {
    this.filter = filter;
    return this;
}
/**
 * Executes the receive: polls the configured topic until a matching message
 * arrives or the timeout elapses, then returns this object for assertions.
 *
 * @param duration maximum time to wait
 * @param unit     unit of {@code duration}
 * @return this, now acting as the {@link MessageResponse}
 * @throws IllegalStateException when fromTopic() was not called first
 */
@Override
public MessageResponse withTimeout(long duration, TimeUnit unit) {
    // Fail fast on misconfiguration, mirroring send()'s validation.
    if (topic == null) {
        throw new IllegalStateException("Topic not specified. Call fromTopic() first.");
    }
    this.timeout = Duration.of(duration, unit.toChronoUnit());
    messages = endpoint.receive(topic, filter, timeout);
    return this;
}
// === MessageResponse implementation ===

/**
 * Returns the first received message, failing with the given description
 * when no message was received. Centralizes the guard that every assertion
 * and extraction method previously duplicated.
 */
private ReceivedMessage firstMessage(String errorMessage) {
    if (messages == null || messages.isEmpty()) {
        throw new IllegalStateException(errorMessage);
    }
    return messages.get(0);
}

/**
 * Asserts that the field at {@code path} has the expected value.
 *
 * @throws AssertionError when the actual value differs
 */
@Override
public MessageResponse andAssertFieldValue(String path, String value) {
    String actual = firstMessage("No message received to assert on").extract(path);
    if (!Objects.equals(value, actual)) {
        throw new AssertionError(
                String.format("Field '%s' has value '%s', expected '%s'", path, actual, value));
    }
    return this;
}

/**
 * Asserts that the field at {@code path} is present in the message body.
 *
 * @throws AssertionError when the field is missing
 */
@Override
public MessageResponse andAssertPresent(String path) {
    JsonNode node = firstMessage("No message received to assert on").extractJson(path);
    if (node.isMissingNode()) {
        throw new AssertionError("Field '" + path + "' is missing");
    }
    return this;
}

/**
 * Asserts that the field at {@code path} is absent from the message body.
 *
 * @throws AssertionError when the field is present
 */
@Override
public MessageResponse andAssertNotPresent(String path) {
    JsonNode node = firstMessage("No message received to assert on").extractJson(path);
    if (!node.isMissingNode()) {
        throw new AssertionError("Field '" + path + "' is present with value: " + node.asText());
    }
    return this;
}

/**
 * Asserts that the named header has the expected value.
 *
 * @throws AssertionError when the actual value differs
 */
@Override
public MessageResponse andAssertHeaderValue(String headerName, String value) {
    String actual = firstMessage("No message received to assert on").getHeader(headerName);
    if (!Objects.equals(value, actual)) {
        throw new AssertionError(
                String.format("Header '%s' has value '%s', expected '%s'", headerName, actual, value));
    }
    return this;
}

/**
 * Asserts that the raw message body contains the given substring.
 *
 * @throws AssertionError when the body is null or does not contain it
 */
@Override
public MessageResponse andAssertBodyContains(String substring) {
    String body = firstMessage("No message received to assert on").getBody();
    if (body == null || !body.contains(substring)) {
        throw new AssertionError(
                String.format("Body does not contain '%s'. Actual body: %s", substring, body));
    }
    return this;
}

/** Returns a JsonUnit fluent assertion on the first message's body. */
@Override
public net.javacrumbs.jsonunit.assertj.JsonAssert.ConfigurableJsonAssert andAssertWithAssertJ() {
    firstMessage("No message received to assert on");
    return net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson(messages.get(0).getBody());
}

/** Extracts the value at {@code path} from the first message as a typed wrapper. */
@Override
public JsonPathValue extract(String path) {
    JsonNode node = firstMessage("No message received to extract from").extractJson(path);
    return new JsonPathValueImpl(node);
}

/** Deserializes the first message's body to the given type. */
@Override
public <T> T mapTo(Class<T> type) {
    return firstMessage("No message received to map").mapTo(type);
}

/** Returns the first received message. */
@Override
public ReceivedMessage getMessage() {
    return firstMessage("No message received");
}
/**
 * Gets the first message's body, or null when no message was received.
 * Unlike the assertion methods, this does not throw on an empty result.
 */
@Override
public String getBody() {
    if (messages == null || messages.isEmpty()) {
        return null;
    }
    return messages.get(0).getBody();
}
/**
 * Gets a header value from the first message, or null when no message was
 * received or the header is absent.
 *
 * @param name header name
 */
@Override
public String getHeader(String name) {
    if (messages == null || messages.isEmpty()) {
        return null;
    }
    return messages.get(0).getHeader(name);
}
// === Helper methods ===

/**
 * Walks a dot-separated path (with optional "[index]" array segments, e.g.
 * "items[0].sku") through the JSON tree. A blank path returns the root;
 * unknown segments resolve to a missing node via {@code JsonNode.path()}.
 */
private JsonNode extractNode(String path, JsonNode rootNode) {
    if (StringUtils.isBlank(path)) {
        return rootNode;
    }
    JsonNode current = rootNode;
    for (String segment : path.split("\\.")) {
        if (StringUtils.isEmpty(segment)) {
            continue;
        }
        Matcher matcher = ARRAY_NODE_PATTERN.matcher(segment);
        if (matcher.find()) {
            current = current.path(matcher.group(1)).path(Integer.parseInt(matcher.group(2)));
        } else {
            current = current.path(segment);
        }
    }
    return current;
}
}
/**
* Implementation of JsonPathValue.
*/
/**
 * Implementation of JsonPathValue backed by a Jackson {@link JsonNode}.
 * All conversions return null when the node is absent.
 */
private static class JsonPathValueImpl implements MessageResponse.JsonPathValue {

    private final JsonNode node;

    public JsonPathValueImpl(JsonNode node) {
        this.node = node;
    }

    /** True when the wrapped node is null or a Jackson missing node. */
    @Override
    public boolean isMissing() {
        return node == null || node.isMissingNode();
    }

    @Override
    public String asText() {
        return isMissing() ? null : node.asText();
    }

    @Override
    public Integer asInt() {
        return isMissing() ? null : node.asInt();
    }

    @Override
    public Long asLong() {
        return isMissing() ? null : node.asLong();
    }

    @Override
    public Boolean asBoolean() {
        return isMissing() ? null : node.asBoolean();
    }
}
}

View File

@ -0,0 +1,19 @@
package cz.moneta.test.harness.support.messaging.kafka;
/**
 * Enum representing the content type of a received message.
 * Determines how {@code ReceivedMessage} parses and extracts values from the body.
 */
public enum MessageContentType {
    /**
     * JSON content - parsed with Jackson ObjectMapper.
     */
    JSON,
    /**
     * XML content - can be parsed with XPath or Jackson XmlMapper.
     */
    XML,
    /**
     * Raw text content - not parsed, returned as-is (e.g. EBCDIC/plain-text payloads).
     */
    RAW_TEXT
}

View File

@ -0,0 +1,139 @@
package cz.moneta.test.harness.support.messaging.kafka;
import net.javacrumbs.jsonunit.assertj.JsonAssert;
/**
 * Interface representing the response from a message receive operation.
 * Provides fluent assertion methods for testing received messages.
 * All {@code andAssert*} methods return {@code this} so checks can be chained.
 */
public interface MessageResponse {
    /**
     * Asserts that a field has the expected value.
     *
     * @param path JSON path or XPath
     * @param value expected value as string
     * @return this for chaining
     * @throws AssertionError when the actual value differs
     */
    MessageResponse andAssertFieldValue(String path, String value);
    /**
     * Asserts that a field is present.
     *
     * @param path JSON path or XPath
     * @return this for chaining
     * @throws AssertionError when the field is missing
     */
    MessageResponse andAssertPresent(String path);
    /**
     * Asserts that a field is not present.
     *
     * @param path JSON path or XPath
     * @return this for chaining
     * @throws AssertionError when the field is present
     */
    MessageResponse andAssertNotPresent(String path);
    /**
     * Asserts that a header has the expected value.
     *
     * @param headerName name of the header
     * @param value expected value
     * @return this for chaining
     * @throws AssertionError when the actual value differs
     */
    MessageResponse andAssertHeaderValue(String headerName, String value);
    /**
     * Asserts that the body contains a substring.
     * Useful for EBCDIC/UTF-8 raw text messages.
     *
     * @param substring expected substring
     * @return this for chaining
     * @throws AssertionError when the body does not contain the substring
     */
    MessageResponse andAssertBodyContains(String substring);
    /**
     * Returns a JsonUnit fluent assertion (AssertJ-style) for complex checks
     * on the message body.
     *
     * @return JsonAssert instance
     */
    JsonAssert.ConfigurableJsonAssert andAssertWithAssertJ();
    /**
     * Extracts a value from the message body.
     *
     * @param path JSON path or XPath
     * @return JsonPathValue wrapper for further conversion
     */
    JsonPathValue extract(String path);
    /**
     * Deserializes the message body to a Java object.
     *
     * @param type the target type
     * @param <T> the target type
     * @return deserialized object
     */
    <T> T mapTo(Class<T> type);
    /**
     * Gets the underlying received message.
     *
     * @return the received message
     */
    ReceivedMessage getMessage();
    /**
     * Gets the message body as string.
     *
     * @return body string, or null when no message was received
     */
    String getBody();
    /**
     * Gets a header value.
     *
     * @param name header name
     * @return header value or null
     */
    String getHeader(String name);
    /**
     * Interface for extracted path values. Conversions return null when the
     * underlying value is missing.
     */
    interface JsonPathValue {
        /**
         * Converts to string.
         *
         * @return string value, or null when missing
         */
        String asText();
        /**
         * Converts to integer.
         *
         * @return integer value, or null when missing
         */
        Integer asInt();
        /**
         * Converts to long.
         *
         * @return long value, or null when missing
         */
        Long asLong();
        /**
         * Converts to boolean.
         *
         * @return boolean value, or null when missing
         */
        Boolean asBoolean();
        /**
         * Checks if value is missing/null.
         *
         * @return true if value is missing
         */
        boolean isMissing();
    }
}

View File

@ -0,0 +1,365 @@
package cz.moneta.test.harness.support.messaging.kafka;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.apache.commons.lang3.StringUtils;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import net.javacrumbs.jsonunit.assertj.JsonAssert;
/**
 * Represents a received message from a messaging system (Kafka or IBM MQ).
 * Provides unified API for extracting data regardless of the source system.
 * Instances are immutable; build them via {@link #builder()}.
 */
public class ReceivedMessage {

    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    private static final XmlMapper XML_MAPPER = new XmlMapper();
    private static final Pattern ARRAY_NODE_PATTERN = Pattern.compile("(.*?)\\[([0-9]*?)\\]");

    private final String body;
    private final MessageContentType contentType;
    private final Map<String, String> headers;
    private final long timestamp;
    private final String source;
    private final String key;

    private ReceivedMessage(Builder builder) {
        this.body = builder.body;
        this.contentType = builder.contentType;
        // Defensive copy so later mutation of the builder's map cannot leak in.
        this.headers = builder.headers != null ? new HashMap<>(builder.headers) : new HashMap<>();
        this.timestamp = builder.timestamp;
        this.source = builder.source;
        this.key = builder.key;
    }

    /**
     * Creates a new builder for ReceivedMessage.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Extracts value using JSON path (dot/bracket notation).
     * For XML content, automatically converts to XPath evaluation.
     *
     * @param path JSON path (e.g., "items[0].sku") or XPath for XML
     * @return JsonNode representing the extracted value, or null when body is null
     */
    public JsonNode extractJson(String path) {
        if (body == null) {
            return null;
        }
        if (contentType == MessageContentType.XML) {
            // For XML, try XPath first, then fall back to JSON path
            try {
                return extractAsXml(path);
            } catch (Exception e) {
                // Fall through to JSON parsing
            }
        }
        try {
            JsonNode rootNode = JSON_MAPPER.readTree(body);
            return extractNode(path, rootNode);
        } catch (Exception e) {
            throw new RuntimeException("Failed to extract JSON path '" + path + "' from body: " + e.getMessage(), e);
        }
    }

    /**
     * Extracts value using XPath expression (for XML messages).
     *
     * @param xpath XPath expression (e.g., "/response/balance")
     * @return String value of the first matching node, or null when there is no match
     */
    public String extractXml(String xpath) {
        if (body == null) {
            return null;
        }
        try {
            // Parse the body exactly once. The previous implementation ran the body
            // through XmlMapper up to three times via a ternary whose branches were
            // identical, discarding every intermediate result.
            Document doc = parseXmlToDocument(body);
            XPath xPath = XPathFactory.newInstance().newXPath();
            NodeList nodes = (NodeList) xPath.evaluate(xpath, doc, XPathConstants.NODESET);
            if (nodes.getLength() == 0) {
                return null;
            }
            return nodes.item(0).getTextContent();
        } catch (Exception e) {
            throw new RuntimeException("Failed to evaluate XPath '" + xpath + "' on body: " + e.getMessage(), e);
        }
    }

    /**
     * Universal extract method - auto-detects content type and evaluates expression accordingly.
     *
     * @param expression JSON path for JSON content, XPath for XML content
     * @return String value of the extracted node; for RAW_TEXT the whole body is returned
     */
    public String extract(String expression) {
        if (body == null) {
            return null;
        }
        return switch (contentType) {
            case JSON -> extractJson(expression).asText();
            case XML -> extractXml(expression);
            case RAW_TEXT -> body;
        };
    }

    /**
     * Deserializes the message body to a Java object.
     *
     * @param type the target type
     * @param <T> the target type
     * @return deserialized object, or null when the body is null
     * @throws RuntimeException when deserialization fails or content is RAW_TEXT
     */
    public <T> T mapTo(Class<T> type) {
        if (body == null) {
            return null;
        }
        try {
            if (contentType == MessageContentType.JSON) {
                return JSON_MAPPER.readValue(body, type);
            } else if (contentType == MessageContentType.XML) {
                return XML_MAPPER.readValue(body, type);
            } else {
                throw new IllegalStateException("Cannot deserialize RAW_TEXT to " + type.getName());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize body to " + type.getName() + ": " + e.getMessage(), e);
        }
    }

    /**
     * Deserializes the message body to a list of objects.
     *
     * @param type the element type
     * @param <T> the element type
     * @return list of deserialized objects (empty when body is null)
     */
    public <T> List<T> mapToList(Class<T> type) {
        if (body == null) {
            return Collections.emptyList();
        }
        try {
            if (contentType == MessageContentType.JSON) {
                // Build the collection type explicitly: the previous
                // TypeReference<List<T>> erased T and yielded List<LinkedHashMap>,
                // silently ignoring the 'type' argument.
                return JSON_MAPPER.readValue(body,
                        JSON_MAPPER.getTypeFactory().constructCollectionType(List.class, type));
            } else {
                throw new IllegalStateException("Cannot deserialize to list for content type " + contentType);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize body to List<" + type.getName() + ">: " + e.getMessage(), e);
        }
    }

    /**
     * Gets the message body as string.
     */
    public String getBody() {
        return body;
    }

    /**
     * Gets the message key (Kafka) or null (IBM MQ).
     */
    public String getKey() {
        return key;
    }

    /**
     * Gets header value by name (Kafka header or JMS property).
     */
    public String getHeader(String name) {
        return headers.get(name);
    }

    /**
     * Gets all headers as an unmodifiable view.
     */
    public Map<String, String> getHeaders() {
        return Collections.unmodifiableMap(headers);
    }

    /**
     * Gets the message timestamp.
     */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Gets the source (topic name for Kafka, queue name for IBM MQ).
     */
    public String getSource() {
        return source;
    }

    /**
     * Gets the content type.
     */
    public MessageContentType getContentType() {
        return contentType;
    }

    /**
     * Creates JsonAssert for AssertJ-style assertions on the message body.
     *
     * @throws IllegalStateException when the body is null
     */
    public JsonAssert.ConfigurableJsonAssert andAssertWithAssertJ() {
        if (body == null) {
            throw new IllegalStateException("Cannot assert on null body");
        }
        return net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson(body);
    }

    /**
     * Walks a dot-separated path (with optional "[index]" array segments)
     * through the JSON tree; unknown segments resolve to a missing node.
     */
    private JsonNode extractNode(String path, JsonNode rootNode) {
        if (StringUtils.isBlank(path)) {
            return rootNode;
        }
        return Arrays.stream(path.split("\\."))
                .filter(StringUtils::isNotEmpty)
                .reduce(rootNode,
                        (r, p) -> {
                            Matcher matcher = ARRAY_NODE_PATTERN.matcher(p);
                            if (matcher.find()) {
                                return r.path(matcher.group(1)).path(Integer.valueOf(matcher.group(2)));
                            } else {
                                return r.path(p);
                            }
                        },
                        (j1, j2) -> j1);
    }

    /** XPath first; on failure, treats the path as a JSON-style path over the XML tree. */
    private JsonNode extractAsXml(String path) throws Exception {
        try {
            String value = extractXml(path);
            if (value != null) {
                return JSON_MAPPER.valueToTree(value);
            }
        } catch (Exception e) {
            // Fall back to JSON path on XML body
        }
        return extractNode(path, XML_MAPPER.readTree(body));
    }

    /** Parses XML into a DOM document with external entity resolution disabled (XXE hardening). */
    private Document parseXmlToDocument(String xml) throws Exception {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setFeature(javax.xml.XMLConstants.FEATURE_SECURE_PROCESSING, true);
        // Message bodies come from external systems: explicitly disable external
        // entities and XInclude in addition to secure processing.
        factory.setFeature("http://xml.org/sax/features/external-general-entities", false);
        factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
        factory.setXIncludeAware(false);
        factory.setExpandEntityReferences(false);
        DocumentBuilder builder = factory.newDocumentBuilder();
        return builder.parse(new InputSource(new StringReader(xml)));
    }

    /**
     * Builder for ReceivedMessage.
     */
    public static class Builder {
        private String body;
        private MessageContentType contentType = MessageContentType.JSON;
        private Map<String, String> headers = new HashMap<>();
        private long timestamp = System.currentTimeMillis();
        private String source;
        private String key;

        public Builder body(String body) {
            this.body = body;
            return this;
        }

        public Builder contentType(MessageContentType contentType) {
            this.contentType = contentType;
            return this;
        }

        public Builder headers(Map<String, String> headers) {
            this.headers = headers;
            return this;
        }

        public Builder timestamp(long timestamp) {
            this.timestamp = timestamp;
            return this;
        }

        public Builder source(String source) {
            this.source = source;
            return this;
        }

        public Builder key(String key) {
            this.key = key;
            return this;
        }

        public ReceivedMessage build() {
            return new ReceivedMessage(this);
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ReceivedMessage that = (ReceivedMessage) o;
        return timestamp == that.timestamp &&
                Objects.equals(body, that.body) &&
                contentType == that.contentType &&
                Objects.equals(headers, that.headers) &&
                Objects.equals(source, that.source) &&
                Objects.equals(key, that.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(body, contentType, headers, timestamp, source, key);
    }

    @Override
    public String toString() {
        return "ReceivedMessage{" +
                "body='" + body + '\'' +
                ", contentType=" + contentType +
                ", headers=" + headers +
                ", timestamp=" + timestamp +
                ", source='" + source + '\'' +
                ", key='" + key + '\'' +
                '}';
    }
}