Spring Integration - Integrate with ActiveMQ

ActiveMQ Example
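This is a plain ActiveMQ + JMS API example, with no Spring involved.
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

public class ActiveMQMain {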

    public static void main(String[] params) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = getConnection(factory);
        connection.start();
        System.out.println("create session");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination dst = session.createQueue("test");

        System.out.println("create producer");
        MessageProducer producer = session.createProducer(dst);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage msg = session.createTextMessage("my test msg");

        System.out.println("send message");
        producer.send(msg);

        MessageConsumer consumer = session.createConsumer(dst);
        System.out.println("consume message");
        System.out.println(consumer.receive());

        System.out.println("close session");
        session.close();
        System.out.println("close connection");
        connection.close();
        System.out.println("done");
    }

    private static Connection getConnection(ConnectionFactory factory) {
        Connection result = null;
        while (result == null) {
            try {
                result = factory.createConnection();
                System.out.println("connection get!");
            } catch (Exception ex) {
                System.err.println(ex);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return result;
    }

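    // Not called in this example: the vm://localhost transport starts an
    // embedded broker on demand. Kept as a helper for starting a BrokerService
    // on a background thread.
    private static void asyncStart(BrokerService brokerService) {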
        new Thread(() -> {
            try {
                brokerService.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

}



JmsTemplate Example
Use JmsTemplate to send and receive messages.
import java.util.UUID;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

public class JmsTemplateMain {

    public static void main(String[] params) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        JmsTemplate template = new JmsTemplate(factory);
        template.setDefaultDestinationName("test");
        // Send a MapMessage with a random "text" entry to the default destination
        template.send(session -> {
            MapMessage msg = session.createMapMessage();
            msg.setString("text", UUID.randomUUID().toString());
            return msg;
        });
        // receive() blocks until a message is available on the default destination
        MapMessage msg = (MapMessage) template.receive();
        System.out.println("receive msg:" + msg.getString("text"));
    }

}


Integrate with ActiveMQ Example
Maven Dependency
First, the Maven dependencies matter. These are the dependencies I used while building this example:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>5.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.3</version>
</dependency>
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>

Main Class
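Here JmsTemplate sends a message to the queue that Spring Integration listens on. The message is expected to be forwarded to the output channel.
A PollableChannel is then used to retrieve it.
import java.util.UUID;

import javax.jms.MapMessage;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

public class IntegrateActiveMQMain {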

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:IntegrateActiveMQMain.xml");
        JmsTemplate jmsTemplate = ctx.getBean("jmsTemplate", JmsTemplate.class);
        jmsTemplate.send(session -> {
            MapMessage msg = session.createMapMessage();
            msg.setString("text", UUID.randomUUID().toString());
            return msg;
        });

        PollableChannel output = ctx.getBean("output", PollableChannel.class);
        Message<?> reply = output.receive();
        System.out.println("received: " + reply.getPayload());

        ctx.close();
    }

}
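
A note on the receiving side: output.receive() blocks until a message arrives, so if the message never reaches the output channel (for example because of a mismatched queue name), the program hangs. PollableChannel also offers receive(long timeout), which returns null when the timeout expires. The sketch below only models that behaviour with a plain BlockingQueue from the JDK. BoundedPollSketch and its methods are hypothetical names for illustration, not Spring Integration classes, and its send() rejects immediately when full, which is a simplification.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a bounded, pollable channel (not a Spring class).
class BoundedPollSketch {

    private final BlockingQueue<String> queue;

    BoundedPollSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false instead of blocking when the queue is full (simplification).
    boolean send(String payload) {
        return queue.offer(payload);
    }

    // Waits up to timeoutMillis for a payload; returns null if none arrives.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        BoundedPollSketch channel = new BoundedPollSketch(10);
        channel.send("payload");
        System.out.println(channel.receive(100)); // prints "payload"
        System.out.println(channel.receive(100)); // prints "null" after the timeout
    }
}
```

In the main class above, the analogous change would be to call output.receive(timeout) and handle a null reply instead of waiting indefinitely.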

IntegrateActiveMQMain.xml
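  1. Configure the connectionFactory so that the JMS client can send messages and the receiving side can obtain a connection. In practice these two roles may run in different processes.
  2. Use message-driven-channel-adapter to connect to the MQ.
  3. Use a transformer to convert the received message. Only map messages are handled for now. The transformer passes the converted message to the output channel.
  4. The transformer converts the map message into a TextMessageEmptyConstructor.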
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/integration/jms
        http://www.springframework.org/schema/integration/jms/spring-integration-jms-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples"/>

    <bean id="cachingConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg value="vm://localhost" />
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="cachingConnectionFactory" />
        <property name="defaultDestinationName" value="transformation.example.queue" />
    </bean>

    <jms:message-driven-channel-adapter
            channel="map"
            connection-factory="cachingConnectionFactory"
            destination-name="transformation.example.queue"/>

    <int:map-to-object-transformer input-channel="map" output-channel="output"
                                   type="examples.transformer.TextMessageEmptyConstructor" />

    <int:channel id="map" />
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>



TextMessageEmptyConstructor
public class TextMessageEmptyConstructor {

    private String text;

    public TextMessageEmptyConstructor() {}
    public TextMessageEmptyConstructor(String text) {
        this.text = text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String getText() {
        return text;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "=>" + text;
    }
}
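
To make the map-to-object step less opaque, here is a minimal JDK-only sketch of the general idea: create the target through its no-argument constructor, then copy each map entry into the setter with the same property name. This is presumably why the target class needs an empty constructor and a setText method. It is an illustration under those assumptions, not Spring Integration's actual implementation. MapToBeanSketch, populate and TextHolder are hypothetical names.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring Integration.
class MapToBeanSketch {

    // Simplified stand-in for TextMessageEmptyConstructor.
    public static class TextHolder {
        private String text;

        public TextHolder() {}

        public void setText(String text) { this.text = text; }

        public String getText() { return text; }
    }

    // Instantiates 'type' via its no-arg constructor and calls each setter
    // whose property name appears as a key in 'source'. Other keys are ignored.
    static <T> T populate(Map<String, ?> source, Class<T> type) {
        try {
            T target = type.getDeclaredConstructor().newInstance();
            BeanInfo info = Introspector.getBeanInfo(type, Object.class);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                Method setter = pd.getWriteMethod();
                if (setter != null && source.containsKey(pd.getName())) {
                    setter.invoke(target, source.get(pd.getName()));
                }
            }
            return target;
        } catch (ReflectiveOperationException | IntrospectionException e) {
            throw new IllegalStateException("Cannot populate " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("text", "hello");
        TextHolder result = populate(payload, TextHolder.class);
        System.out.println(result.getText()); // prints "hello"
    }
}
```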



