Spring Integration - Channels


Reference

Concept
  • Point to Point Channel - 保證只有一個 receiver 會收到訊息. Ex. QueueChannel, PriorityChannel, RendezvousChannel, DirectChannel, ExecutorChannel, and NullChannel.
  • Publish-Subscribe Channel - 訊息會被多個 receiver 收到
  • Data-Typed Channel - receiver 透過 data type 來決定是否要收訊息, 這種 channel, receiver 需要知道自己要處理的 data type 是什麼.
  • Invalid Message Channel - application 決定某個 message 無效之後可以把 message 丟到這個 channel 來讓別的 application 決定怎麼處理無用的 message
  • Dead Letter Channel - producer 重複送訊息失敗幾次後就會把訊息送到 Dead Letter Channel 給其他 application 處理
  • Channel Adapter - 用來跟 message system 對接
  • Message Bridge - 用來連接兩個 channel 或 channel adapter
  • Message Bus - 透過一致的 API 讓不同系統進行訊息溝通
  • Guaranteed Delivery - 預設 Spring Integration 把 message 存在記憶體中, Spring Integration 2.0 後支援外部 message broker, 讓保證送達的功能由外部 message system 提供
Channels
MessageChannel
用來送訊息
public interface MessageChannel {
  boolean send(Message<?> message);
  boolean send(Message<?> message, long timeout);
}

PollableChannel
是 MessageChannel 的一種實作, 可以送訊息.
另外 PollableChannel 則是定義可以收訊息.
public interface PollableChannel extends MessageChannel {
  Message<?> receive();
  Message<?> receive(long timeout);
}
SubscribableChannel
是 MessageChannel 的一種實作, 可以傳送訊息.
另外 SubscribableChannel 還定義 MessageHandler 讓 API Client 能註冊自己的 MessageHandler 接收每一個訊息.
public interface MessageHandler {
  void handleMessage(Message<?> message) throws MessageException;
}

public interface SubscribableChannel extends MessageChannel {
  boolean subscribe(MessageHandler handler);
  boolean unsubscribe(MessageHandler handler);
}
QueueChannel
是一種 Point to Point Channel. 能保證只有一個 consumer 會收到訊息.
Example 每個 receiver 都在 receive, 但每個訊息都只會被一個 receiver 收走.
public class QueueChannelMain {

    static final Map<Object, AtomicInteger> receiveMsgCount = new ConcurrentHashMap<>();
    static final CountDownLatch l = new CountDownLatch(10);

    public static void main(String[] params) throws InterruptedException {
        QueueChannel channel = new QueueChannel();
        new Receiver(channel).start();
        new Receiver(channel).start();
        new Receiver(channel).start();
        LongStream.range(0,l.getCount()).forEach(idx -> {
            String msg = "Message" + idx;
            System.out.println("send msg:" + msg);
            channel.send(MessageBuilder.withPayload(msg).build());
        });
        l.await();
        System.out.println("test done. receiveMsgCount:" + receiveMsgCount);
    }

    public static class Receiver extends Thread {
        private QueueChannel channel;

        Receiver(QueueChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            while (l.getCount() != 0) {
                Message m = channel.receive(TimeUnit.SECONDS.toMillis(1));
                if (m == null) {
                    continue;
                }
                Object pl = m.getPayload();
                receiveMsgCount.putIfAbsent(pl, new AtomicInteger());
                receiveMsgCount.get(pl).incrementAndGet();
                System.out.println(Thread.currentThread().getName() + " got message: " + pl + ", count:" + l);
                l.countDown();
            }
        }
    }

}
Example QueueChannel 是 FIFO, 如果超過 capacity, sender 會被 block.
public class QueueChannelMainExceedCapacityMain {

    public static void main(String[] params) {
        QueueChannel channel = new QueueChannel(2);
        send(channel, "1");
        send(channel, "2");
        send(channel, "3");
        System.out.println(channel.receive(1000));
        System.out.println(channel.receive(1000));
        System.out.println(channel.receive(1000));
    }

