Spring Integration - Message Flow - Router

Reference

Message Flow
Router - 透過條件判斷要把訊息轉到哪, 也可以轉給多個點
Filter - 透過條件決定要不要把訊息 forward 出去
Splitter - 可以把一個訊息轉成多個訊息
Aggregator - 可以接收多個訊息, 等新訊息被組成完整之後就可以送出
Resequencer - 收到訊息之後重新排列訊息再送出

Router Example
MyMessage 有一個 to 指向要去的 target channel.
MessageRouter 接收到 MyMessage 之後回傳要去的 channel, 所以直接回傳 target channel.
RouterMain.java
public class RouterMain {

    public static void main(String[] params) {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:RouterMain.xml");
        MessageChannel input = ctx.getBean("input", MessageChannel.class);

        input.send(MessageBuilder.withPayload(new MyMessage("database", "needPersist")).build());
        input.send(MessageBuilder.withPayload(new MyMessage("memory", "noNeedPersist")).build());


        PollableChannel dbOutput = ctx.getBean("database", PollableChannel.class);
        System.out.println("from DB:" + dbOutput.receive());
        PollableChannel memOutput = ctx.getBean("memory", PollableChannel.class);
        System.out.println("from Memory" + memOutput.receive());
    }
}
MyMessage.java 這裡用了 lombok 減少程式 (lombok 會產生 getter/setter/constructor)
@AllArgsConstructor
@Data
public class MyMessage {

    private String to;
    private String message;

}

MessageRouter.java 收到 MyMessage 之後回傳這個 message 要轉去的 channel
@Component
public class MessageRouter {
    @Router
    public String route(MyMessage msg) {
        System.out.println("msg.to:" + msg.getTo());
        return msg.getTo();
    }
}
RouterMain.xml
定義了 input channel 的訊息會被 messageRouter 導向其他 channel.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="examples.router"/>

    <int:router input-channel="input" ref="messageRouter" />

    <int:channel id="input"/>
    <int:channel id="database">
        <int:queue capacity="10"/>
    </int:channel>
    <int:channel id="memory">
        <int:queue capacity="10"/>
    </int:channel>
    <int:channel id="output">
        <int:queue capacity="10"/>
    </int:channel>
</beans>






沒有留言:

張貼留言

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...