Spring Integration - Channels


Reference

Concept
  • Point to Point Channel - 保證只有一個 receiver 會收到訊息. Ex. QueueChannel, PriorityChannel, RendezvousChannel, DirectChannel, ExecutorChannel, and NullChannel.
  • Publish-Subscribe Channel - 訊息會被多個 receiver 收到
  • Data-Typed Channel - receiver 透過 data type 來決定是否要收訊息, 這種 channel, receiver 需要知道自己要處理的 data type 是什麼.
  • Invalid Message Channel - application 決定某個 message 無效之後可以把 message 丟到這個 channel 來讓別的 application 決定怎麼處理無用的 message
  • Dead Letter Channel - producer 重複送訊息失敗幾次後就會把訊息送到 Dead Letter Channel 給其他 application 處理
  • Channel Adapter - 用來跟 message system 對接
  • Message Bridge - 用來連接兩個 channel 或 channel adapter
  • Message Bus - 透過一致的 API 讓不同系統進行訊息溝通
  • Guaranteed Delivery - 預設 Spring Integration 把 message 存在記憶體中, Spring Integration 2.0 後支援外部 message broker, 讓保證送達的功能由外部 message system 提供
Channels
MessageChannel
用來送訊息
public interface MessageChannel {
  boolean send(Message<?> message);
  boolean send(Message<?> message, long timeout);
}

PollableChannel
是 MessageChannel 的一種實作, 可以送訊息.
另外 PollableChannel 則是定義可以收訊息.
public interface PollableChannel extends MessageChannel {
  Message<?> receive();
  Message<?> receive(long timeout);
}
SubscribableChannel
是 MessageChannel 的一種實作, 可以傳送訊息.
另外 SubscribableChannel 還定義 MessageHandler 讓 API Client 能註冊自己的 MessageHandler 接收每一個訊息.
public interface MessageHandler {
  void handleMessage(Message<?> message) throws MessageException;
}

public interface SubscribableChannel extends MessageChannel {
  boolean subscribe(MessageHandler handler);
  boolean unsubscribe(MessageHandler handler);
}
QueueChannel
是一種 Point to Point Channel. 能保證只有一個 consumer 會收到訊息.
Example 每個 receiver 都在 receive, 但每個訊息都只會被一個 receiver 收走.
public class QueueChannelMain {

    static final Map<Object, AtomicInteger> receiveMsgCount = new ConcurrentHashMap<>();
    static final CountDownLatch l = new CountDownLatch(10);

    public static void main(String[] params) throws InterruptedException {
        QueueChannel channel = new QueueChannel();
        new Receiver(channel).start();
        new Receiver(channel).start();
        new Receiver(channel).start();
        LongStream.range(0,l.getCount()).forEach(idx -> {
            String msg = "Message" + idx;
            System.out.println("send msg:" + msg);
            channel.send(MessageBuilder.withPayload(msg).build());
        });
        l.await();
        System.out.println("test done. receiveMsgCount:" + receiveMsgCount);
    }

    public static class Receiver extends Thread {
        private QueueChannel channel;

        Receiver(QueueChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            while (l.getCount() != 0) {
                Message m = channel.receive(TimeUnit.SECONDS.toMillis(1));
                if (m == null) {
                    continue;
                }
                Object pl = m.getPayload();
                receiveMsgCount.putIfAbsent(pl, new AtomicInteger());
                receiveMsgCount.get(pl).incrementAndGet();
                System.out.println(Thread.currentThread().getName() + " got message: " + pl + ", count:" + l);
                l.countDown();
            }
        }
    }

}
Example QueueChannel 是 FIFO, 如果超過 capacity, sender 會被 block.
public class QueueChannelMainExceedCapacityMain {

    public static void main(String[] params) {
        QueueChannel channel = new QueueChannel(2);
        send(channel, "1");
        send(channel, "2");
        send(channel, "3");
        System.out.println(channel.receive(1000));
        System.out.println(channel.receive(1000));
        System.out.println(channel.receive(1000));
    }

