draft mq and kafka
This commit is contained in:
parent
8ffb0acb26
commit
e9105429e1
55
messaging-connection-demo/pom.xml
Normal file
55
messaging-connection-demo/pom.xml
Normal file
@ -0,0 +1,55 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>cz.moneta.demo</groupId>
|
||||
<artifactId>messaging-connection-demo</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
|
||||
<kafka.version>3.7.1</kafka.version>
|
||||
<confluent.version>7.6.1</confluent.version>
|
||||
<ibm.mq.version>9.4.2.0</ibm.mq.version>
|
||||
<jakarta.jms.version>3.1.0</jakarta.jms.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>${confluent.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.ibm.mq</groupId>
|
||||
<artifactId>com.ibm.mq.allclient</artifactId>
|
||||
<version>${ibm.mq.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>jakarta.jms</groupId>
|
||||
<artifactId>jakarta.jms-api</artifactId>
|
||||
<version>${jakarta.jms.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<name>Confluent Maven Repository</name>
|
||||
<url>https://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
</project>
|
||||
@ -0,0 +1,69 @@
|
||||
package cz.moneta.demo;
|
||||
|
||||
import com.ibm.msg.client.jms.JmsConnectionFactory;
|
||||
import com.ibm.msg.client.jms.JmsFactoryFactory;
|
||||
import com.ibm.msg.client.wmq.WMQConstants;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
|
||||
import javax.jms.JMSContext;
|
||||
import java.util.Properties;
|
||||
|
||||
public class MessagingConnectionApp {
|
||||
|
||||
public static KafkaProducer<String, String> createKafkaConnection(String bootstrapServers,
|
||||
String apiKey,
|
||||
String apiSecret) {
|
||||
Properties kafkaProps = new Properties();
|
||||
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
kafkaProps.put("security.protocol", "SASL_SSL");
|
||||
kafkaProps.put("sasl.mechanism", "PLAIN");
|
||||
kafkaProps.put("sasl.jaas.config",
|
||||
String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
|
||||
apiKey, apiSecret));
|
||||
|
||||
return new KafkaProducer<>(kafkaProps);
|
||||
}
|
||||
|
||||
public static JMSContext createMqConnection(String host,
|
||||
int port,
|
||||
String channel,
|
||||
String queueManager,
|
||||
String user,
|
||||
String password) throws Exception {
|
||||
JmsFactoryFactory factoryFactory = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
|
||||
JmsConnectionFactory connectionFactory = factoryFactory.createConnectionFactory();
|
||||
|
||||
connectionFactory.setStringProperty(WMQConstants.WMQ_HOST_NAME, host);
|
||||
connectionFactory.setIntProperty(WMQConstants.WMQ_PORT, port);
|
||||
connectionFactory.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
|
||||
connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager);
|
||||
connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
|
||||
connectionFactory.setStringProperty(WMQConstants.USERID, user);
|
||||
connectionFactory.setStringProperty(WMQConstants.PASSWORD, password);
|
||||
|
||||
return connectionFactory.createContext(user, password, JMSContext.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String kafkaBootstrap = System.getProperty("kafka.bootstrap", "localhost:9092");
|
||||
String kafkaApiKey = System.getProperty("kafka.apiKey", "api-key");
|
||||
String kafkaApiSecret = System.getProperty("kafka.apiSecret", "api-secret");
|
||||
|
||||
String mqHost = System.getProperty("mq.host", "localhost");
|
||||
int mqPort = Integer.parseInt(System.getProperty("mq.port", "1414"));
|
||||
String mqChannel = System.getProperty("mq.channel", "DEV.APP.SVRCONN");
|
||||
String mqQueueManager = System.getProperty("mq.queueManager", "QM1");
|
||||
String mqUser = System.getProperty("mq.user", "app");
|
||||
String mqPassword = System.getProperty("mq.password", "pass");
|
||||
|
||||
try (KafkaProducer<String, String> kafkaProducer = createKafkaConnection(kafkaBootstrap, kafkaApiKey, kafkaApiSecret);
|
||||
JMSContext mqContext = createMqConnection(mqHost, mqPort, mqChannel, mqQueueManager, mqUser, mqPassword)) {
|
||||
System.out.println("Kafka connection created: " + (kafkaProducer != null));
|
||||
System.out.println("IBM MQ connection created: " + (mqContext != null));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -29,6 +29,10 @@
|
||||
<commons-beanutils.version>1.9.3</commons-beanutils.version>
|
||||
<commons-configuration.version>1.6</commons-configuration.version>
|
||||
<cxf.version>4.0.3</cxf.version>
|
||||
<kafka.version>3.7.1</kafka.version>
|
||||
<confluent.version>7.6.1</confluent.version>
|
||||
<ibm.mq.version>9.4.2.0</ibm.mq.version>
|
||||
<jakarta.jms.version>3.1.0</jakarta.jms.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
@ -266,6 +270,48 @@
|
||||
<version>${appium-java-client.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>${confluent.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-schema-registry-client</artifactId>
|
||||
<version>${confluent.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
<version>1.11.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.ibm.mq</groupId>
|
||||
<artifactId>com.ibm.mq.allclient</artifactId>
|
||||
<version>${ibm.mq.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>javax.jms</groupId>
|
||||
<artifactId>javax.jms-api</artifactId>
|
||||
<version>2.0.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>jakarta.jms</groupId>
|
||||
<artifactId>jakarta.jms-api</artifactId>
|
||||
<version>${jakarta.jms.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Used for Web Services connector -->
|
||||
<dependency>
|
||||
<groupId>org.apache.cxf</groupId>
|
||||
@ -405,6 +451,13 @@
|
||||
</activation>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<snapshots><enabled>false</enabled></snapshots>
|
||||
<releases><enabled>true</enabled></releases>
|
||||
<id>confluent</id>
|
||||
<name>Confluent Hub</name>
|
||||
<url>https://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<snapshots><enabled>false</enabled></snapshots>
|
||||
<releases><enabled>true</enabled></releases>
|
||||
|
||||
@ -0,0 +1,253 @@
|
||||
package cz.moneta.test.harness.connectors.messaging;
|
||||
|
||||
import com.ibm.msg.client.jms.JmsConnectionFactory;
|
||||
import com.ibm.msg.client.jms.JmsFactoryFactory;
|
||||
import com.ibm.msg.client.wmq.WMQConstants;
|
||||
import cz.moneta.test.harness.connectors.Connector;
|
||||
import cz.moneta.test.harness.exception.MessagingTimeoutException;
|
||||
import cz.moneta.test.harness.messaging.model.MessageContentType;
|
||||
import cz.moneta.test.harness.messaging.model.MqMessageFormat;
|
||||
import cz.moneta.test.harness.messaging.model.ReceivedMessage;
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.JMSConsumer;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.JMSProducer;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.QueueBrowser;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class IbmMqConnector implements Connector {
|
||||
|
||||
private static final Charset EBCDIC_870 = Charset.forName("IBM870");
|
||||
private static final Charset UTF_8 = StandardCharsets.UTF_8;
|
||||
private final JmsConnectionFactory connectionFactory;
|
||||
private final String user;
|
||||
private final String password;
|
||||
private final Object contextLock = new Object();
|
||||
private volatile JMSContext jmsContext;
|
||||
|
||||
public IbmMqConnector(String host,
|
||||
int port,
|
||||
String channel,
|
||||
String queueManager,
|
||||
String user,
|
||||
String password,
|
||||
String keystorePath,
|
||||
String keystorePassword) {
|
||||
this.user = user;
|
||||
this.password = password;
|
||||
try {
|
||||
if (keystorePath != null && !keystorePath.isBlank()) {
|
||||
System.setProperty("javax.net.ssl.keyStore", keystorePath);
|
||||
if (keystorePassword != null) {
|
||||
System.setProperty("javax.net.ssl.keyStorePassword", keystorePassword);
|
||||
}
|
||||
}
|
||||
|
||||
JmsFactoryFactory factoryFactory = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
|
||||
connectionFactory = factoryFactory.createConnectionFactory();
|
||||
connectionFactory.setStringProperty(WMQConstants.WMQ_HOST_NAME, host);
|
||||
connectionFactory.setIntProperty(WMQConstants.WMQ_PORT, port);
|
||||
connectionFactory.setStringProperty(WMQConstants.WMQ_CHANNEL, channel);
|
||||
connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManager);
|
||||
connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
|
||||
connectionFactory.setStringProperty(WMQConstants.USERID, user);
|
||||
connectionFactory.setStringProperty(WMQConstants.PASSWORD, password);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to initialize IBM MQ connection factory", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void send(String queueName,
|
||||
String payload,
|
||||
MqMessageFormat format,
|
||||
Map<String, String> properties) {
|
||||
switch (Objects.requireNonNull(format, "format")) {
|
||||
case JSON, XML -> sendTextMessage(queueName, payload, properties);
|
||||
case EBCDIC_870 -> sendBytesMessage(queueName, payload, EBCDIC_870, 870, properties);
|
||||
case UTF8_1208 -> sendBytesMessage(queueName, payload, UTF_8, 1208, properties);
|
||||
}
|
||||
}
|
||||
|
||||
public ReceivedMessage receive(String queueName,
|
||||
String messageSelector,
|
||||
MqMessageFormat expectedFormat,
|
||||
Duration timeout) {
|
||||
JMSContext context = getContext();
|
||||
Queue queue = context.createQueue("queue:///" + queueName);
|
||||
try (JMSConsumer consumer = messageSelector == null || messageSelector.isBlank()
|
||||
? context.createConsumer(queue)
|
||||
: context.createConsumer(queue, messageSelector)) {
|
||||
Message message = consumer.receive(Optional.ofNullable(timeout).orElse(Duration.ofSeconds(30)).toMillis());
|
||||
if (message == null) {
|
||||
throw new MessagingTimeoutException("Timeout waiting for IBM MQ message from queue: " + queueName);
|
||||
}
|
||||
return toReceivedMessage(message, queueName, expectedFormat);
|
||||
}
|
||||
}
|
||||
|
||||
public List<ReceivedMessage> browse(String queueName,
|
||||
Predicate<ReceivedMessage> filter,
|
||||
MqMessageFormat expectedFormat,
|
||||
Duration timeout) {
|
||||
long timeoutMillis = Optional.ofNullable(timeout).orElse(Duration.ofSeconds(30)).toMillis();
|
||||
long deadline = System.currentTimeMillis() + timeoutMillis;
|
||||
long backoff = 100;
|
||||
JMSContext context = getContext();
|
||||
Queue queue = context.createQueue("queue:///" + queueName);
|
||||
|
||||
while (System.currentTimeMillis() < deadline) {
|
||||
List<ReceivedMessage> matched = new ArrayList<>();
|
||||
try (QueueBrowser browser = context.createBrowser(queue)) {
|
||||
Enumeration<?> messages = browser.getEnumeration();
|
||||
while (messages.hasMoreElements()) {
|
||||
Message message = (Message) messages.nextElement();
|
||||
ReceivedMessage receivedMessage = toReceivedMessage(message, queueName, expectedFormat);
|
||||
if (filter == null || filter.test(receivedMessage)) {
|
||||
matched.add(receivedMessage);
|
||||
}
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
throw new IllegalStateException("Failed to browse IBM MQ queue: " + queueName, e);
|
||||
}
|
||||
|
||||
if (!matched.isEmpty()) {
|
||||
return matched;
|
||||
}
|
||||
|
||||
try {
|
||||
TimeUnit.MILLISECONDS.sleep(backoff);
|
||||
} catch (InterruptedException ignored) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
backoff = Math.min(backoff * 2, 1000);
|
||||
}
|
||||
throw new MessagingTimeoutException("Timeout waiting for IBM MQ message from queue: " + queueName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
JMSContext context = jmsContext;
|
||||
if (context != null) {
|
||||
context.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void sendTextMessage(String queueName, String payload, Map<String, String> properties) {
|
||||
JMSContext context = getContext();
|
||||
JMSProducer producer = context.createProducer();
|
||||
TextMessage message = context.createTextMessage(payload);
|
||||
applyProperties(message, properties);
|
||||
producer.send(context.createQueue("queue:///" + queueName), message);
|
||||
}
|
||||
|
||||
private void sendBytesMessage(String queueName,
|
||||
String payload,
|
||||
Charset charset,
|
||||
int ccsid,
|
||||
Map<String, String> properties) {
|
||||
try {
|
||||
JMSContext context = getContext();
|
||||
JMSProducer producer = context.createProducer();
|
||||
BytesMessage message = context.createBytesMessage();
|
||||
message.writeBytes(Optional.ofNullable(payload).orElse("").getBytes(charset));
|
||||
message.setIntProperty(WMQConstants.JMS_IBM_CHARACTER_SET, ccsid);
|
||||
applyProperties(message, properties);
|
||||
producer.send(context.createQueue("queue:///" + queueName), message);
|
||||
} catch (JMSException e) {
|
||||
throw new IllegalStateException("Failed to send bytes message to IBM MQ queue: " + queueName, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void applyProperties(Message message, Map<String, String> properties) {
|
||||
Optional.ofNullable(properties).orElseGet(Collections::emptyMap)
|
||||
.forEach((key, value) -> {
|
||||
try {
|
||||
message.setStringProperty(key, String.valueOf(value));
|
||||
} catch (JMSException e) {
|
||||
throw new IllegalStateException("Failed to set JMS property: " + key, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private ReceivedMessage toReceivedMessage(Message message, String queueName, MqMessageFormat format) {
|
||||
try {
|
||||
Map<String, String> headers = new LinkedHashMap<>();
|
||||
Enumeration<?> names = message.getPropertyNames();
|
||||
while (names.hasMoreElements()) {
|
||||
String name = String.valueOf(names.nextElement());
|
||||
headers.put(name, String.valueOf(message.getObjectProperty(name)));
|
||||
}
|
||||
|
||||
String body = decodeMessage(message, format);
|
||||
MessageContentType contentType = resolveContentType(message, format);
|
||||
return new ReceivedMessage(body, contentType, headers, message.getJMSTimestamp(), queueName);
|
||||
} catch (JMSException e) {
|
||||
throw new IllegalStateException("Failed to decode IBM MQ message", e);
|
||||
}
|
||||
}
|
||||
|
||||
private MessageContentType resolveContentType(Message message, MqMessageFormat expectedFormat) {
|
||||
if (message instanceof TextMessage) {
|
||||
return expectedFormat == MqMessageFormat.XML ? MessageContentType.XML : MessageContentType.JSON;
|
||||
}
|
||||
if (expectedFormat == MqMessageFormat.XML) {
|
||||
return MessageContentType.XML;
|
||||
}
|
||||
if (expectedFormat == MqMessageFormat.JSON) {
|
||||
return MessageContentType.JSON;
|
||||
}
|
||||
return MessageContentType.RAW_TEXT;
|
||||
}
|
||||
|
||||
private String decodeMessage(Message jmsMessage, MqMessageFormat format) {
|
||||
try {
|
||||
if (jmsMessage instanceof TextMessage textMessage) {
|
||||
return textMessage.getText();
|
||||
}
|
||||
|
||||
if (jmsMessage instanceof BytesMessage bytesMessage) {
|
||||
byte[] data = new byte[(int) bytesMessage.getBodyLength()];
|
||||
bytesMessage.readBytes(data);
|
||||
Charset charset = switch (format) {
|
||||
case EBCDIC_870 -> EBCDIC_870;
|
||||
case UTF8_1208, JSON, XML -> UTF_8;
|
||||
};
|
||||
return new String(data, charset);
|
||||
}
|
||||
return "";
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to decode JMS message", e);
|
||||
}
|
||||
}
|
||||
|
||||
private JMSContext getContext() {
|
||||
JMSContext current = jmsContext;
|
||||
if (current == null) {
|
||||
synchronized (contextLock) {
|
||||
current = jmsContext;
|
||||
if (current == null) {
|
||||
jmsContext = current = connectionFactory.createContext(user, password, JMSContext.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
}
|
||||
}
|
||||
return current;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,353 @@
|
||||
package cz.moneta.test.harness.connectors.messaging;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import cz.moneta.test.harness.connectors.Connector;
|
||||
import cz.moneta.test.harness.exception.MessagingTimeoutException;
|
||||
import cz.moneta.test.harness.messaging.model.MessageContentType;
|
||||
import cz.moneta.test.harness.messaging.model.ReceivedMessage;
|
||||
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
|
||||
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
|
||||
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
|
||||
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
|
||||
import io.confluent.kafka.serializers.KafkaAvroSerializer;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class KafkaConnector implements Connector {
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private final Properties producerProps = new Properties();
|
||||
private final Properties consumerProps = new Properties();
|
||||
private final CachedSchemaRegistryClient schemaRegistryClient;
|
||||
private volatile KafkaProducer<String, GenericRecord> producer;
|
||||
private final Object producerLock = new Object();
|
||||
|
||||
public KafkaConnector(String bootstrapServers,
|
||||
String apiKey,
|
||||
String apiSecret,
|
||||
String schemaRegistryUrl,
|
||||
String schemaRegistryApiKey,
|
||||
String schemaRegistryApiSecret) {
|
||||
String jaasConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
|
||||
apiKey,
|
||||
apiSecret);
|
||||
String schemaRegistryAuth = schemaRegistryApiKey + ":" + schemaRegistryApiSecret;
|
||||
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
producerProps.put("security.protocol", "SASL_SSL");
|
||||
producerProps.put("sasl.mechanism", "PLAIN");
|
||||
producerProps.put("sasl.jaas.config", jaasConfig);
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
|
||||
producerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
|
||||
producerProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
|
||||
producerProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
|
||||
producerProps.put("auto.register.schemas", "false");
|
||||
|
||||
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
consumerProps.put("security.protocol", "SASL_SSL");
|
||||
consumerProps.put("sasl.mechanism", "PLAIN");
|
||||
consumerProps.put("sasl.jaas.config", jaasConfig);
|
||||
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
|
||||
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
|
||||
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
|
||||
consumerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
|
||||
consumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
|
||||
consumerProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
|
||||
consumerProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
|
||||
|
||||
this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128);
|
||||
}
|
||||
|
||||
public void send(String topic, String key, String jsonPayload, Map<String, String> headers) {
|
||||
Objects.requireNonNull(topic, "topic");
|
||||
Schema schema = getSchemaForTopic(topic);
|
||||
GenericRecord record = jsonToAvro(jsonPayload, schema);
|
||||
ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(topic, key, record);
|
||||
Optional.ofNullable(headers).orElseGet(HashMap::new)
|
||||
.forEach((headerKey, headerValue) -> producerRecord.headers()
|
||||
.add(headerKey, String.valueOf(headerValue).getBytes(StandardCharsets.UTF_8)));
|
||||
|
||||
try {
|
||||
getProducer().send(producerRecord).get(30, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to send Kafka message to topic: " + topic, e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<ReceivedMessage> receive(String topic,
|
||||
Predicate<ReceivedMessage> filter,
|
||||
Duration timeout) {
|
||||
long timeoutMillis = Optional.ofNullable(timeout).orElse(Duration.ofSeconds(30)).toMillis();
|
||||
long deadline = System.currentTimeMillis() + timeoutMillis;
|
||||
long backoff = 100;
|
||||
|
||||
try (KafkaConsumer<String, Object> consumer = createConsumer()) {
|
||||
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
|
||||
.map(info -> new TopicPartition(topic, info.partition()))
|
||||
.toList();
|
||||
consumer.assign(partitions);
|
||||
consumer.seekToEnd(partitions);
|
||||
|
||||
while (System.currentTimeMillis() < deadline) {
|
||||
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));
|
||||
if (!records.isEmpty()) {
|
||||
for (ConsumerRecord<String, Object> record : records) {
|
||||
ReceivedMessage message = toReceivedMessage(record);
|
||||
if (filter == null || filter.test(message)) {
|
||||
return List.of(message);
|
||||
}
|
||||
}
|
||||
backoff = 100;
|
||||
continue;
|
||||
}
|
||||
TimeUnit.MILLISECONDS.sleep(backoff);
|
||||
backoff = Math.min(backoff * 2, 1000);
|
||||
}
|
||||
} catch (MessagingTimeoutException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to receive Kafka message from topic: " + topic, e);
|
||||
}
|
||||
throw new MessagingTimeoutException("Timeout waiting for Kafka message from topic: " + topic);
|
||||
}
|
||||
|
||||
public Map<TopicPartition, Long> saveOffsets(String topic) {
|
||||
try (KafkaConsumer<String, Object> consumer = createConsumer()) {
|
||||
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
|
||||
.map(info -> new TopicPartition(topic, info.partition()))
|
||||
.toList();
|
||||
consumer.assign(partitions);
|
||||
consumer.seekToEnd(partitions);
|
||||
Map<TopicPartition, Long> offsets = new HashMap<>();
|
||||
partitions.forEach(partition -> offsets.put(partition, consumer.position(partition)));
|
||||
return offsets;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
KafkaProducer<String, GenericRecord> current = producer;
|
||||
if (current != null) {
|
||||
current.close(Duration.ofSeconds(5));
|
||||
}
|
||||
}
|
||||
|
||||
private KafkaProducer<String, GenericRecord> getProducer() {
|
||||
KafkaProducer<String, GenericRecord> current = producer;
|
||||
if (current == null) {
|
||||
synchronized (producerLock) {
|
||||
current = producer;
|
||||
if (current == null) {
|
||||
producer = current = new KafkaProducer<>(producerProps);
|
||||
}
|
||||
}
|
||||
}
|
||||
return current;
|
||||
}
|
||||
|
||||
private KafkaConsumer<String, Object> createConsumer() {
|
||||
Properties properties = new Properties();
|
||||
properties.putAll(consumerProps);
|
||||
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "harness-" + UUID.randomUUID());
|
||||
return new KafkaConsumer<>(properties);
|
||||
}
|
||||
|
||||
private ReceivedMessage toReceivedMessage(ConsumerRecord<String, Object> record) {
|
||||
String body = convertValueToJson(record.value());
|
||||
Map<String, String> headers = new LinkedHashMap<>();
|
||||
for (Header header : record.headers()) {
|
||||
headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
return new ReceivedMessage(body,
|
||||
MessageContentType.JSON,
|
||||
headers,
|
||||
record.timestamp(),
|
||||
record.topic());
|
||||
}
|
||||
|
||||
private String convertValueToJson(Object value) {
|
||||
try {
|
||||
if (value instanceof GenericRecord genericRecord) {
|
||||
return avroToJson(genericRecord);
|
||||
}
|
||||
if (value instanceof CharSequence) {
|
||||
return value.toString();
|
||||
}
|
||||
return OBJECT_MAPPER.writeValueAsString(value);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to convert Kafka payload to JSON", e);
|
||||
}
|
||||
}
|
||||
|
||||
private Schema getSchemaForTopic(String topic) {
|
||||
String subject = topic + "-value";
|
||||
try {
|
||||
// Get all versions and use the latest one
|
||||
java.util.List<Integer> versions = schemaRegistryClient.getAllVersions(subject);
|
||||
int latestVersion = versions.get(versions.size() - 1);
|
||||
io.confluent.kafka.schemaregistry.client.rest.entities.Schema confluentSchema =
|
||||
schemaRegistryClient.getByVersion(subject, latestVersion, false);
|
||||
String schemaString = confluentSchema.getSchema();
|
||||
return new Schema.Parser().parse(schemaString);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to get schema for subject: " + subject, e);
|
||||
}
|
||||
}
|
||||
|
||||
private String avroToJson(GenericRecord record) {
|
||||
try {
|
||||
return OBJECT_MAPPER.writeValueAsString(convertAvroObject(record));
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to convert Avro record to JSON", e);
|
||||
}
|
||||
}
|
||||
|
||||
private GenericRecord jsonToAvro(String jsonPayload, Schema schema) {
|
||||
try {
|
||||
JsonNode root = OBJECT_MAPPER.readTree(jsonPayload);
|
||||
Object converted = convertJsonNode(root, schema);
|
||||
return (GenericRecord) converted;
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to convert JSON payload to Avro", e);
|
||||
}
|
||||
}
|
||||
|
||||
private Object convertJsonNode(JsonNode node, Schema schema) {
|
||||
return switch (schema.getType()) {
|
||||
case RECORD -> {
|
||||
GenericData.Record record = new GenericData.Record(schema);
|
||||
schema.getFields().forEach(field -> record.put(field.name(),
|
||||
convertJsonNode(node.path(field.name()), field.schema())));
|
||||
yield record;
|
||||
}
|
||||
case ARRAY -> {
|
||||
List<Object> values = new ArrayList<>();
|
||||
node.forEach(item -> values.add(convertJsonNode(item, schema.getElementType())));
|
||||
yield values;
|
||||
}
|
||||
case MAP -> {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
node.fields().forEachRemaining(entry -> map.put(entry.getKey(),
|
||||
convertJsonNode(entry.getValue(), schema.getValueType())));
|
||||
yield map;
|
||||
}
|
||||
case UNION -> resolveUnion(node, schema);
|
||||
case ENUM -> new GenericData.EnumSymbol(schema, node.asText());
|
||||
case FIXED -> {
|
||||
byte[] fixedBytes = toBytes(node);
|
||||
yield new GenericData.Fixed(schema, fixedBytes);
|
||||
}
|
||||
case STRING -> node.isNull() ? null : node.asText();
|
||||
case INT -> node.isNull() ? null : node.asInt();
|
||||
case LONG -> node.isNull() ? null : node.asLong();
|
||||
case FLOAT -> node.isNull() ? null : (float) node.asDouble();
|
||||
case DOUBLE -> node.isNull() ? null : node.asDouble();
|
||||
case BOOLEAN -> node.isNull() ? null : node.asBoolean();
|
||||
case BYTES -> ByteBuffer.wrap(toBytes(node));
|
||||
case NULL -> null;
|
||||
};
|
||||
}
|
||||
|
||||
private Object resolveUnion(JsonNode node, Schema unionSchema) {
|
||||
if (node == null || node.isNull()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
IllegalStateException lastException = null;
|
||||
for (Schema candidate : unionSchema.getTypes()) {
|
||||
if (candidate.getType() == Schema.Type.NULL) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
Object value = convertJsonNode(node, candidate);
|
||||
if (GenericData.get().validate(candidate, value)) {
|
||||
return value;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
lastException = new IllegalStateException("Failed to resolve union type", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (lastException != null) {
|
||||
throw lastException;
|
||||
}
|
||||
throw new IllegalStateException("Cannot resolve union for node: " + node);
|
||||
}
|
||||
|
||||
private byte[] toBytes(JsonNode node) {
|
||||
if (node.isBinary()) {
|
||||
try {
|
||||
return node.binaryValue();
|
||||
} catch (Exception ignored) {
|
||||
// fallback to textual representation
|
||||
}
|
||||
}
|
||||
String text = node.asText();
|
||||
try {
|
||||
return Base64.getDecoder().decode(text);
|
||||
} catch (Exception ignored) {
|
||||
return text.getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
|
||||
private Object convertAvroObject(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof GenericRecord record) {
|
||||
Map<String, Object> jsonObject = new LinkedHashMap<>();
|
||||
record.getSchema().getFields().forEach(field -> jsonObject.put(field.name(), convertAvroObject(record.get(field.name()))));
|
||||
return jsonObject;
|
||||
}
|
||||
if (value instanceof GenericData.Array<?> array) {
|
||||
List<Object> values = new ArrayList<>(array.size());
|
||||
array.forEach(item -> values.add(convertAvroObject(item)));
|
||||
return values;
|
||||
}
|
||||
if (value instanceof Map<?, ?> map) {
|
||||
Map<String, Object> values = new LinkedHashMap<>();
|
||||
map.forEach((key, item) -> values.put(String.valueOf(key), convertAvroObject(item)));
|
||||
return values;
|
||||
}
|
||||
if (value instanceof ByteBuffer byteBuffer) {
|
||||
ByteBuffer duplicate = byteBuffer.duplicate();
|
||||
byte[] bytes = new byte[duplicate.remaining()];
|
||||
duplicate.get(bytes);
|
||||
return Base64.getEncoder().encodeToString(bytes);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,123 @@
|
||||
package cz.moneta.test.harness.endpoints.messaging;
|
||||
|
||||
import cz.moneta.test.harness.connectors.VaultConnector;
|
||||
import cz.moneta.test.harness.connectors.messaging.IbmMqConnector;
|
||||
import cz.moneta.test.harness.constants.HarnessConfigConstants;
|
||||
import cz.moneta.test.harness.context.StoreAccessor;
|
||||
import cz.moneta.test.harness.endpoints.Endpoint;
|
||||
import cz.moneta.test.harness.messaging.model.MqMessageFormat;
|
||||
import cz.moneta.test.harness.messaging.model.ReceivedMessage;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class IbmMqEndpoint implements Endpoint {
|
||||
|
||||
private static final String CONFIG_HOST = "messaging.ibmmq.host";
|
||||
private static final String CONFIG_PORT = "messaging.ibmmq.port";
|
||||
private static final String CONFIG_CHANNEL = "messaging.ibmmq.channel";
|
||||
private static final String CONFIG_QUEUE_MANAGER = "messaging.ibmmq.queue-manager";
|
||||
private static final String CONFIG_KEYSTORE_PATH = "messaging.ibmmq.keystore.path";
|
||||
private static final String CONFIG_KEYSTORE_PASSWORD = "messaging.ibmmq.keystore.password";
|
||||
private static final String CONFIG_VAULT_PATH = "vault.path.messaging.ibmmq";
|
||||
private static final String CONFIG_USERNAME = "messaging.ibmmq.username";
|
||||
private static final String CONFIG_PASSWORD = "messaging.ibmmq.password";
|
||||
|
||||
private final StoreAccessor store;
|
||||
private volatile IbmMqConnector connector;
|
||||
private final Object connectorLock = new Object();
|
||||
|
||||
public IbmMqEndpoint(StoreAccessor store) {
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
public void send(String queueName,
|
||||
String payload,
|
||||
MqMessageFormat format,
|
||||
Map<String, String> properties) {
|
||||
getConnector().send(queueName, payload, format, properties);
|
||||
}
|
||||
|
||||
public ReceivedMessage receive(String queueName,
|
||||
String messageSelector,
|
||||
MqMessageFormat expectedFormat,
|
||||
Duration timeout) {
|
||||
return getConnector().receive(queueName, messageSelector, expectedFormat, timeout);
|
||||
}
|
||||
|
||||
public List<ReceivedMessage> browse(String queueName,
|
||||
Predicate<ReceivedMessage> filter,
|
||||
MqMessageFormat expectedFormat,
|
||||
Duration timeout) {
|
||||
return getConnector().browse(queueName, filter, expectedFormat, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IbmMqConnector current = connector;
|
||||
if (current != null) {
|
||||
current.close();
|
||||
}
|
||||
}
|
||||
|
||||
private IbmMqConnector getConnector() {
|
||||
IbmMqConnector current = connector;
|
||||
if (current == null) {
|
||||
synchronized (connectorLock) {
|
||||
current = connector;
|
||||
if (current == null) {
|
||||
current = createConnector();
|
||||
connector = current;
|
||||
}
|
||||
}
|
||||
}
|
||||
return current;
|
||||
}
|
||||
|
||||
private IbmMqConnector createConnector() {
|
||||
String host = requireConfig(CONFIG_HOST);
|
||||
int port = Integer.parseInt(requireConfig(CONFIG_PORT));
|
||||
String channel = requireConfig(CONFIG_CHANNEL);
|
||||
String queueManager = requireConfig(CONFIG_QUEUE_MANAGER);
|
||||
String username = resolveSecret(CONFIG_USERNAME, "username");
|
||||
String password = resolveSecret(CONFIG_PASSWORD, "password");
|
||||
String keystorePath = store.getConfig(CONFIG_KEYSTORE_PATH);
|
||||
String keystorePassword = store.getConfig(CONFIG_KEYSTORE_PASSWORD);
|
||||
|
||||
return new IbmMqConnector(host, port, channel, queueManager, username, password, keystorePath, keystorePassword);
|
||||
}
|
||||
|
||||
private String resolveSecret(String localConfigKey, String vaultKey) {
|
||||
return Optional.ofNullable(store.getConfig(localConfigKey))
|
||||
.or(() -> readFromVault(vaultKey))
|
||||
.orElseThrow(() -> new IllegalStateException("Missing messaging secret: " + localConfigKey));
|
||||
}
|
||||
|
||||
private Optional<String> readFromVault(String key) {
|
||||
String path = store.getConfig(CONFIG_VAULT_PATH);
|
||||
if (path == null || path.isBlank()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
String basePath = store.getConfig("vault.path.base");
|
||||
String resolvedPath = path.startsWith("/") || basePath == null || basePath.isBlank()
|
||||
? path
|
||||
: basePath + "/" + path;
|
||||
|
||||
return createVaultConnector().flatMap(vault -> vault.getValue(resolvedPath, key));
|
||||
}
|
||||
|
||||
private Optional<VaultConnector> createVaultConnector() {
|
||||
return Optional.ofNullable(store.getConfig(HarnessConfigConstants.VAULT_URL_CONFIG))
|
||||
.flatMap(url -> Optional.ofNullable(store.getConfig(HarnessConfigConstants.VAULT_USERNAME_CONFIG))
|
||||
.flatMap(username -> Optional.ofNullable(store.getConfig(HarnessConfigConstants.VAULT_PASSWORD_CONFIG))
|
||||
.map(password -> new VaultConnector(url, username, password))));
|
||||
}
|
||||
|
||||
private String requireConfig(String key) {
|
||||
return Optional.ofNullable(store.getConfig(key))
|
||||
.orElseThrow(() -> new IllegalStateException("Missing required config: " + key));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,119 @@
|
||||
package cz.moneta.test.harness.endpoints.messaging;
|
||||
|
||||
import cz.moneta.test.harness.connectors.VaultConnector;
|
||||
import cz.moneta.test.harness.connectors.messaging.KafkaConnector;
|
||||
import cz.moneta.test.harness.constants.HarnessConfigConstants;
|
||||
import cz.moneta.test.harness.context.StoreAccessor;
|
||||
import cz.moneta.test.harness.endpoints.Endpoint;
|
||||
import cz.moneta.test.harness.messaging.model.ReceivedMessage;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class KafkaEndpoint implements Endpoint {
|
||||
|
||||
private static final String CONFIG_BOOTSTRAP_SERVERS = "messaging.kafka.bootstrap-servers";
|
||||
private static final String CONFIG_SCHEMA_REGISTRY_URL = "messaging.kafka.schema-registry-url";
|
||||
private static final String CONFIG_VAULT_PATH = "vault.path.messaging.kafka";
|
||||
|
||||
private static final String CONFIG_API_KEY = "messaging.kafka.api-key";
|
||||
private static final String CONFIG_API_SECRET = "messaging.kafka.api-secret";
|
||||
private static final String CONFIG_SCHEMA_REGISTRY_API_KEY = "messaging.kafka.schema-registry-api-key";
|
||||
private static final String CONFIG_SCHEMA_REGISTRY_API_SECRET = "messaging.kafka.schema-registry-api-secret";
|
||||
|
||||
private static final String VAULT_API_KEY = "apiKey";
|
||||
private static final String VAULT_API_SECRET = "apiSecret";
|
||||
private static final String VAULT_SCHEMA_REGISTRY_API_KEY = "schemaRegistryApiKey";
|
||||
private static final String VAULT_SCHEMA_REGISTRY_API_SECRET = "schemaRegistryApiSecret";
|
||||
|
||||
private final StoreAccessor store;
|
||||
private volatile KafkaConnector connector;
|
||||
private final Object connectorLock = new Object();
|
||||
|
||||
public KafkaEndpoint(StoreAccessor store) {
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
public void send(String topic, String key, String payload, Map<String, String> headers) {
|
||||
getConnector().send(topic, key, payload, headers);
|
||||
}
|
||||
|
||||
public ReceivedMessage receive(String topic, Predicate<ReceivedMessage> filter, Duration timeout) {
|
||||
List<ReceivedMessage> messages = getConnector().receive(topic, filter, timeout);
|
||||
return messages.isEmpty() ? null : messages.get(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
KafkaConnector current = connector;
|
||||
if (current != null) {
|
||||
current.close();
|
||||
}
|
||||
}
|
||||
|
||||
private KafkaConnector getConnector() {
|
||||
KafkaConnector current = connector;
|
||||
if (current == null) {
|
||||
synchronized (connectorLock) {
|
||||
current = connector;
|
||||
if (current == null) {
|
||||
current = createConnector();
|
||||
connector = current;
|
||||
}
|
||||
}
|
||||
}
|
||||
return current;
|
||||
}
|
||||
|
||||
private KafkaConnector createConnector() {
|
||||
String bootstrapServers = requireConfig(CONFIG_BOOTSTRAP_SERVERS);
|
||||
String schemaRegistryUrl = requireConfig(CONFIG_SCHEMA_REGISTRY_URL);
|
||||
String apiKey = resolveSecret(CONFIG_API_KEY, VAULT_API_KEY);
|
||||
String apiSecret = resolveSecret(CONFIG_API_SECRET, VAULT_API_SECRET);
|
||||
String schemaRegistryApiKey = resolveSecret(CONFIG_SCHEMA_REGISTRY_API_KEY, VAULT_SCHEMA_REGISTRY_API_KEY);
|
||||
String schemaRegistryApiSecret = resolveSecret(CONFIG_SCHEMA_REGISTRY_API_SECRET, VAULT_SCHEMA_REGISTRY_API_SECRET);
|
||||
|
||||
return new KafkaConnector(
|
||||
bootstrapServers,
|
||||
apiKey,
|
||||
apiSecret,
|
||||
schemaRegistryUrl,
|
||||
schemaRegistryApiKey,
|
||||
schemaRegistryApiSecret
|
||||
);
|
||||
}
|
||||
|
||||
private String resolveSecret(String localConfigKey, String vaultKey) {
|
||||
return Optional.ofNullable(store.getConfig(localConfigKey))
|
||||
.or(() -> readFromVault(vaultKey))
|
||||
.orElseThrow(() -> new IllegalStateException("Missing messaging secret: " + localConfigKey));
|
||||
}
|
||||
|
||||
private Optional<String> readFromVault(String key) {
|
||||
String path = store.getConfig(CONFIG_VAULT_PATH);
|
||||
if (path == null || path.isBlank()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
String basePath = store.getConfig("vault.path.base");
|
||||
String resolvedPath = path.startsWith("/") || basePath == null || basePath.isBlank()
|
||||
? path
|
||||
: basePath + "/" + path;
|
||||
|
||||
return createVaultConnector().flatMap(vault -> vault.getValue(resolvedPath, key));
|
||||
}
|
||||
|
||||
private Optional<VaultConnector> createVaultConnector() {
|
||||
return Optional.ofNullable(store.getConfig(HarnessConfigConstants.VAULT_URL_CONFIG))
|
||||
.flatMap(url -> Optional.ofNullable(store.getConfig(HarnessConfigConstants.VAULT_USERNAME_CONFIG))
|
||||
.flatMap(username -> Optional.ofNullable(store.getConfig(HarnessConfigConstants.VAULT_PASSWORD_CONFIG))
|
||||
.map(password -> new VaultConnector(url, username, password))));
|
||||
}
|
||||
|
||||
private String requireConfig(String key) {
|
||||
return Optional.ofNullable(store.getConfig(key))
|
||||
.orElseThrow(() -> new IllegalStateException("Missing required config: " + key));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,130 @@
|
||||
package cz.moneta.test.harness.endpoints.messaging;
|
||||
|
||||
import cz.moneta.test.harness.context.StoreAccessor;
|
||||
import cz.moneta.test.harness.endpoints.Endpoint;
|
||||
import cz.moneta.test.harness.exception.MessagingTimeoutException;
|
||||
import cz.moneta.test.harness.messaging.model.Destination;
|
||||
import cz.moneta.test.harness.messaging.model.MqMessageFormat;
|
||||
import cz.moneta.test.harness.messaging.model.Queue;
|
||||
import cz.moneta.test.harness.messaging.model.ReceivedMessage;
|
||||
import cz.moneta.test.harness.messaging.model.Topic;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class MessagingEndpoint implements Endpoint {
|
||||
|
||||
private final StoreAccessor store;
|
||||
private volatile KafkaEndpoint kafkaEndpoint;
|
||||
private volatile IbmMqEndpoint ibmMqEndpoint;
|
||||
private final Object endpointLock = new Object();
|
||||
|
||||
public MessagingEndpoint(StoreAccessor store) {
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
public void send(String destinationName,
|
||||
String key,
|
||||
String payload,
|
||||
MqMessageFormat formatOverride,
|
||||
Map<String, String> headers) {
|
||||
Destination destination = resolveDestination(destinationName);
|
||||
if (destination instanceof Topic topic) {
|
||||
getKafkaEndpoint().send(topic.getTopicName(), key, payload, headers);
|
||||
return;
|
||||
}
|
||||
|
||||
Queue queue = (Queue) destination;
|
||||
MqMessageFormat format = Optional.ofNullable(formatOverride).orElse(queue.getFormat());
|
||||
getIbmMqEndpoint().send(queue.getQueueName(), payload, format, headers);
|
||||
}
|
||||
|
||||
public ReceivedMessage receive(String destinationName,
|
||||
Predicate<ReceivedMessage> filter,
|
||||
Duration timeout,
|
||||
MqMessageFormat formatOverride) {
|
||||
Destination destination = resolveDestination(destinationName);
|
||||
if (destination instanceof Topic topic) {
|
||||
return getKafkaEndpoint().receive(topic.getTopicName(), filter, timeout);
|
||||
}
|
||||
|
||||
Queue queue = (Queue) destination;
|
||||
MqMessageFormat format = Optional.ofNullable(formatOverride).orElse(queue.getFormat());
|
||||
List<ReceivedMessage> messages = getIbmMqEndpoint().browse(queue.getQueueName(), filter, format, timeout);
|
||||
if (messages.isEmpty()) {
|
||||
throw new MessagingTimeoutException("No IBM MQ message found for destination: " + destinationName);
|
||||
}
|
||||
return messages.get(0);
|
||||
}
|
||||
|
||||
public Destination resolveDestination(String destinationName) {
|
||||
String prefix = "messaging.destination." + destinationName + ".";
|
||||
String type = Optional.ofNullable(store.getConfig(prefix + "type"))
|
||||
.map(v -> v.toLowerCase(Locale.ROOT))
|
||||
.orElseThrow(() -> new IllegalStateException("Missing destination config: " + prefix + "type"));
|
||||
|
||||
return switch (type) {
|
||||
case "kafka" -> {
|
||||
String topic = requireConfig(prefix + "topic");
|
||||
yield new Topic(destinationName, topic);
|
||||
}
|
||||
case "ibmmq" -> {
|
||||
String queue = requireConfig(prefix + "queue");
|
||||
String format = store.getConfig(prefix + "format", MqMessageFormat.JSON.name().toLowerCase(Locale.ROOT));
|
||||
yield new Queue(destinationName, queue, MqMessageFormat.fromConfig(format, MqMessageFormat.JSON));
|
||||
}
|
||||
default -> throw new IllegalStateException("Unsupported destination type '" + type + "' for destination: " + destinationName);
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
KafkaEndpoint kafka = kafkaEndpoint;
|
||||
if (kafka != null) {
|
||||
kafka.close();
|
||||
}
|
||||
IbmMqEndpoint mq = ibmMqEndpoint;
|
||||
if (mq != null) {
|
||||
mq.close();
|
||||
}
|
||||
}
|
||||
|
||||
private KafkaEndpoint getKafkaEndpoint() {
|
||||
KafkaEndpoint current = kafkaEndpoint;
|
||||
if (current == null) {
|
||||
synchronized (endpointLock) {
|
||||
current = kafkaEndpoint;
|
||||
if (current == null) {
|
||||
current = new KafkaEndpoint(store);
|
||||
kafkaEndpoint = current;
|
||||
}
|
||||
}
|
||||
}
|
||||
return current;
|
||||
}
|
||||
|
||||
private IbmMqEndpoint getIbmMqEndpoint() {
|
||||
IbmMqEndpoint current = ibmMqEndpoint;
|
||||
if (current == null) {
|
||||
synchronized (endpointLock) {
|
||||
current = ibmMqEndpoint;
|
||||
if (current == null) {
|
||||
current = new IbmMqEndpoint(store);
|
||||
ibmMqEndpoint = current;
|
||||
}
|
||||
}
|
||||
}
|
||||
return current;
|
||||
}
|
||||
|
||||
private String requireConfig(String key) {
|
||||
return Optional.ofNullable(store.getConfig(key))
|
||||
.filter(v -> !Objects.equals(v.trim(), ""))
|
||||
.orElseThrow(() -> new IllegalStateException("Missing required config: " + key));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,8 @@
|
||||
package cz.moneta.test.harness.exception;
|
||||
|
||||
public class MessagingTimeoutException extends HarnessException {
|
||||
|
||||
public MessagingTimeoutException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,22 @@
|
||||
package cz.moneta.test.harness.messaging.model;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class Destination {
|
||||
|
||||
private final String name;
|
||||
private final String type;
|
||||
|
||||
protected Destination(String name, String type) {
|
||||
this.name = Objects.requireNonNull(name, "name");
|
||||
this.type = Objects.requireNonNull(type, "type");
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,7 @@
|
||||
package cz.moneta.test.harness.messaging.model;
|
||||
|
||||
public enum MessageContentType {
|
||||
JSON,
|
||||
XML,
|
||||
RAW_TEXT
|
||||
}
|
||||
@ -0,0 +1,18 @@
|
||||
package cz.moneta.test.harness.messaging.model;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.Optional;
|
||||
|
||||
public enum MqMessageFormat {
|
||||
JSON,
|
||||
XML,
|
||||
EBCDIC_870,
|
||||
UTF8_1208;
|
||||
|
||||
public static MqMessageFormat fromConfig(String value, MqMessageFormat defaultValue) {
|
||||
return Optional.ofNullable(value)
|
||||
.map(v -> v.toUpperCase(Locale.ROOT).replace('-', '_'))
|
||||
.map(MqMessageFormat::valueOf)
|
||||
.orElse(defaultValue);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,23 @@
|
||||
package cz.moneta.test.harness.messaging.model;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class Queue extends Destination {
|
||||
|
||||
private final String queueName;
|
||||
private final MqMessageFormat format;
|
||||
|
||||
public Queue(String name, String queueName, MqMessageFormat format) {
|
||||
super(name, "ibmmq");
|
||||
this.queueName = Objects.requireNonNull(queueName, "queueName");
|
||||
this.format = Objects.requireNonNull(format, "format");
|
||||
}
|
||||
|
||||
public String getQueueName() {
|
||||
return queueName;
|
||||
}
|
||||
|
||||
public MqMessageFormat getFormat() {
|
||||
return format;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,167 @@
|
||||
package cz.moneta.test.harness.messaging.model;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.MissingNode;
|
||||
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import javax.xml.XMLConstants;
|
||||
import javax.xml.namespace.NamespaceContext;
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.xpath.XPath;
|
||||
import javax.xml.xpath.XPathConstants;
|
||||
import javax.xml.xpath.XPathFactory;
|
||||
import java.io.StringReader;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.w3c.dom.Document;
|
||||
import org.xml.sax.InputSource;
|
||||
|
||||
public class ReceivedMessage {
|
||||
|
||||
private static final Pattern ARRAY_NODE_PATTERN = Pattern.compile("(.*?)\\[([0-9]*?)\\]");
|
||||
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
|
||||
private static final XmlMapper XML_MAPPER = new XmlMapper();
|
||||
|
||||
private final String body;
|
||||
private final MessageContentType contentType;
|
||||
private final Map<String, String> headers;
|
||||
private final long timestamp;
|
||||
private final String source;
|
||||
|
||||
public ReceivedMessage(String body,
|
||||
MessageContentType contentType,
|
||||
Map<String, String> headers,
|
||||
long timestamp,
|
||||
String source) {
|
||||
this.body = Optional.ofNullable(body).orElse("");
|
||||
this.contentType = Objects.requireNonNull(contentType, "contentType");
|
||||
this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(Optional.ofNullable(headers).orElseGet(Collections::emptyMap)));
|
||||
this.timestamp = timestamp;
|
||||
this.source = Optional.ofNullable(source).orElse("");
|
||||
}
|
||||
|
||||
public JsonNode extractJson(String path) {
|
||||
JsonNode root = readJsonLikeNode();
|
||||
return extractNode(path, root);
|
||||
}
|
||||
|
||||
public String extractXml(String xpathExpression) {
|
||||
try {
|
||||
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
|
||||
factory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
|
||||
factory.setNamespaceAware(false);
|
||||
DocumentBuilder builder = factory.newDocumentBuilder();
|
||||
Document document = builder.parse(new InputSource(new StringReader(body)));
|
||||
XPath xpath = XPathFactory.newInstance().newXPath();
|
||||
xpath.setNamespaceContext(new EmptyNamespaceContext());
|
||||
return String.valueOf(xpath.evaluate(xpathExpression, document, XPathConstants.STRING));
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to extract xml value for expression: " + xpathExpression, e);
|
||||
}
|
||||
}
|
||||
|
||||
public String extract(String expression) {
|
||||
return switch (contentType) {
|
||||
case JSON -> extractJson(expression).asText();
|
||||
case XML -> extractXml(expression);
|
||||
case RAW_TEXT -> body;
|
||||
};
|
||||
}
|
||||
|
||||
public <T> T mapTo(Class<T> type) {
|
||||
try {
|
||||
return switch (contentType) {
|
||||
case JSON -> JSON_MAPPER.readValue(body, type);
|
||||
case XML -> XML_MAPPER.readValue(body, type);
|
||||
case RAW_TEXT -> {
|
||||
if (String.class.equals(type)) {
|
||||
yield type.cast(body);
|
||||
}
|
||||
throw new IllegalStateException("RAW_TEXT can only be mapped to String");
|
||||
}
|
||||
};
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to map message body", e);
|
||||
}
|
||||
}
|
||||
|
||||
public String getBody() {
|
||||
return body;
|
||||
}
|
||||
|
||||
public MessageContentType getContentType() {
|
||||
return contentType;
|
||||
}
|
||||
|
||||
public Map<String, String> getHeaders() {
|
||||
return headers;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public String getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
private JsonNode readJsonLikeNode() {
|
||||
try {
|
||||
return switch (contentType) {
|
||||
case JSON -> JSON_MAPPER.readTree(body);
|
||||
case XML -> XML_MAPPER.readTree(body.getBytes());
|
||||
case RAW_TEXT -> {
|
||||
if (StringUtils.isBlank(body)) {
|
||||
yield MissingNode.getInstance();
|
||||
}
|
||||
yield JSON_MAPPER.readTree(body);
|
||||
}
|
||||
};
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Unable to parse message as JSON-like content", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static JsonNode extractNode(String path, JsonNode rootNode) {
|
||||
return Arrays.stream(path.split("\\."))
|
||||
.filter(StringUtils::isNotEmpty)
|
||||
.reduce(rootNode,
|
||||
(r, p) -> {
|
||||
Matcher matcher = ARRAY_NODE_PATTERN.matcher(p);
|
||||
if (matcher.find()) {
|
||||
return r.path(matcher.group(1)).path(Integer.parseInt(matcher.group(2)));
|
||||
}
|
||||
return r.path(p);
|
||||
},
|
||||
(j1, j2) -> j1);
|
||||
}
|
||||
|
||||
private static final class EmptyNamespaceContext implements NamespaceContext {
|
||||
|
||||
@Override
|
||||
public String getNamespaceURI(String prefix) {
|
||||
return XMLConstants.NULL_NS_URI;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPrefix(String namespaceURI) {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<String> getPrefixes(String namespaceURI) {
|
||||
return Collections.emptyIterator();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,17 @@
|
||||
package cz.moneta.test.harness.messaging.model;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class Topic extends Destination {
|
||||
|
||||
private final String topicName;
|
||||
|
||||
public Topic(String name, String topicName) {
|
||||
super(name, "kafka");
|
||||
this.topicName = Objects.requireNonNull(topicName, "topicName");
|
||||
}
|
||||
|
||||
public String getTopicName() {
|
||||
return topicName;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,193 @@
|
||||
package cz.moneta.test.harness.support.messaging;
|
||||
|
||||
import cz.moneta.test.harness.endpoints.messaging.MessagingEndpoint;
|
||||
import cz.moneta.test.harness.messaging.model.MqMessageFormat;
|
||||
import cz.moneta.test.harness.messaging.model.ReceivedMessage;
|
||||
import cz.moneta.test.harness.support.util.FileReader;
|
||||
import cz.moneta.test.harness.support.util.Template;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public final class MessagingRequest {
|
||||
|
||||
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
|
||||
|
||||
private final MessagingEndpoint endpoint;
|
||||
private String destinationName;
|
||||
private String key;
|
||||
private String payload;
|
||||
private Duration timeout = DEFAULT_TIMEOUT;
|
||||
private final Map<String, String> headers = new LinkedHashMap<>();
|
||||
private MqMessageFormat formatOverride;
|
||||
private Predicate<ReceivedMessage> receiveFilter;
|
||||
private ReceivedMessage receivedMessage;
|
||||
private boolean receivePending;
|
||||
private Mode mode = Mode.UNSET;
|
||||
|
||||
private MessagingRequest(MessagingEndpoint endpoint) {
|
||||
this.endpoint = endpoint;
|
||||
}
|
||||
|
||||
public static MessagingRequest builder(MessagingEndpoint endpoint) {
|
||||
return new MessagingRequest(endpoint);
|
||||
}
|
||||
|
||||
public MessagingRequest to(String destinationName) {
|
||||
this.destinationName = destinationName;
|
||||
this.mode = Mode.SEND;
|
||||
resetReceiveState();
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest from(String destinationName) {
|
||||
this.destinationName = destinationName;
|
||||
this.mode = Mode.RECEIVE;
|
||||
resetReceiveState();
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest withKey(String key) {
|
||||
this.key = key;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest withPayload(String payload) {
|
||||
this.payload = payload;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest withPayloadFromTemplate(Template template) {
|
||||
this.payload = template.render();
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest withPayloadFromFile(String path) {
|
||||
this.payload = FileReader.readFileFromResources(path);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest withHeader(String key, String value) {
|
||||
this.headers.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest withTraceparent(String value) {
|
||||
return withHeader("traceparent", value);
|
||||
}
|
||||
|
||||
public MessagingRequest withRequestID(String value) {
|
||||
return withHeader("requestID", value);
|
||||
}
|
||||
|
||||
public MessagingRequest withActivityID(String value) {
|
||||
return withHeader("activityID", value);
|
||||
}
|
||||
|
||||
public MessagingRequest withSourceCodebookId(String value) {
|
||||
return withHeader("sourceCodebookId", value);
|
||||
}
|
||||
|
||||
public MessagingRequest asJson() {
|
||||
this.formatOverride = MqMessageFormat.JSON;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest asXml() {
|
||||
this.formatOverride = MqMessageFormat.XML;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest asEbcdic() {
|
||||
this.formatOverride = MqMessageFormat.EBCDIC_870;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest asUtf8() {
|
||||
this.formatOverride = MqMessageFormat.UTF8_1208;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest withTimeout(long value, TimeUnit unit) {
|
||||
this.timeout = Duration.ofMillis(unit.toMillis(value));
|
||||
if (receivePending && receivedMessage == null) {
|
||||
doReceive();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest send() {
|
||||
ensureMode(Mode.SEND);
|
||||
if (payload == null) {
|
||||
throw new IllegalStateException("Message payload must be provided before send()");
|
||||
}
|
||||
endpoint.send(destinationName, key, payload, formatOverride, headers);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest receiveWhere(Predicate<ReceivedMessage> filter) {
|
||||
ensureMode(Mode.RECEIVE);
|
||||
this.receiveFilter = filter;
|
||||
this.receivePending = true;
|
||||
this.receivedMessage = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessagingRequest andAssertFieldValue(String expression, String expectedValue) {
|
||||
ReceivedMessage message = getReceivedMessage();
|
||||
Assertions.assertEquals(expectedValue,
|
||||
message.extract(expression),
|
||||
String.format("Unexpected message field value for '%s'. Message body: %s", expression, message.getBody()));
|
||||
return this;
|
||||
}
|
||||
|
||||
public String extract(String expression) {
|
||||
return getReceivedMessage().extract(expression);
|
||||
}
|
||||
|
||||
public ReceivedMessage getReceivedMessage() {
|
||||
ensureMode(Mode.RECEIVE);
|
||||
if (receivedMessage == null) {
|
||||
doReceive();
|
||||
}
|
||||
return receivedMessage;
|
||||
}
|
||||
|
||||
private void doReceive() {
|
||||
if (!receivePending) {
|
||||
receiveFilter = msg -> true;
|
||||
receivePending = true;
|
||||
}
|
||||
|
||||
receivedMessage = endpoint.receive(
|
||||
destinationName,
|
||||
Optional.ofNullable(receiveFilter).orElse(msg -> true),
|
||||
timeout,
|
||||
formatOverride
|
||||
);
|
||||
receivePending = false;
|
||||
}
|
||||
|
||||
private void ensureMode(Mode requiredMode) {
|
||||
if (this.mode != requiredMode) {
|
||||
throw new IllegalStateException("Messaging request is not in " + requiredMode + " mode");
|
||||
}
|
||||
}
|
||||
|
||||
private void resetReceiveState() {
|
||||
this.receiveFilter = null;
|
||||
this.receivedMessage = null;
|
||||
this.receivePending = false;
|
||||
}
|
||||
|
||||
private enum Mode {
|
||||
UNSET,
|
||||
SEND,
|
||||
RECEIVE
|
||||
}
|
||||
}
|
||||
@ -25,6 +25,7 @@ import cz.moneta.test.dsl.ib.Ib;
|
||||
import cz.moneta.test.dsl.ilods.Ilods;
|
||||
import cz.moneta.test.dsl.kasanova.Kasanova;
|
||||
import cz.moneta.test.dsl.mobile.smartbanking.home.Sb;
|
||||
import cz.moneta.test.dsl.messaging.Messaging;
|
||||
import cz.moneta.test.dsl.monetaapiportal.MonetaApiPortal;
|
||||
import cz.moneta.test.dsl.monetaportal.MonetaPortal;
|
||||
import cz.moneta.test.dsl.mwf.IHub;
|
||||
@ -282,6 +283,10 @@ public class Harness extends BaseStoreAccessor {
|
||||
return new Cashman(this);
|
||||
}
|
||||
|
||||
public Messaging withMessaging() {
|
||||
return new Messaging(this);
|
||||
}
|
||||
|
||||
private void initGenerators() {
|
||||
addGenerator(RC, new RcGenerator());
|
||||
addGenerator(FIRST_NAME, new FirstNameGenerator());
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
package cz.moneta.test.dsl.messaging;
|
||||
|
||||
import cz.moneta.test.dsl.Harness;
|
||||
import cz.moneta.test.harness.endpoints.messaging.MessagingEndpoint;
|
||||
import cz.moneta.test.harness.support.messaging.MessagingRequest;
|
||||
|
||||
public class Messaging {
|
||||
|
||||
private final Harness harness;
|
||||
|
||||
public Messaging(Harness harness) {
|
||||
this.harness = harness;
|
||||
}
|
||||
|
||||
public MessagingRequest to(String destinationName) {
|
||||
return request().to(destinationName);
|
||||
}
|
||||
|
||||
public MessagingRequest from(String destinationName) {
|
||||
return request().from(destinationName);
|
||||
}
|
||||
|
||||
public MessagingRequest request() {
|
||||
return MessagingRequest.builder(harness.getEndpoint(MessagingEndpoint.class));
|
||||
}
|
||||
}
|
||||
@ -97,4 +97,40 @@ endpoints.szr-mock-api.url=https://api-szr.tst.moneta-containers.net
|
||||
endpoints.exevido.url=https://exevido.tst.moneta-containers.net/#/auth/login
|
||||
|
||||
#Cashman
|
||||
endpoints.cashman.url=https://cashmantst.mbid.cz/
|
||||
endpoints.cashman.url=https://cashmantst.mbid.cz/
|
||||
|
||||
# Messaging - Kafka (Confluent Cloud)
|
||||
messaging.kafka.bootstrap-servers=pkc-xxxxx.eu-central-1.aws.confluent.cloud:9092
|
||||
messaging.kafka.security-protocol=SASL_SSL
|
||||
messaging.kafka.sasl-mechanism=PLAIN
|
||||
messaging.kafka.schema-registry-url=https://psrc-xxxxx.eu-central-1.aws.confluent.cloud
|
||||
messaging.kafka.value-serializer=avro
|
||||
|
||||
# Messaging - IBM MQ
|
||||
messaging.ibmmq.host=mq-server.mbid.cz
|
||||
messaging.ibmmq.port=1414
|
||||
messaging.ibmmq.channel=CLIENT.CHANNEL
|
||||
messaging.ibmmq.queue-manager=QM1
|
||||
messaging.ibmmq.ssl-cipher-suite=TLS_RSA_WITH_AES_256_CBC_SHA256
|
||||
messaging.ibmmq.keystore.path=/opt/harness/keystores/ibmmq-client.p12
|
||||
messaging.ibmmq.keystore.password=changeit
|
||||
|
||||
# Messaging destinations
|
||||
messaging.destination.order-events.type=kafka
|
||||
messaging.destination.order-events.topic=order-events-tst1
|
||||
messaging.destination.payment-notifications.type=ibmmq
|
||||
messaging.destination.payment-notifications.queue=PAYMENT.NOTIFICATIONS
|
||||
messaging.destination.payment-notifications.format=json
|
||||
messaging.destination.mainframe-requests.type=ibmmq
|
||||
messaging.destination.mainframe-requests.queue=MF.REQUESTS
|
||||
messaging.destination.mainframe-requests.format=xml
|
||||
messaging.destination.mainframe-ebcdic-queue.type=ibmmq
|
||||
messaging.destination.mainframe-ebcdic-queue.queue=MF.EBCDIC.QUEUE
|
||||
messaging.destination.mainframe-ebcdic-queue.format=ebcdic_870
|
||||
messaging.destination.mainframe-utf8-queue.type=ibmmq
|
||||
messaging.destination.mainframe-utf8-queue.queue=MF.UTF8.QUEUE
|
||||
messaging.destination.mainframe-utf8-queue.format=utf8_1208
|
||||
|
||||
# Messaging vault paths
|
||||
vault.path.messaging.kafka=/kv/autotesty/tst1/kafka
|
||||
vault.path.messaging.ibmmq=/kv/autotesty/tst1/ibmmq
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user