diff -Nru activemq-5.15.10/activemq-all/pom.xml activemq-5.15.11/activemq-all/pom.xml
--- activemq-5.15.10/activemq-all/pom.xml 2019-08-28 06:44:45.000000000 +0000
+++ activemq-5.15.11/activemq-all/pom.xml 2019-11-20 16:00:05.000000000 +0000
@@ -14,7 +14,7 @@
org.apache.activemq
activemq-parent
- 5.15.10
+ 5.15.11
activemq-all
@@ -115,7 +115,7 @@
org.fusesource.hawtbuf:hawtbuf
org.jasypt:jasypt
org.apache.geronimo.specs:geronimo-jms_1.1_spec
- org.apache.geronimo.specs:geronimo-jta_1.0.1B_spec
+ org.apache.geronimo.specs:geronimo-jta_1.1_spec
org.apache.geronimo.specs:geronimo-j2ee-management_1.1_spec
org.apache.geronimo.specs:geronimo-annotation_1.0_spec
org.slf4j:slf4j-api
diff -Nru activemq-5.15.10/activemq-amqp/pom.xml activemq-5.15.11/activemq-amqp/pom.xml
--- activemq-5.15.10/activemq-amqp/pom.xml 2019-08-28 06:44:45.000000000 +0000
+++ activemq-5.15.11/activemq-amqp/pom.xml 2019-11-20 16:00:05.000000000 +0000
@@ -22,7 +22,7 @@
org.apache.activemq
activemq-parent
- 5.15.10
+ 5.15.11
activemq-amqp
diff -Nru activemq-5.15.10/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java activemq-5.15.11/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
--- activemq-5.15.10/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java 2019-08-28 06:44:45.000000000 +0000
+++ activemq-5.15.11/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java 2019-11-20 16:00:05.000000000 +0000
@@ -52,6 +52,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.JMSException;
@@ -67,7 +68,11 @@
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.TypeConversionSupport;
@@ -333,6 +338,15 @@
apMap = new HashMap<>();
}
apMap.put(key, value);
+
+ int messageType = message.getDataStructureType();
+ if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
+ // Type of command to recognize advisory message
+ Object data = message.getDataStructure();
+ if(data != null) {
+ apMap.put("ActiveMqDataStructureType", data.getClass().getSimpleName());
+ }
+ }
}
final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
@@ -376,7 +390,39 @@
int messageType = message.getDataStructureType();
- if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
+ if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
+ Object data = message.getDataStructure();
+ if (data instanceof ConnectionInfo) {
+ ConnectionInfo connectionInfo = (ConnectionInfo)data;
+ final HashMap connectionMap = new LinkedHashMap();
+
+ connectionMap.put("ConnectionId", connectionInfo.getConnectionId().getValue());
+ connectionMap.put("ClientId", connectionInfo.getClientId());
+ connectionMap.put("ClientIp", connectionInfo.getClientIp());
+ connectionMap.put("UserName", connectionInfo.getUserName());
+ connectionMap.put("BrokerMasterConnector", connectionInfo.isBrokerMasterConnector());
+ connectionMap.put("Manageable", connectionInfo.isManageable());
+ connectionMap.put("ClientMaster", connectionInfo.isClientMaster());
+ connectionMap.put("FaultTolerant", connectionInfo.isFaultTolerant());
+ connectionMap.put("FailoverReconnect", connectionInfo.isFailoverReconnect());
+
+ body = new AmqpValue(connectionMap);
+ } else if (data instanceof RemoveInfo) {
+ RemoveInfo removeInfo = (RemoveInfo)message.getDataStructure();
+ final HashMap removeMap = new LinkedHashMap();
+
+ if (removeInfo.isConnectionRemove()) {
+ removeMap.put(ConnectionId.class.getSimpleName(), ((ConnectionId)removeInfo.getObjectId()).getValue());
+ } else if (removeInfo.isConsumerRemove()) {
+ removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue());
+ removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId());
+ removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId());
+ removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId());
+ }
+
+ body = new AmqpValue(removeMap);
+ }
+ } else if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message);
if (payload == null) {
diff -Nru activemq-5.15.10/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java activemq-5.15.11/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
--- activemq-5.15.10/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java 2019-08-28 06:44:45.000000000 +0000
+++ activemq-5.15.11/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java 2019-11-20 16:00:05.000000000 +0000
@@ -39,6 +39,7 @@
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -57,6 +58,9 @@
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@@ -771,6 +775,82 @@
String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
assertEquals(contentString, contents);
}
+
+ @Test
+ public void testConvertConnectionInfo() throws Exception {
+ String connectionId = "myConnectionId";
+ String clientId = "myClientId";
+
+ ConnectionInfo dataStructure = new ConnectionInfo();
+ dataStructure.setConnectionId(new ConnectionId(connectionId));
+ dataStructure.setClientId(clientId);
+
+ ActiveMQMessage outbound = createMessage();
+ Map properties = new HashMap();
+ properties.put("originUrl", "localhost");
+ outbound.setProperties(properties);
+ outbound.setDataStructure(dataStructure);
+ outbound.onSend();
+ outbound.storeContent();
+
+ JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+ EncodedMessage encoded = transformer.transform(outbound);
+ assertNotNull(encoded);
+
+ Message amqp = encoded.decode();
+
+ assertNotNull(amqp.getApplicationProperties());
+
+ Map apMap = amqp.getApplicationProperties().getValue();
+ assertEquals(ConnectionInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType"));
+
+ assertNotNull(amqp.getBody());
+ assertTrue(amqp.getBody() instanceof AmqpValue);
+ assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map