Reference
Pro Spring Integration - https://www.amazon.com/Pro-Spring-Integration-Experts-Voice-ebook-dp-B005PZ29OA/dp/B005PZ29OA/ref=mt_kindle?_encoding=UTF8&me=&qid=
前篇: https://www.isaacnote.com/2018/09/spring-integration-basic-terms.html
後篇: https://www.isaacnote.com/2018/11/spring-integration-transformations.html
後篇: https://www.isaacnote.com/2018/11/spring-integration-transformations.html
Concept
- Point to Point Channel - 保證只有一個 receiver 會收到訊息. Ex. QueueChannel, PriorityChannel, RendezvousChannel, DirectChannel, ExecutorChannel, and NullChannel.
- Publish-Subscribe Channel - 訊息會被多個 receiver 收到
- Data-Typed Channel - receiver 透過 data type 來決定是否要收訊息, 這種 channel, receiver 需要知道自己要處理的 data type 是什麼.
- Invalid Message Channel - application 決定某個 message 無效之後可以把 message 丟到這個 channel 來讓別的 application 決定怎麼處理無用的 message
- Dead Letter Channel - producer 重複送訊息失敗幾次後就會把訊息送到 Dead Letter Channel 給其他 application 處理
- Channel Adapter - 用來跟 message system 對接
- Message Bridge - 用來連接兩個 channel 或 channel adapter
- Message Bus - 透過一致的 API 讓不同系統進行訊息溝通
- Guaranteed Delivery - 預設 Spring Integration 把 message 存在記憶體中, Spring Integration 2.0 後支援外部 message broker, 讓保證送達的功能由外部 message system 提供
Channels
MessageChannel
用來送訊息
public interface MessageChannel {
boolean send(Message<?> message);
boolean send(Message<?> message, long timeout);
}
PollableChannel
是 MessageChannel 的一種實作, 可以送訊息.
另外 PollableChannel 則是定義可以收訊息.
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
SubscribableChannel
是 MessageChannel 的一種實作, 可以傳送訊息.
另外 SubscribableChannel 還定義 MessageHandler 讓 API Client 能註冊自己的 MessageHandler 接收每一個訊息.
public interface MessageHandler {
void handleMessage(Message<?> message) throws MessageException;
}
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
QueueChannel
是一種 Point to Point Channel. 能保證只有一個 consumer 會收到訊息.
Example 每個 receiver 都在 receive, 但每個訊息都只會被一個 receiver 收走.
public class QueueChannelMain {
static final Map<Object, AtomicInteger> receiveMsgCount = new ConcurrentHashMap<>();
static final CountDownLatch l = new CountDownLatch(10);
public static void main(String[] params) throws InterruptedException {
QueueChannel channel = new QueueChannel();
new Receiver(channel).start();
new Receiver(channel).start();
new Receiver(channel).start();
LongStream.range(0,l.getCount()).forEach(idx -> {
String msg = "Message" + idx;
System.out.println("send msg:" + msg);
channel.send(MessageBuilder.withPayload(msg).build());
});
l.await();
System.out.println("test done. receiveMsgCount:" + receiveMsgCount);
}
public static class Receiver extends Thread {
private QueueChannel channel;
Receiver(QueueChannel channel) {
this.channel = channel;
}
@Override
public void run() {
while (l.getCount() != 0) {
Message m = channel.receive(TimeUnit.SECONDS.toMillis(1));
if (m == null) {
continue;
}
Object pl = m.getPayload();
receiveMsgCount.putIfAbsent(pl, new AtomicInteger());
receiveMsgCount.get(pl).incrementAndGet();
System.out.println(Thread.currentThread().getName() + " got message: " + pl + ", count:" + l);
l.countDown();
}
}
}
}
Example QueueChannel 是 FIFO, 如果超過 capacity, sender 會被 block.
public class QueueChannelMainExceedCapacityMain {
public static void main(String[] params) {
QueueChannel channel = new QueueChannel(2);
send(channel, "1");
send(channel, "2");
send(channel, "3");
System.out.println(channel.receive(1000));
System.out.println(channel.receive(1000));
System.out.println(channel.receive(1000));
}
private static void send(QueueChannel channel, String s) {
try {
System.out.println("send:" + s);
channel.send(MessageBuilder.withPayload(s).build(), 1000); // the third message won't be sent
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
PriorityChannel 支援訊息進 channel 之後可以排隊
Example 預設會用 Message 的 header 的 PRIORITY
public class DefaultPriorityChannelMain {
public static void main(String[] params) throws InterruptedException {
PriorityChannel channel = new PriorityChannel();
channel.send(MessageBuilder.withPayload("MSG1").setPriority(1).build());
channel.send(MessageBuilder.withPayload("MSG2").setPriority(2).build());
channel.send(MessageBuilder.withPayload("MSG3").setPriority(3).build());
System.out.println(channel.receive()); // priority:3
System.out.println(channel.receive()); // priority:2
System.out.println(channel.receive()); // priority:1
}
}
Example 可以指定自己的 Comparator
public class PriorityChannelCustomComparatorMain {
public static void main(String[] params) {
PriorityChannel channel = new PriorityChannel(Comparator.comparingLong(m -> (Long) m.getHeaders().get("MYVAL")));
channel.send(MessageBuilder.withPayload("MSG1").setHeader("MYVAL", 3L).build());
channel.send(MessageBuilder.withPayload("MSG2").setHeader("MYVAL", 2L).build());
channel.send(MessageBuilder.withPayload("MSG3").setHeader("MYVAL", 1L).build());
System.out.println(channel.receive()); // MYVAL:1
System.out.println(channel.receive()); // MYVAL:2
System.out.println(channel.receive()); // MYVAL:3
}
}
RendezvousChannel synchronized QueueChannel, 內部 Queue 實作是 SynchronousQueue
Example 等到有人在等待訊息來, 不然會送不出去
public class RendezvousChannelMain {
public static void main(String[] params) {
RendezvousChannel channel = new RendezvousChannel();
receive(channel);
System.out.println("send 1");
channel.send(MessageBuilder.withPayload("MSG1").build());
System.out.println("send 2 and block until message was received");
channel.send(MessageBuilder.withPayload("MSG2").build());
System.out.println("won't reach here");
}
private static void receive(QueueChannel channel) {
new Thread(() -> System.out.println("receive:" + channel.receive())).start();
}
}
DirectChannel 提供 Pub-Sub 的 interface, 用 MessageHandler 收訊息, 但像 QueueChannel 一樣只會有一個 MessageHandler 收到.
Example
public class DirectChannelMain {
public static void main(String[] params) {
DirectChannel channel = new DirectChannel();
channel.subscribe(new MyMessageHandler(1));
channel.subscribe(new MyMessageHandler(2));
channel.subscribe(new MyMessageHandler(3));
channel.send(MessageBuilder.withPayload("MSG1").build());
channel.send(MessageBuilder.withPayload("MSG2").build());
channel.send(MessageBuilder.withPayload("MSG3").build());
}
static class MyMessageHandler implements MessageHandler {
private final int id;
MyMessageHandler(int id) {
this.id = id;
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("handler " + id + " receive:" + message);
}
}
}
ExecutorChannel 跟 DirectChannel 一樣, 但可以提供 Executor, 所以 sender 不會被 block
Example
public class ExecutorChannelMain {
public static void main(String[] params) {
ExecutorChannel channel = new ExecutorChannel(Executors.newCachedThreadPool());
channel.subscribe(new MyMessageHandler(1));
channel.subscribe(new MyMessageHandler(2));
channel.subscribe(new MyMessageHandler(3));
channel.send(MessageBuilder.withPayload("MSG1").build());
channel.send(MessageBuilder.withPayload("MSG2").build());
channel.send(MessageBuilder.withPayload("MSG3").build());
}
static class MyMessageHandler implements MessageHandler {
private final int id;
MyMessageHandler(int id) {
this.id = id;
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("handler " + id + " receive:" + message);
}
}
}
NullChannel send 的時候永遠說成功但其實不會送, receive 的時候永遠得到 null
Publish-Subscribe Channel 一個訊息送出去, 每個 MessageHandler 都會收到訊息
Example
public class PubsubChannelMain {
public static void main(String[] params) {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.subscribe(new MyMessageHandler(1));
channel.subscribe(new MyMessageHandler(2));
channel.subscribe(new MyMessageHandler(3));
channel.send(MessageBuilder.withPayload("MSG1").build());
channel.send(MessageBuilder.withPayload("MSG2").build());
channel.send(MessageBuilder.withPayload("MSG3").build());
}
static class MyMessageHandler implements MessageHandler {
private final int id;
MyMessageHandler(int id) {
this.id = id;
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("handler " + id + " receive:" + message);
}
}
}
Channel Inteceptors 可以攔截送出或接收到的訊息
Example
public class MessageInteceptorMain {
public static void main(String[] params) {
QueueChannel channel = new QueueChannel();
channel.addInterceptor(new ChannelInterceptor() {
@Override
public boolean preReceive(MessageChannel channel) {
System.out.println("pre receive");
return true;
}
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
System.out.println("post receive:" + message);
return message;
}
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
System.out.println("presend:" + message);
return message;
}
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
System.out.println("post sned:" + message);
}
});
channel.send(MessageBuilder.withPayload("test").build());
System.out.println("receive:" + channel.receive());
}
}
MessageTemplate 提供一些基本操作例如 sendAndReceive
Example sendAndReceive
public class MessagingTemplateMain {
public static void main(String[] params) {
QueueChannel channel = new QueueChannel();
MessagingTemplate template = new MessagingTemplate(channel);
new Thread(() -> {
Message<?> message = channel.receive();
System.out.println("receive:" + message);
MessageChannel c = (MessageChannel) message.getHeaders().getReplyChannel();
c.send(MessageBuilder.withPayload("OK!").build());
}).start();
template.setReceiveTimeout(10000);
System.out.println("send and receive:" + template.sendAndReceive(MessageBuilder.withPayload("test").build()));
}
}