    private static void send(QueueChannel channel, String s) {
        try {
            System.out.println("send:" + s);
            channel.send(MessageBuilder.withPayload(s).build(), 1000); // the third message won't be sent
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}



PriorityChannel 支援訊息進 channel 之後可以排隊
Example 預設會用 Message 的 header 的 PRIORITY
public class DefaultPriorityChannelMain {

    public static void main(String[] params) throws InterruptedException {
        PriorityChannel channel = new PriorityChannel();
        channel.send(MessageBuilder.withPayload("MSG1").setPriority(1).build());
        channel.send(MessageBuilder.withPayload("MSG2").setPriority(2).build());
        channel.send(MessageBuilder.withPayload("MSG3").setPriority(3).build());
        System.out.println(channel.receive()); // priority:3
        System.out.println(channel.receive()); // priority:2
        System.out.println(channel.receive()); // priority:1
    }

}
Example 可以指定自己的 Comparator
public class PriorityChannelCustomComparatorMain {

    public static void main(String[] params) {
        PriorityChannel channel = new PriorityChannel(Comparator.comparingLong(m -> (Long) m.getHeaders().get("MYVAL")));
        channel.send(MessageBuilder.withPayload("MSG1").setHeader("MYVAL", 3L).build());
        channel.send(MessageBuilder.withPayload("MSG2").setHeader("MYVAL", 2L).build());
        channel.send(MessageBuilder.withPayload("MSG3").setHeader("MYVAL", 1L).build());
        System.out.println(channel.receive()); // MYVAL:1
        System.out.println(channel.receive()); // MYVAL:2
        System.out.println(channel.receive()); // MYVAL:3
    }

}
RendezvousChannel synchronized QueueChannel, 內部 Queue 實作是 SynchronousQueue
Example 等到有人在等待訊息來, 不然會送不出去
public class RendezvousChannelMain {

    public static void main(String[] params) {
        RendezvousChannel channel = new RendezvousChannel();
        receive(channel);
        System.out.println("send 1");
        channel.send(MessageBuilder.withPayload("MSG1").build());
        System.out.println("send 2 and block until message was received");
        channel.send(MessageBuilder.withPayload("MSG2").build());
        System.out.println("won't reach here");
    }

    private static void receive(QueueChannel channel) {
        new Thread(() -> System.out.println("receive:" + channel.receive())).start();
    }

}

DirectChannel 提供 Pub-Sub 的 interface, 用 MessageHandler 收訊息, 但像 QueueChannel 一樣只會有一個 MessageHandler 收到.
Example
public class DirectChannelMain {

    public static void main(String[] params) {
        DirectChannel channel = new DirectChannel();
        channel.subscribe(new MyMessageHandler(1));
        channel.subscribe(new MyMessageHandler(2));
        channel.subscribe(new MyMessageHandler(3));
        channel.send(MessageBuilder.withPayload("MSG1").build());
        channel.send(MessageBuilder.withPayload("MSG2").build());
        channel.send(MessageBuilder.withPayload("MSG3").build());
    }

    static class MyMessageHandler implements MessageHandler {

        private final int id;

        MyMessageHandler(int id) {
            this.id = id;
        }

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("handler " + id + " receive:" + message);
        }
    }
}

ExecutorChannel 跟 DirectChannel 一樣, 但可以提供 Executor, 所以 sender 不會被 block
Example
public class ExecutorChannelMain {
    public static void main(String[] params) {
        ExecutorChannel channel = new ExecutorChannel(Executors.newCachedThreadPool());
        channel.subscribe(new MyMessageHandler(1));
        channel.subscribe(new MyMessageHandler(2));
        channel.subscribe(new MyMessageHandler(3));
        channel.send(MessageBuilder.withPayload("MSG1").build());
        channel.send(MessageBuilder.withPayload("MSG2").build());
        channel.send(MessageBuilder.withPayload("MSG3").build());
    }

    static class MyMessageHandler implements MessageHandler {

        private final int id;

        MyMessageHandler(int id) {
            this.id = id;
        }

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("handler " + id + " receive:" + message);
        }
    }
}

NullChannel send 的時候永遠說成功但其實不會送, receive 的時候永遠得到 null
Publish-Subscribe Channel 一個訊息送出去, 每個 MessageHandler 都會收到訊息
Example
public class PubsubChannelMain {

