Spring Integration - Basic Terms

Reference

Terms

org.springframework.integration.Message<T>

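Wraps a message. Its members are the MessageHeaders and the payload. (Since Spring Integration 4.0 the interface lives in org.springframework.messaging.)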

org.springframework.integration.MessageHeaders

MessageHeaders is immutable and carries some predefined attributes: id, timestamp, correlation id, and priority.

Payloads

Holds the message body. You can define a custom transformer to convert the payload as it is passed along.

Message Channel

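A Message Channel carries messages and also decouples the producer from the consumer.
A Message Channel works in one of two modes:
  1. point-to-point: each message is received by exactly one consumer
  2. publish/subscribe: each message is delivered to every subscribed consumer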
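
To make the two modes concrete, here is a minimal plain-Java sketch. It does not use the Spring Integration API; the class names PointToPointChannel and PublishSubscribeChannel are made up for this illustration and only model the delivery behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class ChannelModes {

    /** Point-to-point: each message is taken from the queue by exactly one consumer. */
    public static class PointToPointChannel<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

        public void send(T message) {
            queue.add(message);
        }

        /** Returns the next message, or null if none is waiting. */
        public T receive() {
            return queue.poll();
        }
    }

    /** Publish/subscribe: every subscriber is handed each message. */
    public static class PublishSubscribeChannel<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        public void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        public void send(T message) {
            subscribers.forEach(s -> s.accept(message));
        }
    }

    public static void main(String[] args) {
        PointToPointChannel<String> p2p = new PointToPointChannel<>();
        p2p.send("order-1");
        // Only the first receiver gets the message; the second finds the queue empty.
        System.out.println("consumer A got: " + p2p.receive());
        System.out.println("consumer B got: " + p2p.receive());

        PublishSubscribeChannel<String> pubSub = new PublishSubscribeChannel<>();
        pubSub.subscribe(m -> System.out.println("subscriber A got: " + m));
        pubSub.subscribe(m -> System.out.println("subscriber B got: " + m));
        pubSub.send("price-update");
    }
}
```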

Message Endpoints

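Endpoint is the general term for the components inside Spring Integration.
  • Channel Adapter: connects an external system to a Message Channel, for example to bring outside data into Spring Integration
  • Transformer: converts a message
  • Filter: decides whether a Message is passed on to the Message Channel
  • Router: inspects the content of a Message to decide which Message Channel it goes to
  • Splitter: splits one Message into several and sends each to the appropriate Message Channel
  • Aggregator: combines multiple messages into one
  • Service Activator: the interface between a Message Channel and a service instance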
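
The roles above can be sketched as plain Java functions. This is only a conceptual model, not Spring Integration code: the method names, the comma-separated payload, and the channel names errorChannel and defaultChannel are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EndpointSketch {

    /** Splitter: one comma-separated message becomes several messages. */
    public static List<String> split(String message) {
        return Arrays.asList(message.split(","));
    }

    /** Filter: decides whether a message is passed on. */
    public static boolean accept(String message) {
        return !message.trim().isEmpty();
    }

    /** Transformer: converts the payload. */
    public static String transform(String message) {
        return message.trim().toUpperCase();
    }

    /** Router: picks a channel name from the message content. */
    public static String route(String message) {
        return message.startsWith("ERR") ? "errorChannel" : "defaultChannel";
    }

    /** Runs splitter, filter, transformer, and router, then aggregates per channel. */
    public static Map<String, String> process(String message) {
        Map<String, List<String>> byChannel = new LinkedHashMap<>();
        for (String part : split(message)) {
            if (!accept(part)) {
                continue;
            }
            String transformed = transform(part);
            byChannel.computeIfAbsent(route(transformed), k -> new ArrayList<>()).add(transformed);
        }
        // Aggregator: merge the messages on each channel into one.
        Map<String, String> aggregated = new LinkedHashMap<>();
        byChannel.forEach((channel, parts) -> aggregated.put(channel, String.join("+", parts)));
        return aggregated;
    }

    public static void main(String[] args) {
        System.out.println(process("a, err-1, ,b"));
    }
}
```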

Example

spring-context.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
        <context:component-scan base-package="examples" />
        <int:channel id="input"/>
        <int:channel id="output">
            <int:queue capacity="10"/>
        </int:channel>
        <int:service-activator input-channel="input"
                           output-channel="output"
                           ref="messageHandler"/>
</beans>

examples/MessageHandler.java

@Component
public class MessageHandler {
    @ServiceActivator
    public String handleMessage(String message) {
        System.out.println("Received message: " + message);
        return "MESSAGE:" + message;
    }
}

examples/Application.java

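// Imports assume Spring Integration 4.0+ package names.
import java.util.Scanner;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;

public class Application {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-context.xml");
        context.start();

        MessageChannel input = context.getBean("input", MessageChannel.class);
        PollableChannel output = context.getBean("output", PollableChannel.class);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            input.send(MessageBuilder.withPayload(scanner.nextLine()).build());
            // Drain the reply; otherwise the output queue (capacity 10) fills up and send() can block.
            System.out.println("Reply: " + output.receive(1000));
        }
        context.close();
    }
}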

Run it, type a message, and the message is sent to MessageHandler, which prints it.


Off Heap Cache - OHCache - Simple Example

Reference

What

OHC is an off-heap cache library.

Why

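If you use a map as a cache, all the data lives on the heap. The problems with keeping it on the heap are:
  1. The data is subject to GC, and GC periodically affects JVM performance.
  2. Persistence has to be implemented separately.
If the data is kept off heap, for example in memory obtained through mmap or accessed through a MappedByteBuffer, it sits outside heap memory and is managed by the operating system.
The advantage is that the amount of memory is not limited by the heap size. The drawback is that you have to manage it carefully yourself, or it can cause problems for the whole system.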
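
As a small illustration of memory outside the heap, the sketch below uses ByteBuffer.allocateDirect from the JDK. This is my own choice for the example (the text above mentions mmap and MappedByteBuffer), and it says nothing about how OHC allocates memory internally. The class name DirectBufferDemo and the 16 MB size are arbitrary.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DirectBufferDemo {

    /** Writes text into a direct (off-heap) buffer and reads it back. */
    public static String roundTrip(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        // allocateDirect reserves memory outside the Java heap.
        ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length);
        buffer.put(bytes);
        buffer.flip(); // switch from writing to reading
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }

    private static long usedHeapMb() {
        Runtime rt = Runtime.getRuntime();
        return (rt.totalMemory() - rt.freeMemory()) / 1024 / 1024;
    }

    public static void main(String[] args) {
        System.out.println("heap used before (MB): " + usedHeapMb());
        ByteBuffer offHeap = ByteBuffer.allocateDirect(16 * 1024 * 1024);
        System.out.println("direct buffer: " + offHeap.isDirect());
        // The 16 MB above is not part of the heap figure printed here.
        System.out.println("heap used after (MB): " + usedHeapMb());
        System.out.println(roundTrip("off heap"));
    }
}
```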

How

Maven

<dependency>
 <groupId>org.caffinitas.ohc</groupId>
 <artifactId>ohc-core</artifactId>
 <version>0.4.4</version>
</dependency>

Codes

public class SimpleMain {

    public static void main(String[] params) {
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNextLine()) {
            if (scanner.nextLine().trim().equals("GO")) {
                System.out.println(Runtime.getRuntime().freeMemory()/1024/1024 + "/" + Runtime.getRuntime().totalMemory()/1024/1024);
                OHCache<String,String> c = OHCacheBuilder.<String,String>newBuilder()
                        .keySerializer(new StringSerializer())
                        .valueSerializer(new StringSerializer())
                        .build();
                c.put("A", "A");
                System.out.println(c.get("A"));
                IntStream.range(0, 1000_000).forEach(idx -> {
                    c.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
                });
                System.out.println(Runtime.getRuntime().freeMemory()/1024/1024 + "/" + Runtime.getRuntime().totalMemory()/1024/1024);
            } else {
                System.out.println("input 'GO' then click enter");
            }
        }
    }

    private static class StringSerializer implements CacheSerializer<String> {

        @Override
        public void serialize(String s, ByteBuffer byteBuffer) {
            // Encode with the same charset that deserialize() decodes with.
            byteBuffer.put(s.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public String deserialize(ByteBuffer byteBuffer) {
            return StandardCharsets.UTF_8.decode(byteBuffer).toString();
        }

        @Override
        public int serializedSize(String s) {
            return s.getBytes(StandardCharsets.UTF_8).length;
        }
    }

}

Output


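This was captured after entering GO several times. Each GO puts one million key/value pairs into the cache.

jconsole and the system resource monitor show that the heap uses less than 100 MB, while the process actually uses 600 MB.
The rest is all stored off heap.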

ForkJoinPool - Thread Management

Reference

Question

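I originally assumed that a declaration like new ForkJoinPool(2) makes ForkJoinPool keep at most 2 threads for execution.
When I actually observed it, however, many more threads were started.
For example, in the earlier fork -> join sample, printing the thread names shows many threads being started.
The pool printed every second also shows the pool size growing.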

Codes: Thread count exceeds parallelism level

public class ForkJoinPoolManyThreadMain {

    static final CountDownLatch latch = new CountDownLatch(1);
    private static final AtomicInteger threadCount = new AtomicInteger();

    public static void main(String[] params) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        ScheduledExecutorService s = Executors.newScheduledThreadPool(1);
        s.scheduleAtFixedRate(() -> {
            System.out.println(pool); }, 0, 1, TimeUnit.SECONDS);
        pool.execute(new MainAction());
        latch.await();
        s.shutdown();
        pool.shutdown();
    }


    private static class MainAction extends RecursiveAction {

        @Override
        protected void compute() {
            List<SubAction> actions = IntStream.range(0,100).mapToObj(idx -> new SubAction()).collect(toList());
            actions.forEach(SubAction::fork);
            actions.forEach(SubAction::join);
            latch.countDown();
        }
    }

    private static class SubAction extends RecursiveAction {

        @Override
        protected void compute() {
            try {
                System.out.println(Thread.currentThread().getName() + " sleep 1 seconds");
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}

Output

The log shows that the number of threads goes past 2 and reaches 14, and the pool size actually reaches 16.





Requirement

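I could not make sense of this when I found it. I had also assumed that the main task's thread was merged with the threads of the joined tasks, but it looks like many new threads are created to run them. So how do we make sure that too many threads are not created?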

Suggestion: Customized ThreadFactory? (Don't do this)

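Because of how ForkJoinPool creates workers internally, some people online suggest supplying your own thread factory that returns null once the thread count exceeds the limit. ForkJoinPool then treats this as a failure and carries on with the existing threads.

This does cap the number of threads created. However, returning null is an error from ForkJoinPool's point of view: by its own rules this is the moment it needs createWorker to succeed, so the factory should not fail and return null.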



How: Join after done

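It then occurred to me that the new threads might appear because join stalls MainAction: its thread is occupied but not doing work, so ForkJoinPool has to create another thread to avoid starvation. So I tried checking isDone first and calling join only on tasks that are already done, and that worked. (I have not confirmed the cause, though the ForkJoinPool Javadoc does note that the pool adds, suspends, or resumes worker threads to keep enough active threads even when some tasks are stalled waiting to join others.)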

Working Code

public class ForkJoinPoolFixedThreadMain {

    public static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] params) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        MainAction action = new MainAction();
        ScheduledExecutorService s = Executors.newScheduledThreadPool(1);
        s.scheduleAtFixedRate(() -> System.out.println(pool), 0, 1, TimeUnit.SECONDS);
        pool.execute(action);
        latch.await();
        s.shutdown();
    }

    private static class MainAction extends RecursiveAction {

        @Override
        protected void compute() {
            List<SubAction> actions = IntStream.range(0,100).mapToObj(idx -> new SubAction()).collect(toList());
            actions.forEach(SubAction::fork);
            while(!actions.isEmpty()) {
                for (Iterator<SubAction> i = actions.iterator(); i.hasNext(); ) {
                    SubAction action = i.next();
                    if (action.isDone()) {
                        action.join();
                        i.remove();
                    }
                }
            }
            latch.countDown();
        }
    }

    private static class SubAction extends RecursiveAction {

        @Override
        protected void compute() {
            long start = System.currentTimeMillis();
            while (true) {
                if (System.currentTimeMillis() - start > 1000) {
                    break;
                }
            }
        }
    }


}

Output

Because the pool is created with new ForkJoinPool(), the pool size stays at 8 at most (my machine has 8 cores).
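
The link between the no-argument constructor and the core count can be checked directly. A small sketch, with ParallelismCheck as a made-up class name: getParallelism() reports the target parallelism, while getPoolSize() reports how many worker threads have actually been started, which is the number that grew in the earlier example.

```java
import java.util.concurrent.ForkJoinPool;

public class ParallelismCheck {

    /** Parallelism of a pool built with the no-argument constructor. */
    public static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool twoWay = new ForkJoinPool(2);
        System.out.println("cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("default parallelism: " + defaultParallelism());
        System.out.println("explicit parallelism: " + twoWay.getParallelism());
        // Worker threads are started lazily, so this is the count before any task runs.
        System.out.println("pool size before any task: " + twoWay.getPoolSize());
        twoWay.shutdown();
    }
}
```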

ForkJoinPool - Error Handling

Reference

Error Handling

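If a forked task/action throws an exception, remember to catch it when calling join.
Otherwise the exception is rethrown from join and the joining task/action dies as well.
And the scary part: nothing is logged.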

Codes

In this program every action has an id, and actions with an odd id throw an exception.

Exception without catch when join

First, the version without a catch:
public class ForkJoinPoolErrorMain {

    static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] params) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        ScheduledExecutorService s = Executors.newScheduledThreadPool(1);
        s.scheduleAtFixedRate(() -> System.out.println(pool), 0, 1, TimeUnit.SECONDS);
        pool.submit(new ErrorAction(ErrorAction.MAIN_TASK_ID));
        latch.await(10, TimeUnit.SECONDS);
        pool.shutdown();
        s.shutdown();
    }

    static class ErrorAction extends RecursiveAction {

        public static final int MAIN_TASK_ID = -1;
        private final int id;

        ErrorAction(int id) {
            this.id = id;
        }

        private boolean isSubTask() {
            return id >= 0;
        }

        @Override
        protected void compute() {
            if (isSubTask()) {
                if (id % 2 == 1) {
                    throw new IllegalStateException("Error when id is odd number");
                }
            } else {
                List<ErrorAction> actions = IntStream.range(0,10).mapToObj(idx -> new ErrorAction(idx)).collect(Collectors.toList());
                actions.forEach(ErrorAction::fork);
                actions.forEach(action -> {
                    action.join();
                });
                latch.countDown();
            }
            System.out.println("compute done. id=" + id);
        }
    }

}
The output is as follows, yet not a single exception shows up.
The main action never completes either (main action id = -1).











Exception with catch statement

The version with a catch:
public class ForkJoinPoolErrorMain {

    static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] params) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        ScheduledExecutorService s = Executors.newScheduledThreadPool(1);
        s.scheduleAtFixedRate(() -> System.out.println(pool), 0, 1, TimeUnit.SECONDS);
        pool.submit(new ErrorAction(ErrorAction.MAIN_TASK_ID));
        latch.await(10, TimeUnit.SECONDS);
        pool.shutdown();
        s.shutdown();
    }

    static class ErrorAction extends RecursiveAction {

        public static final int MAIN_TASK_ID = -1;
        private final int id;

        ErrorAction(int id) {
            this.id = id;
        }

        private boolean isSubTask() {
            return id >= 0;
        }

        @Override
        protected void compute() {
            if (isSubTask()) {
                if (id % 2 == 1) {
                    throw new IllegalStateException("Error when id is odd number");
                }
            } else {
                List<ErrorAction> actions = IntStream.range(0,10).mapToObj(idx -> new ErrorAction(idx)).collect(Collectors.toList());
                actions.forEach(ErrorAction::fork);
                actions.forEach(action -> {
                    try {
                        action.join();
                    } catch (Exception ex) {
                        System.err.println(ex + ", id=" + action.id);
                    }
                });
                latch.countDown();
            }
            System.out.println("compute done. id=" + id);
        }
    }

}

This version prints the errors, and the main task runs to completion.
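
As an alternative to try/catch around join, ForkJoinTask also offers quietlyJoin(), isCompletedAbnormally(), and getException(). The sketch below is one possible way to collect the failed ids with them; the classes OddFails and Parent are hypothetical stand-ins for ErrorAction, not the code from this post.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ForkJoinFailureReport {

    /** Hypothetical subtask: fails when its id is odd, like ErrorAction. */
    static class OddFails extends RecursiveAction {
        final int id;

        OddFails(int id) {
            this.id = id;
        }

        @Override
        protected void compute() {
            if (id % 2 == 1) {
                throw new IllegalStateException("Error when id is odd number");
            }
        }
    }

    /** Forks ten subtasks and returns the ids of those that failed. */
    static class Parent extends RecursiveTask<List<Integer>> {
        @Override
        protected List<Integer> compute() {
            List<OddFails> actions = IntStream.range(0, 10)
                    .mapToObj(OddFails::new)
                    .collect(Collectors.toList());
            actions.forEach(OddFails::fork);
            // quietlyJoin waits for completion without rethrowing the failure.
            actions.forEach(OddFails::quietlyJoin);
            return actions.stream()
                    .filter(OddFails::isCompletedAbnormally)
                    .peek(a -> System.err.println(a.getException() + ", id=" + a.id))
                    .map(a -> a.id)
                    .collect(Collectors.toList());
        }
    }

    public static List<Integer> failedIds() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.invoke(new Parent());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("failed ids: " + failedIds());
    }
}
```

Whether this reads better than try/catch is a matter of taste; the point is only that the failure is stored in the task and stays invisible until someone asks for it.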











Exception with ExecutorService

With an ExecutorService, even when a task run through execute() throws a RuntimeException,
the exception still gets printed to the console.
public class ExecutorServiceWithExceptionMain {

    public static void main(String[] params) {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        ex.execute(() -> {
            throw new IllegalStateException("test throw illegalState");
        });
        ex.shutdown();
    }


}
Even an exception thrown directly like this is printed to the console.
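
One caveat worth a sketch: this applies to execute(). With submit(), the exception is stored in the returned Future and only surfaces, wrapped in an ExecutionException, when get() is called. The class and method names below are made up for the illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitVersusExecute {

    /** Submits a failing task and returns the message of the exception it threw. */
    public static String failureMessageFromSubmit() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Runnable failing = () -> {
                throw new IllegalStateException("test throw illegalState");
            };
            Future<?> future = executor.submit(failing);
            future.get(); // rethrows the failure wrapped in ExecutionException
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Nothing reaches the console unless we ask the Future for the result.
        System.out.println("caught via Future: " + failureMessageFromSubmit());
    }
}
```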

ForkJoinPool - Work stealing


Reference

Previous post: Java Concurrency - Deadlock in ForkJoinPool
Next post: ForkJoinPool - Error Handling

Introduction

ForkJoinPool implements the idea of work stealing. A brief explanation:

Work-stealing

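  1. Each thread has its own queue, which holds CPU-bound tasks
  2. A task in the queue may produce new tasks that can run in parallel
  3. Newly produced tasks are put into the queue
  4. When a thread finishes all the tasks in its queue, it takes tasks from another thread's queue (steal)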
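
The four steps can be modeled with a double-ended queue per worker. This is a simplified single-threaded sketch, not ForkJoinPool's implementation. It assumes the common arrangement where the owner takes its newest task from the tail and a thief takes the oldest task from the head; the Worker class and task names are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class WorkStealingSketch {

    /** One worker with its own double-ended queue of task names. */
    public static class Worker {
        final String name;
        final Deque<String> queue = new ArrayDeque<>();

        public Worker(String name) {
            this.name = name;
        }

        /** New work produced by the owner goes to the tail of its own queue. */
        public void push(String task) {
            queue.addLast(task);
        }

        /** The owner takes its newest task from the tail, or null if the queue is empty. */
        public String takeOwn() {
            return queue.pollLast();
        }

        /** An idle worker steals the oldest task from the head of a victim's queue. */
        public String stealFrom(Worker victim) {
            return victim.queue.pollFirst();
        }
    }

    public static void main(String[] args) {
        Worker busy = new Worker("worker-1");
        Worker idle = new Worker("worker-2");
        busy.push("task-A");
        busy.push("task-B");
        busy.push("task-C");

        System.out.println(busy.name + " runs " + busy.takeOwn());
        // worker-2 has nothing of its own, so it steals instead of waiting.
        System.out.println(idle.name + " own queue: " + idle.takeOwn());
        System.out.println(idle.name + " steals " + idle.stealFrom(busy));
    }
}
```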

Java Concurrency - Deadlock in ForkJoinPool

Reference

Will ForkJoinTask encounter deadlock problem?

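While reading about ForkJoinTask, I saw that a ForkJoinTask can call fork to start running and then call join to wait for completion.
As in the figure, how many threads do these actions need in total?
My first thought:
1. MainTask takes one thread, and each of the two subtasks runs on its own thread.
2. When the two subtasks are joined, MainTask waits.
In that case, if ForkJoinPool has only one thread and MainTask occupies it, wouldn't there be no thread left to run the subtasks that are forked and then joined?

Testing shows that MainTask's thread is actually handed over as soon as join is called.
So with a single thread, MainTask runs until it calls subtask.join, then steps aside and gives the thread to the subtask's work.
For example: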
public class ForkJoinSingleThreadMain {

    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] params) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(1);
        pool.submit(new MainTask());
        latch.await();
    }

    private static class MainTask extends RecursiveAction {

        @Override
        protected void compute() {
            Subtask subtask1 = new Subtask(1);
            Subtask subtask2 = new Subtask(2);
            System.out.println(Thread.currentThread().getName() + " fork subtask1");
            subtask1.fork();
            System.out.println(Thread.currentThread().getName() + " fork subtask2");
            subtask2.fork();
            System.out.println(Thread.currentThread().getName() + " join subtask1");
            subtask1.join();
            System.out.println(Thread.currentThread().getName() + " join subtask2");
            subtask2.join();
            System.out.println(Thread.currentThread().getName() + " join done");
            latch.countDown();
        }
    }

    private static class Subtask extends RecursiveAction {

        private final int id;

        Subtask(int id) {
            this.id = id;
        }

        @Override
        protected void compute() {
            System.out.println(Thread.currentThread().getName() + ":" + id + " compute");
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread().getName() + ":" + id + " compute done");
        }
    }


}
The result of running this class:
You can see that MainTask hands over its thread when it calls join.

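So can it deadlock?
That depends on whether the subtasks contend for locks. If they do, a deadlock is possible.
The example below produces a deadlock.
Note that the pool cannot be declared as new ForkJoinPool(1), because then only one thread executes.
It takes at least new ForkJoinPool(2) for two threads to contend for the locks and deadlock.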

public class ForkJoinPoolDeadlockMain {

    private static final CountDownLatch l = new CountDownLatch(1);
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    public static void main(String[] params) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        pool.submit(ForkJoinTask.adapt(new Runnable(){
            @Override
            public void run() {
                Obj1Locker locker1 = new Obj1Locker();
                Obj2Locker locker2 = new Obj2Locker();
                locker1.fork();
                locker2.fork();
                locker1.join();
                locker2.join();
                l.countDown();
            }
        }));
        l.await();
    }

    private static class Obj1Locker extends RecursiveAction {

        @Override
        public void compute() {
            synchronized (lock1) {
                System.out.println("obj1Locker got lock1");
                sleep();
                synchronized (lock2) {
                    System.out.println("obj1Locker got lock2");
                }
            }
        }


    }

    private static class Obj2Locker extends RecursiveAction {

        @Override
        public void compute() {
            synchronized (lock2) {
                System.out.println("obj2Locker got lock2");
                sleep();
                synchronized (lock1) {
                    System.out.println("obj2Locker got lock1");
                }
            }
        }


    }

    private static void sleep() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
    }

}
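
For contrast, here is a sketch of the usual remedy, which this post does not cover: if every subtask acquires the locks in the same order, the circular wait cannot form. The class OrderedLocker, the 100 ms pause, and the 5 second timeout are assumptions made for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolOrderedLockMain {

    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    /** Hypothetical subtask: always takes lock1 before lock2. */
    static class OrderedLocker extends RecursiveAction {
        private final String name;

        OrderedLocker(String name) {
            this.name = name;
        }

        @Override
        protected void compute() {
            synchronized (lock1) {
                pause();
                synchronized (lock2) {
                    System.out.println(name + " got lock1 and lock2");
                }
            }
        }
    }

    private static void pause() {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs both lockers on a two-thread pool; returns true if they finish in time. */
    public static boolean runBoth() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            pool.submit(new RecursiveAction() {
                @Override
                protected void compute() {
                    OrderedLocker a = new OrderedLocker("locker-a");
                    OrderedLocker b = new OrderedLocker("locker-b");
                    a.fork();
                    b.fork();
                    a.join();
                    b.join();
                }
            }).get(5, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("finished without deadlock: " + runBoth());
    }
}
```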

Java Concurrent - ForkJoinPool

ExecutorService and Problem

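After submitting a task to an ExecutorService, if that task submits other tasks, they all go into the ExecutorService's queue.
That leaves no straightforward way to do something like "print one log line once the first task and all the subtasks it produced have completed".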

ForkJoinPool and Purpose

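With ForkJoinPool, a task's fork and join arrange the tasks into a tree.
  1. Create a ForkJoinPool
  2. Submit a task to the ForkJoinPool
  3. The task creates several new subtasks
  4. Call fork on each subtask
  5. Call join on each subtask
  6. Execution continues until the parent task finishes
This groups the tasks naturally.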

Reference and Sample

Code

public class ForkJoinPoolMain {

    private static AtomicInteger actionCounter = new AtomicInteger();
    public static void main(String[] params) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println("main:" + pool.submit(new MyAction(new int[]{1,2,3,4,5,6,7,8,9,10,11})).get());
    }

    private static class MyAction extends RecursiveTask<Integer> {
        // Extends RecursiveAction if no return value required

        private int count;
        private int[] numbers;

        public MyAction(int[] numbers) {
            this.count = actionCounter.incrementAndGet();
            this.numbers = numbers;
        }

        @Override
        protected Integer compute() {
            if (numbers.length > 2) {
                System.out.println(count + " split " + Arrays.toString(numbers) + " to 2 arrays");
                List<MyAction> actions = new ArrayList<>();
                actions.add(new MyAction(Arrays.copyOfRange(numbers, 0, numbers.length / 2)));
                actions.add(new MyAction(Arrays.copyOfRange(numbers, numbers.length / 2, numbers.length)));
                actions.forEach(MyAction::fork);
                return actions.stream().mapToInt(MyAction::join).sum();
            } else if (numbers.length == 2) {
                System.out.println(count + " add " + numbers[0] + " and " + numbers[1]);
                return numbers[0] + numbers[1];
            }
            System.out.println(count + " just return " + numbers[0]);
            return numbers[0];
        }
    }

}
Next post: ForkJoinPool can also deadlock if it is not handled carefully

Alias Method

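Problem: Each server supports a different TPM (transactions per minute). When a request arrives, the system must immediately pick a suitable server at random according to its TPM capacity. Although it is called "random", the TPM still has to act as the weight. Solution: the alias method...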
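
Since the summary above is cut off, here is a minimal sketch of the alias method as it is commonly described (Vose's variant), not necessarily the implementation from the full post. The class name AliasSampler and the TPM values 100, 300, and 600 are made up for the illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

public class AliasSampler {

    private final double[] probability; // chance of keeping column i
    private final int[] alias;          // fallback index for column i
    private final Random random;

    public AliasSampler(double[] weights, Random random) {
        int n = weights.length;
        double total = 0;
        for (double w : weights) {
            total += w;
        }
        this.probability = new double[n];
        this.alias = new int[n];
        this.random = random;

        // Scale the weights so that the average column height is 1.
        double[] scaled = new double[n];
        Deque<Integer> small = new ArrayDeque<>();
        Deque<Integer> large = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            scaled[i] = weights[i] * n / total;
            if (scaled[i] < 1.0) small.push(i); else large.push(i);
        }
        // Pair each under-full column with an over-full one.
        while (!small.isEmpty() && !large.isEmpty()) {
            int s = small.pop();
            int l = large.pop();
            probability[s] = scaled[s];
            alias[s] = l;
            scaled[l] = scaled[l] + scaled[s] - 1.0;
            if (scaled[l] < 1.0) small.push(l); else large.push(l);
        }
        // Whatever is left is a full column (up to rounding error).
        while (!large.isEmpty()) probability[large.pop()] = 1.0;
        while (!small.isEmpty()) probability[small.pop()] = 1.0;
    }

    /** Picks an index in O(1): choose a column, then keep it or take its alias. */
    public int next() {
        int column = random.nextInt(probability.length);
        return random.nextDouble() < probability[column] ? column : alias[column];
    }

    /** Probability of index i implied by the two tables (useful for checking). */
    public double impliedProbability(int i) {
        int n = probability.length;
        double p = probability[i] / n;
        for (int j = 0; j < n; j++) {
            if (j != i && alias[j] == i) {
                p += (1.0 - probability[j]) / n;
            }
        }
        return p;
    }

    public static void main(String[] args) {
        double[] tpm = {100, 300, 600}; // hypothetical TPM of three servers
        AliasSampler sampler = new AliasSampler(tpm, new Random());
        int[] hits = new int[tpm.length];
        for (int i = 0; i < 100_000; i++) {
            hits[sampler.next()]++;
        }
        for (int i = 0; i < hits.length; i++) {
            System.out.println("server " + i + ": " + hits[i] + " picks");
        }
    }
}
```

Building the tables costs O(n) once; after that each pick needs only one random column and one comparison, which suits the "pick immediately on every request" requirement.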