Reference
Pro Spring Integration - https://www.amazon.com/Pro-Spring-Integration-Experts-Voice-ebook-dp-B005PZ29OA/dp/B005PZ29OA/ref=mt_kindle?_encoding=UTF8&me=&qid=
Previous post: https://www.isaacnote.com/2018/11/spring-integration-transformations.html
Next post: https://www.isaacnote.com/2018/11/spring-integration-header-enricher.html
ActiveMQ Example
This is a plain ActiveMQ + JMS API example, with no Spring involved.
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

public class ActiveMQMain {

    public static void main(String[] params) throws Exception {
        // The vm:// transport talks to a broker running inside this JVM.
        ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = getConnection(factory);
        connection.start();
        System.out.println("create session");
        // Non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination dst = session.createQueue("test");
        System.out.println("create producer");
        MessageProducer producer = session.createProducer(dst);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage msg = session.createTextMessage("my test msg");
        System.out.println("send message");
        producer.send(msg);
        // Consume from the same queue on the same session.
        MessageConsumer consumer = session.createConsumer(dst);
        System.out.println("consume message");
        System.out.println(consumer.receive());
        System.out.println("close session");
        session.close();
        System.out.println("close connection");
        connection.close();
        System.out.println("done");
    }

    // Retries once per second until a connection can be created.
    private static Connection getConnection(ConnectionFactory factory) {
        Connection result = null;
        while (result == null) {
            try {
                result = factory.createConnection();
                System.out.println("connection get!");
            } catch (Exception ex) {
                System.err.println(ex);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return result;
    }

    // Starts a BrokerService on a background thread. main() does not call this helper.
    private static void asyncStart(BrokerService brokerService) {
        new Thread(() -> {
            try {
                brokerService.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}
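The getConnection helper above retries forever. As a rough sketch of a bounded variant, the following plain-Java helper gives up after a fixed number of attempts. The class name RetrySketch and the parameters maxAttempts and delayMillis are made up for illustration and are not part of the JMS or ActiveMQ APIs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class RetrySketch {

    /**
     * Calls the action until it succeeds or maxAttempts is reached,
     * sleeping delayMillis between failed attempts.
     * Rethrows the last failure when every attempt fails.
     */
    public static <T> T retry(Callable<T> action, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception ex) {
                last = ex;
                System.err.println("attempt " + attempt + " failed: " + ex);
                if (attempt < maxAttempts) {
                    // An interrupt during the sleep propagates to the caller.
                    TimeUnit.MILLISECONDS.sleep(delayMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated resource that only becomes available on the third call.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("not ready");
            }
            return "connected";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With the JMS example, a call such as retry(factory::createConnection, 10, 1000) could stand in for the endless loop, assuming that failing after ten attempts is acceptable for your use case.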
JmsTemplate Example
Use JmsTemplate to send and receive messages.
import java.util.UUID;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

public class JmsTemplateMain {

    public static void main(String[] params) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        // JmsTemplate manages connections, sessions and producers itself,
        // so no hand-written connection helper is needed here.
        JmsTemplate template = new JmsTemplate(factory);
        template.setDefaultDestinationName("test");
        // Send a MapMessage carrying a random UUID under the key "text".
        template.send(session -> {
            MapMessage msg = session.createMapMessage();
            msg.setString("text", UUID.randomUUID().toString());
            return msg;
        });
        // Receive from the default destination and read the value back.
        MapMessage msg = (MapMessage) template.receive();
        System.out.println("receive msg:" + msg.getString("text"));
    }
}
Integrate with ActiveMQ Example
Maven Dependency
First, the Maven dependencies are critical. These are the dependencies I used when building this example.
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>5.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.3</version>
</dependency>
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>
Main Class
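Here JmsTemplate sends a message to the queue that Spring Integration listens on, and the message is expected to be forwarded to the output channel.
A PollableChannel is then used to fetch the result.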
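import java.util.UUID;
import javax.jms.MapMessage;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

public class IntegrateActiveMQMain {

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:IntegrateActiveMQMain.xml");
        // Producer side: send a MapMessage to the template's default destination.
        JmsTemplate jmsTemplate = ctx.getBean("jmsTemplate", JmsTemplate.class);
        jmsTemplate.send(session -> {
            MapMessage msg = session.createMapMessage();
            msg.setString("text", UUID.randomUUID().toString());
            return msg;
        });
        // Consumer side: wait for the transformed message on the output channel.
        PollableChannel output = ctx.getBean("output", PollableChannel.class);
        Message<?> reply = output.receive();
        System.out.println("received: " + reply.getPayload());
        ctx.close();
    }
}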
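To picture what polling the output channel means, here is a plain-Java analogy built on a bounded BlockingQueue. It is only a sketch and not Spring Integration's QueueChannel: the class QueueChannelAnalogy and its methods are hypothetical, and its non-blocking send is a simplification chosen for the demo rather than a description of how Spring's channel behaves when full. It does mirror one documented trait of PollableChannel, namely that receive with a timeout returns null when nothing arrives in time.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueChannelAnalogy {

    private final BlockingQueue<String> queue;

    public QueueChannelAnalogy(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking send: returns false when the queue is already full. */
    public boolean send(String payload) {
        return queue.offer(payload);
    }

    /** Waits up to timeoutMillis for a payload; returns null if none arrives. */
    public String receive(long timeoutMillis) throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 10, matching the capacity configured for the output channel.
        QueueChannelAnalogy output = new QueueChannelAnalogy(10);
        for (int i = 1; i <= 11; i++) {
            System.out.println("send " + i + " accepted: " + output.send("msg-" + i));
        }
        // Messages come out in the order they were sent.
        System.out.println("first received: " + output.receive(100));
        // Drain the remaining nine messages.
        for (int i = 0; i < 9; i++) {
            output.receive(100);
        }
        // Nothing left, so the timed receive gives up and returns null.
        System.out.println("after drain: " + output.receive(100));
    }
}
```

In the real example, calling output.receive(5000) instead of output.receive() would be one way to keep main from waiting forever if the message never reaches the channel; the caller then has to handle a null reply.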
IntegrateActiveMQMain.xml
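- Configure the connection factory so that the JMS client can send messages and the receiving side can obtain a connection. In practice these two roles may live in different processes.
- Use message-driven-channel-adapter to connect to the MQ.
- Use a transformer to convert the received message. Only map messages are supported for now; the transformer passes the converted message to the output channel.
- The transformer converts the map message into a TextMessageEmptyConstructor.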
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms-5.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <context:component-scan base-package="examples"/>

    <bean id="cachingConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg value="vm://localhost" />
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="cachingConnectionFactory" />
        <property name="defaultDestinationName" value="transformation.example.queue" />
    </bean>

    <jms:message-driven-channel-adapter
        channel="map"
        connection-factory="cachingConnectionFactory"
        destination-name="transformation.example.queue"/>

    <int:map-to-object-transformer input-channel="map" output-channel="output"
        type="examples.transformer.TextMessageEmptyConstructor" />

    <int:channel id="map" />

    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>
TextMessageEmptyConstructor
package examples.transformer;

// Target type of the map-to-object-transformer configured in IntegrateActiveMQMain.xml.
public class TextMessageEmptyConstructor {

    private String text;

    public TextMessageEmptyConstructor() {}

    public TextMessageEmptyConstructor(String text) {
        this.text = text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String getText() {
        return text;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "=>" + text;
    }
}
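To make the map-to-object step more concrete, here is a minimal plain-Java sketch of the general idea: create the target with its no-argument constructor, then copy each map entry into the setter of the same name. This is not Spring Integration's implementation, only an illustration under the assumption that the transformer receives the MapMessage content as a java.util.Map payload. MapToObjectSketch, toObject and TextHolder are hypothetical names, with TextHolder standing in for TextMessageEmptyConstructor.

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class MapToObjectSketch {

    /** Hypothetical stand-in for TextMessageEmptyConstructor. */
    public static class TextHolder {
        private String text;

        public TextHolder() {}

        public void setText(String text) {
            this.text = text;
        }

        public String getText() {
            return text;
        }
    }

    /**
     * Instantiates the type through its no-argument constructor and copies
     * every map entry whose key matches a writable bean property.
     * Keys without a matching setter are ignored.
     */
    public static <T> T toObject(Map<String, ?> source, Class<T> type) throws Exception {
        T target = type.getDeclaredConstructor().newInstance();
        BeanInfo info = Introspector.getBeanInfo(type, Object.class);
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            Method setter = pd.getWriteMethod();
            if (setter != null && source.containsKey(pd.getName())) {
                setter.invoke(target, source.get(pd.getName()));
            }
        }
        return target;
    }

    public static void main(String[] args) throws Exception {
        // Same shape as the MapMessage sent in the example: one "text" entry.
        Map<String, Object> payload = new HashMap<>();
        payload.put("text", "hello");
        TextHolder holder = toObject(payload, TextHolder.class);
        System.out.println(holder.getText()); // prints "hello"
    }
}
```

Seen this way, the "text" key written by the sender has to match the setText property of the target class, and the target needs a constructor that takes no arguments, which is presumably where the name TextMessageEmptyConstructor comes from.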