    public static void main(String[] params) {
        PublishSubscribeChannel channel = new PublishSubscribeChannel();
        channel.subscribe(new MyMessageHandler(1));
        channel.subscribe(new MyMessageHandler(2));
        channel.subscribe(new MyMessageHandler(3));
        channel.send(MessageBuilder.withPayload("MSG1").build());
        channel.send(MessageBuilder.withPayload("MSG2").build());
        channel.send(MessageBuilder.withPayload("MSG3").build());
    }

    static class MyMessageHandler implements MessageHandler {

        private final int id;

        MyMessageHandler(int id) {
            this.id = id;
        }

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("handler " + id + " receive:" + message);
        }
    }

}
Channel Inteceptors 可以攔截送出或接收到的訊息
Example
public class MessageInteceptorMain {

    public static void main(String[] params) {
        QueueChannel channel = new QueueChannel();
        channel.addInterceptor(new ChannelInterceptor() {
            @Override
            public boolean preReceive(MessageChannel channel) {
                System.out.println("pre receive");
                return true;
            }

            @Override
            public Message<?> postReceive(Message<?> message, MessageChannel channel) {
                System.out.println("post receive:" + message);
                return message;
            }

            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                System.out.println("presend:" + message);
                return message;
            }

            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                System.out.println("post sned:" + message);
            }
        });
        channel.send(MessageBuilder.withPayload("test").build());
        System.out.println("receive:" + channel.receive());
    }

}

MessageTemplate 提供一些基本操作例如 sendAndReceive
Example sendAndReceive
public class MessagingTemplateMain {

    public static void main(String[] params) {
        QueueChannel channel = new QueueChannel();
        MessagingTemplate template = new MessagingTemplate(channel);
        new Thread(() -> {
            Message<?> message = channel.receive();
            System.out.println("receive:" + message);
            MessageChannel c = (MessageChannel) message.getHeaders().getReplyChannel();
            c.send(MessageBuilder.withPayload("OK!").build());
        }).start();
        template.setReceiveTimeout(10000);
        System.out.println("send and receive:" + template.sendAndReceive(MessageBuilder.withPayload("test").build()));

    }

}


Netty Example - server echo user input message


Reference
Maven
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
</dependency>
Description
這個例子跟 guide 上的不太相同, 除了原本的 echo 行為.
另外有做讓人 input message, 從 client 送給 server 做 echo.
MainClass
public class EchoServerMain {

    public static void main(String[] params) throws Exception {
        new Thread(() -> {
            try {
                new EchoServer(1234).run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        TimeUnit.SECONDS.sleep(1);
        new EchoClient("localhost", 1234).connectRunAndExit();
    }

    public static class EchoClient {

        private final String ip;
        private final int port;

        EchoClient(String ip, int port) {
            this.port = port;
            this.ip = ip;
        }

        public void connectRunAndExit() throws InterruptedException {
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(workerGroup);
                b.channel(NioSocketChannel.class);
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new EchoClientHandler());
                    }
                });

                ChannelFuture f = b.connect(ip, port).sync();

                Channel ch = f.channel();
                System.out.println("scan...");
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String msg = scanner.nextLine();
                    System.out.println("Input:" + msg);
                    ByteBuf msgBuffer = Unpooled.wrappedBuffer(msg.getBytes(CharsetUtil.UTF_8));
                    ch.writeAndFlush(msgBuffer);
                }


                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully().sync();
            }
        }
    }


    public static class EchoServer {
        private int port;

        public EchoServer(int port) {
            this.port = port;
        }

        public void run() throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(group)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new EchoServerHandler());
                            }
                        });

                ChannelFuture f = b.bind(port).sync();
                System.out.println("bind done");
                f.channel().closeFuture().sync();
                System.out.println("close done");
            } finally {
                group.shutdownGracefully().sync();
            }
        }
    }


}
EchoClientHandler
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        System.out.println("Client receive:" + ((ByteBuf)msg).toString(Charset.defaultCharset()));
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("triggered:" + evt);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server receive:" + msg);
        ctx.writeAndFlush(msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("connection active:" + ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("connection inactive:" + ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Output

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...