Java Concurrent CyclicBarrier

Introduction

CyclicBarrier 很久沒用都忘了, 來複習一下..

Description

簡單說就是很多個 thread 去作事情, 作完之後就 await.
CyclicBarrier 等作完的 thread 達到指定的個數後, await 就結束.

Code

public class CyclicBarrierMain {

    public static void main(String[] params) throws InterruptedException {
        int runner = 5;
        CountDownLatch gameOver = new CountDownLatch(runner);
        CyclicBarrier b = new CyclicBarrier(runner, () -> {
            System.out.println("barrier done");
        });
        IntStream.range(0,runner).forEach(idx -> {
            new Thread() {
                @Override
                public void run() {
                    try {
                        System.out.println("wait " + idx);
                        b.await();
                        System.out.println("count down " + idx);
                        gameOver.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        });
        gameOver.await();
        System.out.println("game over");
    }

}

Java Concurrent Phaser

Introduction

今天才知道 Java 7 有提供一個 Phaser class 來處理更複雜的 CountDownLatch 需求.

Simple Phaser

想用 Phaser, 首先你的 threads 工作要有分階段, 一個階段沒做完就不能進行下個階段.
有了這個限制後就可以照下面的流程做事情
  1. 先註冊, 看有幾個 thread 要做事情.
    => 透過呼叫 Phaser#register 去註冊
  2. 做完之後呼叫 arrive, 告訴 Phaser 有事情做完了
  3. 當每個 thread 都把事情做完, Phaser 就會進入下一個階段, 稱做 advance
  4. 當進入下一階段, Phaser 的 onAdvance method 就會被呼叫,
    這個 method 是留給我們 developer 去實做
以上. 簡單的概念, 覺得看好久都沒看懂, 希望看這篇的人可以簡單看懂.
接著是範例, 這範例是

  1. 假設有幾個人賽跑
  2. 首先要到達起跑線 
  3. 都到了以後才開跑 
  4. 每個人都到達終點舊比賽結束.

public class PhaserRacingGame {

    public static void main(String[] params) {
        Random r = new Random();
        Phaser perGamePhase = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("phase:" + phase + ", registeredParties:" + registeredParties + ", arrived:" + getArrivedParties());
                switch (phase) {
                    case 0:
                        System.out.println("start!");
                        break;
                    case 1:
                        System.out.println("game finish");
                        break;
                    default:
                        System.out.println("unexpected:" + phase);
                }
                return super.onAdvance(phase, registeredParties);
            }
        };
        IntStream.range(0,5).forEach(idx -> {
            perGamePhase.register();
            new Thread(() -> {
                System.out.println(idx + " arrive starting line and wait for start");
                perGamePhase.arriveAndAwaitAdvance();
                System.out.println(idx + " run!! startTime:" + System.currentTimeMillis());
                long spendTime = r.nextInt(1000);
                sleep(spendTime);
                perGamePhase.arrive();
            }).start();
        });
        sleep(5000);
    }

    private static void sleep(long ms) {
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
        }
    }

}

Parent Phaser

再來比較複雜的是 Parent Phaser, Parent Phaser 的目的是減少 synchronization 的時候搶 lock 的碰撞. 
使用 Parent Phaser 的時候要注意的是: advance 是發生在 Parent Phaser 而不是 Child Phaser, 
使用 Parent Phaser 的時候, 是可以確保每個 Child Phaser 準備號都進入下一個 phase 的時候, 已經先到達的 Child Phaser 才可以更進一步 arrive.
比方說世足十六強賽, 要等所有能進入八強賽的隊伍都出線之後才可以開始進行八強賽.
那 Parent Phaser 就是賽程, 十六強賽則是有八個 Child Phaser.
以下範例是看出特性, 方便 copy paste 觀察
public class ParentPhaser {

    public static void main(String[] params) {
        Phaser parent = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("parent phase:" + phase + ", registeredParties:" + registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };
        Phaser child1 = new Phaser(parent, 4) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("child phase:" + phase + ", registeredParties:" + registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };
        Phaser child2 = new Phaser(parent, 3) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("child phase:" + phase + ", registeredParties:" + registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };
        IntStream.range(0,10).forEach(idx -> {
            child1.arrive();
            child1.arrive();
            child1.arrive();
            child1.arrive();
//            child1.arrive(); // cause error, child1 need wait for child2 finish
            child2.arrive();
            child2.arrive();
            child2.arrive();
        });
    }


}

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...