Spring Integration - Message Flow - Router

Reference

Message Flow
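Router - decides by condition where a message should go; it can also send the message to more than one destination
Filter - decides by condition whether a message is forwarded at all
Splitter - turns one message into several messages
Aggregator - receives several messages and sends out a single combined message once the group is complete
Resequencer - reorders the messages it receives before sending them on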
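The roles above can be illustrated without Spring at all. The following is only a sketch in plain Java: the class name MessageFlowSketch, the comma-separated message format, and the integer sequence numbers are assumptions made for illustration, not part of Spring Integration. Routing is left out here because the router example below covers it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical plain-Java illustration of four message-flow roles.
class MessageFlowSketch {

    // Filter: forward only the messages that satisfy a condition (here: non-empty).
    static List<String> filter(List<String> messages) {
        return messages.stream()
                .filter(m -> !m.isEmpty())
                .collect(Collectors.toList());
    }

    // Splitter: one message becomes several messages.
    static List<String> split(String message) {
        return Arrays.asList(message.split(","));
    }

    // Aggregator: several messages are combined into one.
    static String aggregate(List<String> parts) {
        return String.join(",", parts);
    }

    // Resequencer: messages are put back in order by sequence number.
    static List<Integer> resequence(List<Integer> sequenceNumbers) {
        return sequenceNumbers.stream().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(filter(Arrays.asList("a", "", "b")));
        System.out.println(split("a,b,c"));
        System.out.println(aggregate(Arrays.asList("a", "b")));
        System.out.println(resequence(Arrays.asList(3, 1, 2)));
    }
}
```

In Spring Integration each of these roles is an endpoint wired between channels; the sketch only shows the shape of the data before and after each step.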

Router Example
MyMessage has a field, to, that names the target channel.
MessageRouter receives a MyMessage and returns the name of the channel it should go to, so it simply returns the value of to.
RouterMain.java
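import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;

public class RouterMain {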

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:RouterMain.xml");
        MessageChannel input = ctx.getBean("input", MessageChannel.class);

        input.send(MessageBuilder.withPayload(new MyMessage("database", "needPersist")).build());
        input.send(MessageBuilder.withPayload(new MyMessage("memory", "noNeedPersist")).build());


        PollableChannel dbOutput = ctx.getBean("database", PollableChannel.class);
        System.out.println("from DB:" + dbOutput.receive());
        PollableChannel memOutput = ctx.getBean("memory", PollableChannel.class);
        System.out.println("from Memory:" + memOutput.receive());
    }
}
MyMessage.java uses Lombok to reduce boilerplate (Lombok generates the getters, setters, and constructor)
import lombok.AllArgsConstructor;
import lombok.Data;

@AllArgsConstructor
@Data
public class MyMessage {

    private String to;
    private String message;

}

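MessageRouter.java receives a MyMessage and returns the name of the channel the message should be routed to
import org.springframework.integration.annotation.Router;
import org.springframework.stereotype.Component;

@Component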
public class MessageRouter {
    @Router
    public String route(MyMessage msg) {
        System.out.println("msg.to:" + msg.getTo());
        return msg.getTo();
    }
}
RouterMain.xml
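It declares that messages arriving on the input channel are routed by messageRouter to other channels. The string the router returns is resolved as a channel bean name.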
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples.router"/>

    <int:router input-channel="input" ref="messageRouter" />

    <int:channel id="input"/>
    <int:channel id="database">
        <int:queue capacity="10"/>
    </int:channel>
    <int:channel id="memory">
        <int:queue capacity="10"/>
    </int:channel>
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>
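To make the routing step concrete, here is a minimal sketch of the same idea in plain Java, with no Spring involved. It assumes in-memory queues stand in for the database and memory channels; RouterSketch, addChannel, send, and receive are hypothetical names used only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for a router: the message names its own target channel.
class RouterSketch {

    static final class MyMessage {
        final String to;
        final String message;

        MyMessage(String to, String message) {
            this.to = to;
            this.message = message;
        }
    }

    // Channel name -> queue of messages waiting on that channel.
    private final Map<String, Queue<MyMessage>> channels = new HashMap<>();

    void addChannel(String name) {
        channels.put(name, new ArrayDeque<>());
    }

    // Route the message to the channel named by its "to" field.
    void send(MyMessage msg) {
        Queue<MyMessage> target = channels.get(msg.to);
        if (target == null) {
            throw new IllegalArgumentException("no channel named " + msg.to);
        }
        target.add(msg);
    }

    // Returns the next message on the channel, or null if none is waiting.
    MyMessage receive(String channelName) {
        Queue<MyMessage> queue = channels.get(channelName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        RouterSketch router = new RouterSketch();
        router.addChannel("database");
        router.addChannel("memory");
        router.send(new MyMessage("database", "needPersist"));
        router.send(new MyMessage("memory", "noNeedPersist"));
        System.out.println("from DB:" + router.receive("database").message);
        System.out.println("from Memory:" + router.receive("memory").message);
    }
}
```

Unlike this sketch, receive() on a real PollableChannel blocks until a message arrives unless a timeout is passed.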






Spring Integration - Header Enricher

Reference

Enrich Header XML
  1. A header-enricher can change a message's headers; here it adds the header mykey to the message
  2. A transformer method can read the headers through @Header or @Headers
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples"/>
    <int:header-enricher input-channel="input" output-channel="interceptor">
        <int:header name="mykey" value="myval" />
    </int:header-enricher>

    <int:transformer input-channel="interceptor" output-channel="output" ref="mapper" />


    <int:channel id="interceptor" />
    <int:channel id="input"/>
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>

HeaderEnricherMain
public class HeaderEnricherMain {

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:HeaderEnricherMain.xml");
        MessageChannel input = ctx.getBean("input", MessageChannel.class);
        input.send(MessageBuilder.withPayload("test").build());


        PollableChannel output = ctx.getBean("output", PollableChannel.class);
        System.out.println(output.receive().getHeaders().get("mykey")); // print myval
    }

}


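A transformer reads header values with @Header or @Headers
import java.util.Map;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

@Component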
public class Mapper {

    @Transformer
    public TextMessage map(Map<String, String> message) {
        System.out.println("transform from " + message + " to TextMessage");
        return new TextMessage(message.get("text"));
    }

    @Transformer
    public TextMessage map(String message) {
        System.out.println("transform from string to TextMessage");
        return new TextMessage(message);
    }

    @Transformer
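    // Message headers are a Map<String, Object>; mykey is the header added by the header-enricher.
    public TextMessage mapMyKey(@Header("mykey") String myval, @Headers Map<String, Object> headerMap, String message) {
        System.out.println("headerMap:" + headerMap);
        return new TextMessage(myval); // the new payload carries the header value, myval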
    }

}
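What the header-enricher does to a message can be sketched in plain Java. This is only an illustration under the assumption that a message is an immutable payload plus a header map; the Message class and the enrich method below are hypothetical and are not the Spring Integration types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: enriching headers produces a new message, the payload is untouched.
class HeaderEnricherSketch {

    static final class Message {
        final String payload;
        final Map<String, Object> headers;

        Message(String payload, Map<String, Object> headers) {
            this.payload = payload;
            // Defensive copy so that a message cannot be changed after creation.
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    // Returns a copy of the message with one extra header.
    static Message enrich(Message in, String name, Object value) {
        Map<String, Object> copy = new HashMap<>(in.headers);
        copy.put(name, value);
        return new Message(in.payload, copy);
    }

    public static void main(String[] args) {
        Message in = new Message("test", new HashMap<>());
        Message out = enrich(in, "mykey", "myval");
        System.out.println(out.headers.get("mykey"));
    }
}
```

The copy-instead-of-mutate design mirrors the fact that Spring messages are immutable, which is why headers are added by an endpoint that emits a new message.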




Spring Integration - Integrate with ActiveMQ

Reference

ActiveMQ Example
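This is a plain ActiveMQ + JMS API example
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

public class ActiveMQMain {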

    public static void main(String[] params) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = getConnection(factory);
        connection.start();
        System.out.println("create session");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination dst = session.createQueue("test");

        System.out.println("create producer");
        MessageProducer producer = session.createProducer(dst);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage msg = session.createTextMessage("my test msg");

        System.out.println("send message");
        producer.send(msg);

        MessageConsumer consumer = session.createConsumer(dst);
        System.out.println("consume message");
        System.out.println(consumer.receive());

        System.out.println("close session");
        session.close();
        System.out.println("close connection");
        connection.close();
        System.out.println("done");
    }

    private static Connection getConnection(ConnectionFactory factory) {
        Connection result = null;
        while (result == null) {
            try {
                result = factory.createConnection();
                System.out.println("connection get!");
            } catch (Exception ex) {
                System.err.println(ex);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return result;
    }

    private static void asyncStart(BrokerService brokerService) {
        new Thread(() -> {
            try {
                brokerService.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

}



JmsTemplate Example
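Use JmsTemplate to send and receive messages
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

public class JmsTemplateMain {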

    public static void main(String[] params) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        JmsTemplate template = new JmsTemplate(factory);
        template.setDefaultDestinationName("test");
        template.send(session -> {
            MapMessage msg = session.createMapMessage();
            msg.setString("text", UUID.randomUUID().toString());
            return msg;
        });
        MapMessage msg = (MapMessage) template.receive();
        System.out.println("receive msg:" + msg.getString("text"));
    }


    private static Connection getConnection(ConnectionFactory factory) {
        Connection result = null;
        while (result == null) {
            try {
                result = factory.createConnection();
                System.out.println("connection get!");
            } catch (Exception ex) {
                System.err.println(ex);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return result;
    }

}


Integrate with ActiveMQ Example
Maven Dependency
First, the Maven dependencies matter a lot. These are the dependencies I used for this example
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-core</artifactId>
   <version>5.0.3.RELEASE</version>
</dependency>
<dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-jms</artifactId>
   <version>5.0.3.RELEASE</version>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-jms</artifactId>
   <version>5.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.3</version>
</dependency>
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>

Main Class
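Here JmsTemplate sends a message to the JMS queue that Spring Integration listens on; the message is expected to be forwarded to the output channel.
A PollableChannel is then used to read the result.
import java.util.UUID;
import javax.jms.MapMessage;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

public class IntegrateActiveMQMain {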

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:IntegrateActiveMQMain.xml");
        JmsTemplate jmsTemplate = ctx.getBean("jmsTemplate", JmsTemplate.class);
        jmsTemplate.send(session -> {
            MapMessage msg = session.createMapMessage();
            msg.setString("text", UUID.randomUUID().toString());
            return msg;
        });

        PollableChannel output = ctx.getBean("output", PollableChannel.class);
        Message<?> reply = output.receive();
        System.out.println("received: " + reply.getPayload());

        ctx.close();
    }

}

IntegrateActiveMQMain.xml
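  1. Configure the connectionFactory so that the JMS client can send messages and the receiving side can obtain a connection. In practice these two roles may live in different processes.
  2. Use message-driven-channel-adapter to connect to the MQ.
  3. Use a transformer to convert the received message. For now only map messages are supported; the transformer sends the converted message to the output channel.
  4. The transformer converts the map message into a TextMessageEmptyConstructor.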
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/integration/jms
        http://www.springframework.org/schema/integration/jms/spring-integration-jms-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples"/>

    <bean id="cachingConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg value="vm://localhost" />
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="cachingConnectionFactory" />
        <property name="defaultDestinationName" value="transformation.example.queue" />
    </bean>

    <jms:message-driven-channel-adapter
            channel="map"
            connection-factory="cachingConnectionFactory"
            destination-name="transformation.example.queue"/>

    <int:map-to-object-transformer input-channel="map" output-channel="output"
                                   type="examples.transformer.TextMessageEmptyConstructor" />

    <int:channel id="map" />
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>



TextMessageEmptyConstructor
public class TextMessageEmptyConstructor {

    private String text;

    public TextMessageEmptyConstructor() {}
    public TextMessageEmptyConstructor(String text) {
        this.text = text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String getText() {
        return text;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "=>" + text;
    }
}




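A Hard-to-Open Bottle Cap

We recently bought a shampoo at home, and using it has been painful.

The painful part is the cap, shown in the picture below.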

















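The problem with this cap: in the shower it is hard to tell which side it opens from.
The cap is round and has only one notch for opening it, but that notch (the picture on the right) is hard to see.

I guess that because the whole shampoo bottle is yellow, the cap had to be yellow too for consistency?
The color may not be changeable, but a screw cap like the one on a PET bottle would be much easier to use.
The reason: how to use that kind of cap is obvious, regardless of color.
If the design constraint is that the cap must be yellow, a screw cap avoids the interference from the color.

Maybe it would be criticized as uglier?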

Spring Integration - Transformations



Reference

A transformer converts incoming messages
Example: convert a Map or a String into a TextMessage. Annotating Mapper's methods with @Transformer registers them as transformers; the method to call is chosen automatically from the parameter type
// Mapper
@Component
public class Mapper {

    @Transformer
    public TextMessage map(Map<String, String> message) {
        System.out.println("transform from " + message + " to TextMessage");
        return new TextMessage(message.get("text"));
    }

    @Transformer
    public TextMessage map(String message) {
        System.out.println("transform from string to TextMessage");
        return new TextMessage(message);
    }

}

// TextMessage
public class TextMessage {

    private final String text;

    public TextMessage(String text) {
        this.text = text;
    }

    @Override
    public String toString() {
        return text;
    }
}

// TransformerMain.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples.transformer"/>
    <int:transformer input-channel="input"
                     output-channel="output"
                     ref="mapper"/>
    <int:channel id="input"/>
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>

// TransformerMain.java
public class TransformerMain {

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:TransformerMain.xml");
        MessageChannel input = ctx.getBean("input", MessageChannel.class);
        PollableChannel output = ctx.getBean("output", PollableChannel.class);
        Map<String,String> message = new HashMap<>();
        message.put("text", "here");
        input.send(MessageBuilder.withPayload(message).build());
        System.out.println(output.receive().getPayload());


        input.send(MessageBuilder.withPayload("fromString").build());
        System.out.println(output.receive().getPayload());
    }

}
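The idea of choosing a transformer method by payload type can be sketched with plain reflection. This is not how Spring Integration is implemented internally; it is a simplified illustration, and the @Transformer annotation, the Mapper class, and the dispatch method below are all hypothetical stand-ins defined inside the sketch.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of picking an annotated method by the payload's runtime type.
class TransformerDispatchSketch {

    // Local stand-in annotation, not the Spring one.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Transformer {}

    public static class Mapper {
        @Transformer
        public String map(Map<String, String> message) {
            return "map:" + message.get("text");
        }

        @Transformer
        public String map(String message) {
            return "string:" + message;
        }
    }

    // Invoke the first annotated single-argument method whose parameter accepts the payload.
    static Object dispatch(Object handler, Object payload) {
        for (Method m : handler.getClass().getMethods()) {
            if (m.isAnnotationPresent(Transformer.class)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return m.invoke(handler, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException("no transformer for " + payload.getClass());
    }

    public static void main(String[] args) {
        Map<String, String> message = new HashMap<>();
        message.put("text", "here");
        System.out.println(dispatch(new Mapper(), message));
        System.out.println(dispatch(new Mapper(), "fromString"));
    }
}
```

The sketch also hints at a limitation: if two annotated methods accept the same payload type, the choice becomes ambiguous, so it is safer to give each payload type exactly one candidate method.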


Object-to-string transformer
Example: without configuring a mapper, the payload is converted by calling toString
// ObjectToStringTransformerMain.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples.transformer"/>
    <int:object-to-string-transformer input-channel="input" output-channel="output" />
    <int:channel id="input"/>
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>

public class ObjectToStringTransformerMain {

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:ObjectToStringTransformerMain.xml");
        MessageChannel input = ctx.getBean("input", MessageChannel.class);
        PollableChannel output = ctx.getBean("output", PollableChannel.class);
        Map<String,String> message = new HashMap<>();
        message.put("text", "here");
        input.send(MessageBuilder.withPayload(message).build());
        System.out.println(output.receive().getPayload());


        input.send(MessageBuilder.withPayload("fromString").build());
        System.out.println(output.receive().getPayload());
    }

}

Payload-serializing-transformer serializes and deserializes the payload directly
Example: localId in TextMessage is transient, so it is not serialized or deserialized
// PayloadSerializingTransformerMain.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples.transformer"/>
    <int:payload-serializing-transformer input-channel="input" output-channel="byte-array" />
    <int:payload-deserializing-transformer input-channel="byte-array" output-channel="output" />
    <int:channel id="byte-array" />
    <int:channel id="input"/>
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>


public class TextMessage implements Serializable {

    private final String text;
    private final transient String localId;

    public TextMessage(String text) {
        this(text, UUID.randomUUID().toString());
    }

    public TextMessage(String text, String localId) {
        this.text = text;
        this.localId = localId;
    }

    @Override
    public String toString() {
        return localId + ":" + text;
    }
}

public class PayloadSerializingTransformerMain {

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:PayloadSerializingTransformerMain.xml");
        MessageChannel input = ctx.getBean("input", MessageChannel.class);
        PollableChannel output = ctx.getBean("output", PollableChannel.class);
        TextMessage message = new TextMessage("testa", "1");

        input.send(MessageBuilder.withPayload(message).build());
        System.out.println(output.receive().getPayload()); // prints null:testa, because the transient localId is not serialized
    }

}
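The effect of transient can be reproduced with standard Java serialization alone, which is what the two payload transformers rely on by default. The sketch below assumes a nested copy of TextMessage and two hypothetical helper methods, serialize and deserialize; no Spring classes are involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical illustration: a transient field is lost in a serialize/deserialize round trip.
class TransientRoundTrip {

    static class TextMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String text;
        private final transient String localId; // skipped by Java serialization

        TextMessage(String text, String localId) {
            this.text = text;
            this.localId = localId;
        }

        @Override
        public String toString() {
            return localId + ":" + text;
        }
    }

    // Object -> byte[], the job of the serializing step.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // byte[] -> Object, the job of the deserializing step.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object copy = deserialize(serialize(new TextMessage("testa", "1")));
        System.out.println(copy); // prints null:testa
    }
}
```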



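Map-to-object transformer marshals the payload to a map and unmarshals it back
Example: note that the payload class must have a no-argument constructor and setters/getters, otherwise the conversion fails
// MapToObjectTransformerMain.xml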
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples.transformer"/>
    <int:object-to-map-transformer input-channel="input" output-channel="map" />
    <int:map-to-object-transformer input-channel="map" output-channel="output" type="examples.transformer.TextMessageEmptyConstructor" />
    <int:channel id="map" />
    <int:channel id="input"/>
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>

// Payload class
public class TextMessageEmptyConstructor {

    private String text;

    public TextMessageEmptyConstructor() {}
    public TextMessageEmptyConstructor(String text) {
        this.text = text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String getText() {
        return text;
    }

    @Override
    public String toString() {
        return text;
    }
}

// Main class
public class MapToObjectTransformerMain {

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:MapToObjectTransformerMain.xml");
        MessageChannel input = ctx.getBean("input", MessageChannel.class);
        PollableChannel output = ctx.getBean("output", PollableChannel.class);

        TextMessageEmptyConstructor message = new TextMessageEmptyConstructor("testa");

        input.send(MessageBuilder.withPayload(message).build());
        System.out.println(output.receive().getPayload()); // after the round trip through a map this should print testa
    }

}
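Why the no-argument constructor and the setters are required becomes clear when the map-to-object step is written by hand. The following is a rough sketch using plain reflection; it is not the Spring implementation, and MapBindingSketch, TextBean, and fromMap are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of binding a map onto a bean through its setters.
class MapBindingSketch {

    public static class TextBean {
        private String text;

        public TextBean() {}

        public void setText(String text) {
            this.text = text;
        }

        public String getText() {
            return text;
        }
    }

    static <T> T fromMap(Map<String, Object> values, Class<T> type) {
        try {
            // Step 1: create an empty instance, which is why a no-argument constructor is needed.
            T target = type.getDeclaredConstructor().newInstance();
            // Step 2: for each key, call the matching setter, which is why setters are needed.
            for (Map.Entry<String, Object> entry : values.entrySet()) {
                String key = entry.getKey();
                String setterName = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
                for (Method m : type.getMethods()) {
                    if (m.getName().equals(setterName) && m.getParameterCount() == 1) {
                        m.invoke(target, entry.getValue());
                    }
                }
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot bind map to " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> values = new HashMap<>();
        values.put("text", "testa");
        System.out.println(fromMap(values, TextBean.class).getText());
    }
}
```

A class with only a constructor that takes arguments fails at step 1, and a class with final fields and no setters silently keeps its defaults at step 2.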


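Object-to-json transformer: the payload is converted to JSON and then back into a payload object (this needs a JSON library such as Jackson on the classpath)
Example
// ObjectToJsonTransformerMain.xml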
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples.transformer"/>
    <int:object-to-json-transformer input-channel="input" output-channel="json" />
    <int:json-to-object-transformer input-channel="json" output-channel="output" type="examples.transformer.TextMessageEmptyConstructor" />

    <int:channel id="json" />
    <int:channel id="input"/>
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>

// Main class
public class ObjectToJsonTransformerMain {

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:ObjectToJsonTransformerMain.xml");
        MessageChannel input = ctx.getBean("input", MessageChannel.class);
        PollableChannel output = ctx.getBean("output", PollableChannel.class);

        TextMessageEmptyConstructor message = new TextMessageEmptyConstructor("test-object-to-json");

        input.send(MessageBuilder.withPayload(message).build());
        System.out.println(output.receive().getPayload());
    }

}



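Alias Method

 Problem: each server supports a different TPM (transactions per minute). When a request arrives, the system must immediately pick a suitable server at random according to TPM capacity. Although this is called "random", TPM still has to act as the weight. Solution: the alias method...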
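Since the excerpt above is cut off before the solution, the following is only a sketch of one common way to build an alias table (usually called Vose's variant), not necessarily the approach the original post goes on to describe. The class name AliasSampler, the use of TPM values as raw weights, and the example numbers are assumptions made for illustration. Building the table takes O(n) time and each pick takes O(1).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

// Hypothetical weighted picker: index i is returned with probability weights[i] / sum(weights).
class AliasSampler {

    private final double[] prob; // chance of keeping the randomly chosen column itself
    private final int[] alias;   // index to return when the column is not kept
    private final Random random;

    AliasSampler(double[] weights, Random random) {
        int n = weights.length;
        double sum = 0;
        for (double w : weights) {
            if (w < 0) {
                throw new IllegalArgumentException("negative weight");
            }
            sum += w;
        }
        if (n == 0 || sum <= 0) {
            throw new IllegalArgumentException("need at least one positive weight");
        }
        this.random = random;
        this.prob = new double[n];
        this.alias = new int[n];

        // Scale the weights so that the average column height is exactly 1.
        double[] scaled = new double[n];
        Deque<Integer> small = new ArrayDeque<>();
        Deque<Integer> large = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            scaled[i] = weights[i] * n / sum;
            (scaled[i] < 1.0 ? small : large).add(i);
        }
        // Fill each under-full column with the excess of an over-full one.
        while (!small.isEmpty() && !large.isEmpty()) {
            int s = small.pop();
            int l = large.pop();
            prob[s] = scaled[s];
            alias[s] = l;
            scaled[l] = scaled[l] + scaled[s] - 1.0;
            (scaled[l] < 1.0 ? small : large).push(l);
        }
        // Whatever is left is full up to rounding error.
        while (!large.isEmpty()) {
            prob[large.pop()] = 1.0;
        }
        while (!small.isEmpty()) {
            prob[small.pop()] = 1.0;
        }
    }

    // O(1) pick: choose a column uniformly, then keep it or take its alias.
    int next() {
        int column = random.nextInt(prob.length);
        return random.nextDouble() < prob[column] ? column : alias[column];
    }

    public static void main(String[] args) {
        // Three hypothetical servers with TPM 100, 300, and 600.
        AliasSampler sampler = new AliasSampler(new double[] {100, 300, 600}, new Random());
        int[] counts = new int[3];
        for (int i = 0; i < 100_000; i++) {
            counts[sampler.next()]++;
        }
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]);
    }
}
```

With the weights above, the three counts should come out close to a 1:3:6 ratio, and a server with weight 0 is never chosen.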