    private static void send(QueueChannel channel, String s) {
        try {
            System.out.println("send:" + s);
            channel.send(MessageBuilder.withPayload(s).build(), 1000); // the third message won't be sent
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}



PriorityChannel 支援訊息進 channel 之後可以排隊
Example 預設會用 Message 的 header 的 PRIORITY
public class DefaultPriorityChannelMain {

    public static void main(String[] params) throws InterruptedException {
        PriorityChannel channel = new PriorityChannel();
        channel.send(MessageBuilder.withPayload("MSG1").setPriority(1).build());
        channel.send(MessageBuilder.withPayload("MSG2").setPriority(2).build());
        channel.send(MessageBuilder.withPayload("MSG3").setPriority(3).build());
        System.out.println(channel.receive()); // priority:3
        System.out.println(channel.receive()); // priority:2
        System.out.println(channel.receive()); // priority:1
    }

}
Example 可以指定自己的 Comparator
public class PriorityChannelCustomComparatorMain {

    public static void main(String[] params) {
        PriorityChannel channel = new PriorityChannel(Comparator.comparingLong(m -> (Long) m.getHeaders().get("MYVAL")));
        channel.send(MessageBuilder.withPayload("MSG1").setHeader("MYVAL", 3L).build());
        channel.send(MessageBuilder.withPayload("MSG2").setHeader("MYVAL", 2L).build());
        channel.send(MessageBuilder.withPayload("MSG3").setHeader("MYVAL", 1L).build());
        System.out.println(channel.receive()); // MYVAL:1
        System.out.println(channel.receive()); // MYVAL:2
        System.out.println(channel.receive()); // MYVAL:3
    }

}
RendezvousChannel synchronized QueueChannel, 內部 Queue 實作是 SynchronousQueue
Example 等到有人在等待訊息來, 不然會送不出去
public class RendezvousChannelMain {

    public static void main(String[] params) {
        RendezvousChannel channel = new RendezvousChannel();
        receive(channel);
        System.out.println("send 1");
        channel.send(MessageBuilder.withPayload("MSG1").build());
        System.out.println("send 2 and block until message was received");
        channel.send(MessageBuilder.withPayload("MSG2").build());
        System.out.println("won't reach here");
    }

    private static void receive(QueueChannel channel) {
        new Thread(() -> System.out.println("receive:" + channel.receive())).start();
    }

}

DirectChannel 提供 Pub-Sub 的 interface, 用 MessageHandler 收訊息, 但像 QueueChannel 一樣只會有一個 MessageHandler 收到.
Example
public class DirectChannelMain {

    public static void main(String[] params) {
        DirectChannel channel = new DirectChannel();
        channel.subscribe(new MyMessageHandler(1));
        channel.subscribe(new MyMessageHandler(2));
        channel.subscribe(new MyMessageHandler(3));
        channel.send(MessageBuilder.withPayload("MSG1").build());
        channel.send(MessageBuilder.withPayload("MSG2").build());
        channel.send(MessageBuilder.withPayload("MSG3").build());
    }

    static class MyMessageHandler implements MessageHandler {

        private final int id;

        MyMessageHandler(int id) {
            this.id = id;
        }

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("handler " + id + " receive:" + message);
        }
    }
}

ExecutorChannel 跟 DirectChannel 一樣, 但可以提供 Executor, 所以 sender 不會被 block
Example
public class ExecutorChannelMain {
    public static void main(String[] params) {
        ExecutorChannel channel = new ExecutorChannel(Executors.newCachedThreadPool());
        channel.subscribe(new MyMessageHandler(1));
        channel.subscribe(new MyMessageHandler(2));
        channel.subscribe(new MyMessageHandler(3));
        channel.send(MessageBuilder.withPayload("MSG1").build());
        channel.send(MessageBuilder.withPayload("MSG2").build());
        channel.send(MessageBuilder.withPayload("MSG3").build());
    }

    static class MyMessageHandler implements MessageHandler {

        private final int id;

        MyMessageHandler(int id) {
            this.id = id;
        }

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("handler " + id + " receive:" + message);
        }
    }
}

NullChannel send 的時候永遠說成功但其實不會送, receive 的時候永遠得到 null
Publish-Subscribe Channel 一個訊息送出去, 每個 MessageHandler 都會收到訊息
Example
public class PubsubChannelMain {

    public static void main(String[] params) {
        PublishSubscribeChannel channel = new PublishSubscribeChannel();
        channel.subscribe(new MyMessageHandler(1));
        channel.subscribe(new MyMessageHandler(2));
        channel.subscribe(new MyMessageHandler(3));
        channel.send(MessageBuilder.withPayload("MSG1").build());
        channel.send(MessageBuilder.withPayload("MSG2").build());
        channel.send(MessageBuilder.withPayload("MSG3").build());
    }

    static class MyMessageHandler implements MessageHandler {

        private final int id;

        MyMessageHandler(int id) {
            this.id = id;
        }

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("handler " + id + " receive:" + message);
        }
    }

}
Channel Inteceptors 可以攔截送出或接收到的訊息
Example
public class MessageInteceptorMain {

    public static void main(String[] params) {
        QueueChannel channel = new QueueChannel();
        channel.addInterceptor(new ChannelInterceptor() {
            @Override
            public boolean preReceive(MessageChannel channel) {
                System.out.println("pre receive");
                return true;
            }

            @Override
            public Message<?> postReceive(Message<?> message, MessageChannel channel) {
                System.out.println("post receive:" + message);
                return message;
            }

            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                System.out.println("presend:" + message);
                return message;
            }

            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                System.out.println("post sned:" + message);
            }
        });
        channel.send(MessageBuilder.withPayload("test").build());
        System.out.println("receive:" + channel.receive());
    }

}

MessageTemplate 提供一些基本操作例如 sendAndReceive
Example sendAndReceive
public class MessagingTemplateMain {

    public static void main(String[] params) {
        QueueChannel channel = new QueueChannel();
        MessagingTemplate template = new MessagingTemplate(channel);
        new Thread(() -> {
            Message<?> message = channel.receive();
            System.out.println("receive:" + message);
            MessageChannel c = (MessageChannel) message.getHeaders().getReplyChannel();
            c.send(MessageBuilder.withPayload("OK!").build());
        }).start();
        template.setReceiveTimeout(10000);
        System.out.println("send and receive:" + template.sendAndReceive(MessageBuilder.withPayload("test").build()));

    }

}


沒有留言:

張貼留言

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...