ForkJoinPool - Thread Management

Reference

Question

原本以為 new ForkJoinPool(2) 像這樣的宣告, 是讓 ForkJoinPool 最多維持 2 個 thread 來執行.
可是當實際觀察的時候, 卻發現有非常多個 thread 被叫起來跑.
比方說原本 fork -> join 的範例, 如果把 thread name 印出來, 就會發現有很多個 thread 被叫起來.
而且每秒印出的 pool 也會發現 pool size 變大.

Codes: Thread size exceed parallel level

public class ForkJoinPoolManyThreadMain {

    static final CountDownLatch latch = new CountDownLatch(1);
    private static final AtomicInteger threadCount = new AtomicInteger();

    public static void main(String[] params) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        ScheduledExecutorService s = Executors.newScheduledThreadPool(1);
        s.scheduleAtFixedRate(() -> {
            System.out.println(pool); }, 0, 1, TimeUnit.SECONDS);
        pool.execute(new MainAction());
        latch.await();
        s.shutdown();
        pool.shutdown();
    }


    private static class MainAction extends RecursiveAction {

        @Override
        protected void compute() {
            List<SubAction> actions = IntStream.range(0,100).mapToObj(idx -> new SubAction()).collect(toList());
            actions.forEach(SubAction::fork);
            actions.forEach(SubAction::join);
            latch.countDown();
        }
    }

    private static class SubAction extends RecursiveAction {

        @Override
        protected void compute() {
            try {
                System.out.println(Thread.currentThread().getName() + " sleep 1 seconds");
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}

Output

從印出來的 log 就可以發現: thread 超過 2 個到達 14 個. pool size 實際上也到達 16





Requirement

發現這件事情的時候覺得無法理解, 同時也發現原本以為 main task 的 thread 是跟被 join 的 thread 合再一起了. 結果看起來是有很多新的 thread 被產生出來執行. 如此我們怎麼確保不會有過多的 thread 被建立出來? 

Suggestion: Customized ThreadFactory? (Don't do this)

因為 ForkJoinPool 裡的這段程式, 網路上有人建議是提供自己的 thread factory, 如果超過指定的 thread 數量就回傳 null. 如此就被 ForkJoinPool 當成一個錯誤而繼續使用既有的 thread.

這個做法的確可以控制建立的 thread 數量 , 不過回傳 null 對 ForkJoinPool 來說是一個錯誤, 以 ForkJoinPool 的規則來說這個時候是要 createWorker 的, 不應該出現錯誤而回傳 null.



How: Join after done

突然想到會產生新的 thread 是不是因為 join 導致 MainAction 停住, 但又不是正在執行, 所以為了避免 starvation 使 ForkJoinPool 需要建立一個新的 thread. 所以試著先判斷 isDone, done 才 join. 結果就可以了.  (但還不清楚原因)

Working Code

public class ForkJoinPoolFixedThreadMain {

    public static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] params) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        MainAction action = new MainAction();
        ScheduledExecutorService s = Executors.newScheduledThreadPool(1);
        s.scheduleAtFixedRate(() -> System.out.println(pool), 0, 1, TimeUnit.SECONDS);
        pool.execute(action);
        latch.await();
        s.shutdown();
    }

    private static class MainAction extends RecursiveAction {

        @Override
        protected void compute() {
            List<SubAction> actions = IntStream.range(0,100).mapToObj(idx -> new SubAction()).collect(toList());
            actions.forEach(SubAction::fork);
            while(!actions.isEmpty()) {
                for (Iterator<SubAction> i = actions.iterator(); i.hasNext(); ) {
                    SubAction action = i.next();
                    if (action.isDone()) {
                        action.join();
                        i.remove();
                    }
                }
            }
            latch.countDown();
        }
    }

    private static class SubAction extends RecursiveAction {

        @Override
        protected void compute() {
            long start = System.currentTimeMillis();
            while (true) {
                if (System.currentTimeMillis() - start > 1000) {
                    break;
                }
            }
        }
    }


}

Output

因為呼叫 new ForkJoinPool() 所以 pool size 最多就維持在 8 (我的電腦有 8 core)

沒有留言:

張貼留言

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...