Java Concurrent - ForkJoinPool

ExecutorService and Problem

用一個 ExecutorService submit task 後, 如果 task 會 submit 其他的 task, 這些 task 都會放進 ExecutorService 的 queue 中.
這樣就沒辦法做到 "第一個 task 與其產生的 subtask 完成後就印一行 log"

ForkJoinPool and Purpose

使用了 ForkJoinPool 就可以透過 task 的 fork 與 join 讓 task 變成樹狀結構.
  1. 建立 ForkJoinPool
  2. 用 ForkJoinPool submit task
  3. task 建立幾個新的 subtask
  4. 呼叫 subtask 的 fork
  5. 呼叫 subtask 的 join
  6. 執行到 parent 的 task 結束
如此就能讓 task 自然分群

Reference and Sample

Code

public class ForkJoinPoolMain {

    private static AtomicInteger actionCounter = new AtomicInteger();
    public static void main(String[] params) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println("main:" + pool.submit(new MyAction(new int[]{1,2,3,4,5,6,7,8,9,10,11})).get());
    }

    private static class MyAction extends RecursiveTask<Integer> {
        // Extends RecursiveAction if no return value required

        private int count;
        private int[] numbers;

        public MyAction(int[] numbers) {
            this.count = actionCounter.incrementAndGet();
            this.numbers = numbers;
        }

        @Override
        protected Integer compute() {
            if (numbers.length > 2) {
                System.out.println(count + " split " + Arrays.toString(numbers) + " to 2 arrays");
                List<MyAction> actions = new ArrayList<>();
                actions.add(new MyAction(Arrays.copyOfRange(numbers, 0, numbers.length / 2)));
                actions.add(new MyAction(Arrays.copyOfRange(numbers, numbers.length / 2, numbers.length)));
                actions.forEach(MyAction::fork);
                return actions.stream().mapToInt(MyAction::join).sum();
            } else if (numbers.length == 2) {
                System.out.println(count + " add " + numbers[0] + " and " + numbers[1]);
                return numbers[0] + numbers[1];
            }
            System.out.println(count + " just return " + numbers[0]);
            return numbers[0];
        }
    }

}
下篇:ForkJoinPool 沒處理好也會 deadlock

沒有留言:

張貼留言

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...