Introduction
One day, Cassandra stopped accepting Thrift client connections and did not recover until it was restarted manually.
Checking the Cassandra log revealed an OutOfMemoryError:
ERROR [Thrift-Selector_27] 2020-03-31 06:00:44,020 TDisruptorServer.java (line 391) run() exiting due to uncaught error
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:695)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.write(IOUtil.java:58)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.thrift.transport.TNonblockingSocket.write(TNonblockingSocket.java:164)
    at com.thinkaurelius.thrift.util.mem.Buffer.writeTo(Buffer.java:104)
    at com.thinkaurelius.thrift.util.mem.FastMemoryOutputTransport.streamTo(FastMemoryOutputTransport.java:112)
    at com.thinkaurelius.thrift.Message.write(Message.java:222)
    at com.thinkaurelius.thrift.TDisruptorServer$SelectorThread.handleWrite(TDisruptorServer.java:598)
    at com.thinkaurelius.thrift.TDisruptorServer$SelectorThread.processKey(TDisruptorServer.java:569)
    at com.thinkaurelius.thrift.TDisruptorServer$AbstractSelectorThread.select(TDisruptorServer.java:423)
    at com.thinkaurelius.thrift.TDisruptorServer$AbstractSelectorThread.run(TDisruptorServer.java:383)
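The trace shows ByteBuffer.allocateDirect failing inside the DirectByteBuffer constructor. The same kind of error can be reproduced outside Cassandra. Below is a minimal sketch (the class name DirectOomDemo is hypothetical). It allocates 1 MB direct buffers and keeps them reachable, so no Cleaner can free them. Run with a small limit such as -XX:MaxDirectMemorySize=16m, it should hit the limit. On JDK 8 the error message is "Direct buffer memory", as in the log above. Newer JDKs may word it differently.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class DirectOomDemo {
    // Allocates up to maxBuffers direct buffers of 1 MB each and keeps them
    // reachable, so no Cleaner can release them. Returns how many succeeded
    // before OutOfMemoryError, or -1 if every allocation succeeded.
    static int allocateUntilOom(int maxBuffers) {
        List<ByteBuffer> retained = new ArrayList<>();
        try {
            for (int i = 0; i < maxBuffers; i++) {
                retained.add(ByteBuffer.allocateDirect(1024 * 1024));
            }
            return -1;
        } catch (OutOfMemoryError e) {
            // On JDK 8 the message is "Direct buffer memory".
            System.out.println("OutOfMemoryError: " + e.getMessage());
            return retained.size();
        }
    }

    public static void main(String[] args) {
        // Intended to be run with a small limit, e.g.
        //   java -XX:MaxDirectMemorySize=16m DirectOomDemo
        int n = allocateUntilOom(64);
        if (n < 0) {
            System.out.println("64 MB allocated without hitting the limit");
        } else {
            System.out.println(n + " MB allocated before the limit was hit");
        }
    }
}
```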

DirectByteBuffer.<init>
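DirectByteBuffer manages off-heap memory.
The trace shows where the allocation came from. When a non-direct (heap) ByteBuffer is written to a socket, sun.nio.ch.IOUtil.write first copies it into a temporary direct buffer obtained from Util.getTemporaryDirectBuffer. That method calls ByteBuffer.allocateDirect.
As the constructor below shows, every new DirectByteBuffer must first reserve memory through Bits.reserveMemory(size, cap).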
// My env: jdk_1.8.0_211
class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer {
...
    DirectByteBuffer(int cap) {  // package-private
        super(-1, 0, cap, cap);
        boolean pa = VM.isDirectMemoryPageAligned();
        int ps = Bits.pageSize();
        long size = Math.max(1L, (long)cap + (pa ? ps : 0));
        Bits.reserveMemory(size, cap);

        long base = 0;
        try {
            base = unsafe.allocateMemory(size);
        } catch (OutOfMemoryError x) {
            Bits.unreserveMemory(size, cap);
            throw x;
        }
        unsafe.setMemory(base, size, (byte) 0);
        if (pa && (base % ps != 0)) {
            // Round up to page boundary
            address = base + ps - (base & (ps - 1));
        } else {
            address = base;
        }
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
        att = null;
...
}
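The size passed to Bits.reserveMemory can differ from cap. When page alignment is enabled (the "sun.nio.PageAlignDirectMemory" property handled in VM.java below), the constructor requests one extra page. It then rounds the address up. The following sketch reproduces only that arithmetic. DirectBufferSizing and its methods are hypothetical helpers, and no memory is allocated.

```java
public class DirectBufferSizing {
    // Bytes requested from Unsafe: cap plus one extra page when page
    // alignment is on, and never less than 1 byte.
    static long reservedSize(int cap, boolean pageAligned, int pageSize) {
        return Math.max(1L, (long) cap + (pageAligned ? pageSize : 0));
    }

    // Rounds base up to the next page boundary when alignment is on and base
    // is not already aligned. pageSize must be a power of two.
    static long alignedAddress(long base, boolean pageAligned, int pageSize) {
        if (pageAligned && (base % pageSize != 0)) {
            return base + pageSize - (base & (pageSize - 1));
        }
        return base;
    }

    public static void main(String[] args) {
        int ps = 4096;
        System.out.println(reservedSize(1000, false, ps));   // 1000
        System.out.println(reservedSize(1000, true, ps));    // 5096
        System.out.println(reservedSize(0, false, ps));      // 1
        System.out.println(alignedAddress(8193, true, ps));  // 12288
    }
}
```

This gap between size and cap is why Bits tracks both reservedMemory (size) and totalCapacity (cap). The -XX:MaxDirectMemorySize check only looks at the latter.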

Bits.reserveMemory
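Bits.java keeps track of direct memory usage.
No memory is allocated through the Unsafe class until Bits.reserveMemory succeeds.
In the code below, "totalCapacity" holds the capacity reserved so far. A new reservation succeeds only if it still fits under "maxMemory".
"maxMemory" comes from VM.maxDirectMemory().
The value of "VM.maxDirectMemory" can be set when starting the JVM with "-XX:MaxDirectMemorySize=<size>".
If the first attempt fails, reserveMemory runs pending Cleaners, which free unreachable direct buffers. It then calls System.gc() and retries with exponentially increasing sleeps, about 0.5 s in total. Only after that does it throw OutOfMemoryError("Direct buffer memory"), the error seen in the log.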
class Bits {     
    // -- Direct memory management --

    // A user-settable upper limit on the maximum amount of allocatable
    // direct buffer memory.  This value may be changed during VM
    // initialization if it is launched with "-XX:MaxDirectMemorySize=<size>".
    private static volatile long maxMemory = VM.maxDirectMemory();
    private static final AtomicLong reservedMemory = new AtomicLong();
    private static final AtomicLong totalCapacity = new AtomicLong();
    private static final AtomicLong count = new AtomicLong();
    private static volatile boolean memoryLimitSet = false;
    // max. number of sleeps during try-reserving with exponentially
    // increasing delay before throwing OutOfMemoryError:
    // 1, 2, 4, 8, 16, 32, 64, 128, 256 (total 511 ms ~ 0.5 s)
    // which means that OOME will be thrown after 0.5 s of trying
    private static final int MAX_SLEEPS = 9;
    
    
    // These methods should be called whenever direct memory is allocated or
    // freed.  They allow the user to control the amount of direct memory
    // which a process may access.  All sizes are specified in bytes.
    static void reserveMemory(long size, int cap) {
    
        if (!memoryLimitSet && VM.isBooted()) {
            maxMemory = VM.maxDirectMemory();
            memoryLimitSet = true;
        }

        // optimist!
        if (tryReserveMemory(size, cap)) {
            return;
        }

        final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();

        // retry while helping enqueue pending Reference objects
        // which includes executing pending Cleaner(s) which includes
        // Cleaner(s) that free direct buffer memory
        while (jlra.tryHandlePendingReference()) {
            if (tryReserveMemory(size, cap)) {
                return;
            }
        }

        // trigger VM's Reference processing
        System.gc();

        // a retry loop with exponential back-off delays
        // (this gives VM some time to do it's job)
        boolean interrupted = false;
        try {
            long sleepTime = 1;
            int sleeps = 0;
            while (true) {
                if (tryReserveMemory(size, cap)) {
                    return;
                }
                if (sleeps >= MAX_SLEEPS) {
                    break;
                }
                if (!jlra.tryHandlePendingReference()) {
                    try {
                        Thread.sleep(sleepTime);
                        sleepTime <<= 1;
                        sleeps++;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }

            // no luck
            throw new OutOfMemoryError("Direct buffer memory");

        } finally {
            if (interrupted) {
                // don't swallow interrupts
                Thread.currentThread().interrupt();
            }
        }
    }

    private static boolean tryReserveMemory(long size, int cap) {
   
        // -XX:MaxDirectMemorySize limits the total capacity rather than the
        // actual memory usage, which will differ when buffers are page
        // aligned.
        long totalCap;
        while (cap <= maxMemory - (totalCap = totalCapacity.get())) {
            if (totalCapacity.compareAndSet(totalCap, totalCap + cap)) {
                reservedMemory.addAndGet(size);
                count.incrementAndGet();
                return true;
            }
        }
        return false;
    }

    static void unreserveMemory(long size, int cap) {
        long cnt = count.decrementAndGet();
        long reservedMem = reservedMemory.addAndGet(-size);
        long totalCap = totalCapacity.addAndGet(-cap);
        assert cnt >= 0 && reservedMem >= 0 && totalCap >= 0;
    }
...
}
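The core of tryReserveMemory is a lock-free compare-and-set loop on one shared counter. The standalone sketch below models it with a hypothetical DirectMemoryBudget class, which is not a JDK class. It tracks only capacity and count, not the page-aligned reservedMemory. It also leaves out the Cleaner / System.gc() / back-off fallback in reserveMemory.

```java
import java.util.concurrent.atomic.AtomicLong;

public class DirectMemoryBudget {
    private final long maxCapacity;
    private final AtomicLong totalCapacity = new AtomicLong();
    private final AtomicLong count = new AtomicLong();

    public DirectMemoryBudget(long maxCapacity) {
        this.maxCapacity = maxCapacity;
    }

    // Same shape as Bits.tryReserveMemory: keep retrying the CAS while the
    // request still fits. Give up as soon as it no longer fits.
    public boolean tryReserve(long cap) {
        long current;
        while (cap <= maxCapacity - (current = totalCapacity.get())) {
            if (totalCapacity.compareAndSet(current, current + cap)) {
                count.incrementAndGet();
                return true;
            }
            // Another thread changed totalCapacity first; re-read and retry.
        }
        return false;
    }

    // Counterpart of Bits.unreserveMemory (in the JDK it runs when a buffer is freed).
    public void unreserve(long cap) {
        totalCapacity.addAndGet(-cap);
        count.decrementAndGet();
    }

    public long used() { return totalCapacity.get(); }

    public long count() { return count.get(); }

    public static void main(String[] args) {
        DirectMemoryBudget budget = new DirectMemoryBudget(64);
        System.out.println(budget.tryReserve(40)); // true
        System.out.println(budget.tryReserve(40)); // false: 40 + 40 > 64
        budget.unreserve(40);
        System.out.println(budget.tryReserve(40)); // true again
    }
}
```

Because the budget is a single process-wide counter, every thread that allocates direct buffers competes for the same limit.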

VM.maxDirectMemory()
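Opening VM.java, you'll find that the field behind maxDirectMemory() is initialized to 64MB.

In saveAndRemoveProperties in VM.java, you can see that:
  1. If "sun.nio.MaxDirectMemorySize" is not set, the 64MB default is used.
  2. If "sun.nio.MaxDirectMemorySize" is set to -1, Runtime.getRuntime().maxMemory() is used.
  3. Otherwise, the size specified by the user is used.

Note that HotSpot converts -XX:MaxDirectMemorySize into the "sun.nio.MaxDirectMemorySize" property. When the flag is not given, it sets the property to -1. So on a standard HotSpot JDK 8, the effective default is the maximum heap size reported by Runtime.getRuntime().maxMemory(), not 64MB.
The limit is shared by the whole process. When many threads allocate DirectByteBuffers at the same time, they can easily exceed it and trigger OutOfMemoryError.
If the load is within expectations, consider raising this value.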
public class VM {
...
    public static void saveAndRemoveProperties(Properties var0) {
        if (booted) {
            throw new IllegalStateException("System initialization has completed");
        } else {
            savedProps.putAll(var0);
            String var1 = (String)var0.remove("sun.nio.MaxDirectMemorySize");
            if (var1 != null) {
                if (var1.equals("-1")) {
                    directMemory = Runtime.getRuntime().maxMemory();
                } else {
                    long var2 = Long.parseLong(var1);
                    if (var2 > -1L) {
                        directMemory = var2;
                    }
                }
            }

            var1 = (String)var0.remove("sun.nio.PageAlignDirectMemory");
            if ("true".equals(var1)) {
                pageAlignDirectMemory = true;
            }

            var1 = var0.getProperty("sun.lang.ClassLoader.allowArraySyntax");
            allowArraySyntax = var1 == null ? defaultAllowArraySyntax : Boolean.parseBoolean(var1);
            var0.remove("java.lang.Integer.IntegerCache.high");
            var0.remove("sun.zip.disableMemoryMapping");
            var0.remove("sun.java.launcher.diag");
            var0.remove("sun.cds.enableSharedLookupCache");
        }
    }
...
}
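The three cases above can be condensed into one small function. MaxDirectMemoryRule.resolve below is a hypothetical helper, not a JDK API. It mirrors the decompiled logic, including the detail that negative values other than -1 leave the 64MB field default untouched. heapMax stands in for Runtime.getRuntime().maxMemory(), so the rule can be checked with fixed numbers.

```java
public class MaxDirectMemoryRule {
    // Field initializer of directMemory in JDK 8 VM.java.
    static final long DEFAULT_DIRECT_MEMORY = 64L * 1024 * 1024;

    // prop: value of "sun.nio.MaxDirectMemorySize", or null if absent.
    static long resolve(String prop, long heapMax) {
        if (prop == null) {
            return DEFAULT_DIRECT_MEMORY;          // case 1: property not set
        }
        if (prop.equals("-1")) {
            return heapMax;                        // case 2: use the heap maximum
        }
        long value = Long.parseLong(prop);
        // case 3: user-specified size; other negative values are ignored
        return value > -1L ? value : DEFAULT_DIRECT_MEMORY;
    }

    public static void main(String[] args) {
        long heapMax = Runtime.getRuntime().maxMemory();
        System.out.println(resolve(null, heapMax));        // 67108864
        System.out.println(resolve("-1", heapMax));        // same as heapMax
        System.out.println(resolve("268435456", heapMax)); // 268435456
    }
}
```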

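About heap OutOfMemoryError
The JVM can be configured to take action when an OutOfMemoryError occurs, for example:
-XX:OnOutOfMemoryError=/restart.sh
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/yourpath
These hooks fire only for OutOfMemoryErrors raised by the JVM itself, such as "Java heap space". The "Direct buffer memory" error above is thrown by Bits.reserveMemory as an ordinary Java exception. It therefore triggers neither the restart script nor a heap dump.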
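To see direct-memory pressure before Bits.reserveMemory gives up, you can read the buffer pool named "direct" through the standard java.lang.management.BufferPoolMXBean (Java 7+). On JDK 8, its getCount(), getTotalCapacity() and getMemoryUsed() report the count, totalCapacity and reservedMemory counters kept by Bits. This is a minimal sketch. The class and method names are hypothetical, and polling interval and alerting are left open.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class DirectPoolMonitor {
    // Returns the "direct" buffer pool bean, or null if the JVM does not expose one.
    static BufferPoolMXBean directPool() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Allocate one buffer so the pool has something to report.
        ByteBuffer buf = ByteBuffer.allocateDirect(1024 * 1024);
        BufferPoolMXBean pool = directPool();
        if (pool == null) {
            System.out.println("no \"direct\" buffer pool exposed");
            return;
        }
        System.out.printf("buffers=%d capacity=%d bytes used=%d bytes (holding %d bytes)%n",
                pool.getCount(), pool.getTotalCapacity(), pool.getMemoryUsed(), buf.capacity());
    }
}
```

Comparing getTotalCapacity() against the configured -XX:MaxDirectMemorySize gives an early warning well before the selector thread dies.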

