Scan Cassandra-2.0.17 log to check write data loading


We use Cassandra 2.0.17.
Sometimes we suffer IO problem in system, we can use "iotop" to know process IO loading.
But sometimes we need more detail, such as "io-write loading for each table"
Cassandra log is very good for us to know about it.
I write a program to scan it, generate a csv file, and then copy-paste it to Google Sheet for further analyzing, such as generate a graphic.

This program will generate some files (I write this file long time ago, please inform me if file content is wrong):
  • output-compact-mb-xxx: Write loading by compaction
  • output-compact-mb-agg-xxx: Write loading per minutes by compaction
  • output-queue-xxx: Queue size of flush memory event
  • output-xxx: Flush memory size by column family
  • output-xxx-avg: Flush memory size per minutes by colmnfamily

This program can scan file pattern "system.log" and "system.log.*"

I often check output-xxx file, content is as follows:
column 1 - column family name
column 2 - flush event time stamp
column 3 - flush memory (MB)
mytable,2019-03-24 16:16:28,11


Here is the program, please include commons-lang and commons-io as dependency:
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.math.NumberUtils;


import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;


import static org.apache.commons.lang3.StringUtils.substringAfter;
import static org.apache.commons.lang3.StringUtils.substringBetween;


public class ScanLog {
    private static final String fileSufix = "";
    private static final String LOG_FOLDER = "{CassandraPathContainsSystemLog}\\cassandra\\";
    private static final String OUTPUT_FOLDER = "\\OutputFoldlerYouWantGenerateCsv\\";


    public static void main(String[] params) throws IOException, ParseException {
        File folder = new File(LOG_FOLDER);
        Map<String, List<CompactSize>> cf2CompactSizes = toCompactSizes(folder);
        System.out.println("cf2CompactSizes.size:" + cf2CompactSizes.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-" + fileSufix + ".csv"), toCompactSizesString(cf2CompactSizes), "UTF-8");


        // Compact avg per minutes
        List<CompactSize> aggCompactSizes = aggregate(toCompactSizeList(folder));
        System.out.println("aggCompactSizes.size:" + aggCompactSizes.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-agg-" + fileSufix + ".csv"), toCompactSizesString(aggCompactSizes), "UTF-8");


        // Overall Flush
        List<FlushEvent> events = toEvents(folder);
        System.out.println("events.size:" + events.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-queue-" + fileSufix + ".csv"), toString(events), "UTF-8");


        // Flush by CF
        Map<String, List<Enqueue>> cfToE = cfToE(folder);
        System.out.println("cfToE.size:" + cfToE.size());
        Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = cfToTimeToAvg(cfToE);
        System.out.println("cfToTimeToAvg.size:" + cfToTimeToAvg.size());


        FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSufix + ".csv"), toString(cfToE), "UTF-8");
        FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSufix + "-avg.csv"), toAvgString(cfToTimeToAvg), "UTF-8");
    }


    private static List<CompactSize> aggregate(List<CompactSize> compactSizes) {
        LinkedHashMap<String, List<CompactSize>> minutes2CompactSizes = new LinkedHashMap<>();
        compactSizes.forEach(compactSize -> {
            String timeMinutes = compactSize.getTimeMinutes();
            if (!minutes2CompactSizes.containsKey(timeMinutes)) {
                minutes2CompactSizes.put(timeMinutes, new ArrayList<>());
            }
            minutes2CompactSizes.get(timeMinutes).add(compactSize);
        });
        LinkedHashMap<String, CompactSize> minutes2AggregatedSize = new LinkedHashMap<>();
        minutes2CompactSizes.entrySet().forEach(entry -> {
            String minutes = entry.getKey();
            entry.getValue().forEach(size -> {
                if (!minutes2AggregatedSize.containsKey(size.getTimeMinutes())) {
                    minutes2AggregatedSize.put(size.getTimeMinutes(), new CompactSize(size.getTime(), size.getCf(), size.getDataSize()));
                } else {
                    CompactSize total = minutes2AggregatedSize.get(size.getTimeMinutes());
                    CompactSize newTotal = new CompactSize(size.getTime(), size.getCf(), size.getDataSize() + total.getDataSize());
                    minutes2AggregatedSize.put(size.getTimeMinutes(), newTotal);
                }
            });
        });
        List<CompactSize> result = new ArrayList<>();
        minutes2AggregatedSize.entrySet().forEach(entry -> {
            result.add(entry.getValue());
        });
        Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
        return result;
    }


    private static String toCompactSizesString(List<CompactSize> cf2compactSizes) {
        StringBuilder sb = new StringBuilder();
        cf2compactSizes.forEach(size -> {
            sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
        });
        return sb.toString();
    }


    private static String toCompactSizesString(Map<String, List<CompactSize>> cf2compactSizes) {
        StringBuilder sb = new StringBuilder();
        cf2compactSizes.keySet().forEach(cf -> {
            cf2compactSizes.get(cf).forEach(size -> {
                sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
            });
        });
        return sb.toString();
    }


    private static List<CompactSize> toCompactSizeList(File folder) {
        List<CompactSize> result = new ArrayList<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file)) {
                            if (CompactSize.isMyPattern(line)) {
                                CompactSize compactSize = new CompactSize(line);
                                result.add(compactSize);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
        return result;
    }


    private static Map<String, List<CompactSize>> toCompactSizes(File folder) {
        Map<String, List<CompactSize>> cf2CompactSizes = new HashMap<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file)) {
                            if (CompactSize.isMyPattern(line)) {
                                CompactSize compactSize = new CompactSize(line);
                                if (!cf2CompactSizes.containsKey(compactSize.getCf())) {
                                    cf2CompactSizes.put(compactSize.getCf(), new ArrayList<>());
                                }
                                cf2CompactSizes.get(compactSize.getCf()).add(compactSize);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        cf2CompactSizes.values().forEach(compactSizes ->
                Collections.sort(compactSizes, Comparator.comparingLong(CompactSize::getTimeMillis)));
        return cf2CompactSizes;
    }


    private static class CompactSize {
        private final String time;
        private final String cf;
        private final long dataSize;


        CompactSize(String time, String cf, long dataSize) {
            this.time = time;
            this.cf = cf;
            this.dataSize = dataSize;
        }


        CompactSize(String line) {
            this.time = substringBetween(line, "] ", ",");
            this.cf = substringBetween(line, "/data/ng/db/data/wsg/", "/");
            this.dataSize = NumberUtils.toInt(StringUtils.replace(substringBetween(line, "].  ", " bytes"), ",", ""));
        }


        public static boolean isMyPattern(String line) {
            return StringUtils.contains(line, "CompactionTask.java (line 302) Compacted")
                    && StringUtils.contains(line, "[/data/ng/db/data/wsg/");
        }


        public long getDataSizeInMB() {
            return dataSize // bytes
                    / 1024  // KB
                    / 1024; // MB
        }


        public long getDataSize() {
            return dataSize;
        }


        public String getCf() {
            return cf;
        }


        public String getTimeMinutes() {
            long millis = getTimeMillis();
            return sdfToMinutes.format(new Date(millis));
        }


        public Long getTimeMillis() {
            return toMillis(getTime());
        }


        public String getTime() {
            return time;
        }
    }


    private static String toString(List<FlushEvent> events) {
        StringBuilder sb = new StringBuilder();
        AtomicInteger waitForWrite = new AtomicInteger();
        AtomicInteger writing = new AtomicInteger();
        events.forEach(e -> {
            switch (e.getType()) {
                case Enqueue:
                    waitForWrite.incrementAndGet();
                    break;
                case Write:
                    writing.incrementAndGet();
                    waitForWrite.decrementAndGet();
                    break;
                case Complete:
                    writing.decrementAndGet();
                    break;
                default:
                    throw new RuntimeException("Shouldn't happen");
            }
            sb.append(e.getTime() + "," + waitForWrite + "," + writing).append("\n");
        });
        return sb.toString();
    }


    private static List<FlushEvent> toEvents(File folder) {
        List<FlushEvent> events = new ArrayList<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file)) {
                            Optional<FlushEventType> type = FlushEventType.getByPattern(line);
                            type.ifPresent(t -> {
                                events.add(new FlushEvent(line));
                            });
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        Collections.sort(events, Comparator.comparingLong(FlushEvent::getTimeMillis));
        return events;
    }


    private static String toAvgString(Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg) {
        StringBuilder sb = new StringBuilder();
        cfToTimeToAvg.keySet().forEach(cf -> {
            cfToTimeToAvg.get(cf).forEach((time, avg) -> {
                sb.append(cf + "," + time + "," + avg).append("\n");
            });
        });
        return sb.toString();
    }


    private static String toString(Map<String, List<Enqueue>> cfToE) {
        StringBuilder sb = new StringBuilder();
        cfToE.values().stream().forEach(eList -> {
            eList.stream().forEach(e -> {
                try {
                    sb.append(e.getCf() + "," + e.getTime() + "," + e.serializedMB).append("\n");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            });
        });
        return sb.toString();
    }


    private static Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg(Map<String, List<Enqueue>> cfToE) {
        Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = new HashMap<>();
        cfToE.keySet().forEach(cf -> {
            if (!cfToTimeToAvg.containsKey(cf)) {
                cfToTimeToAvg.put(cf, new LinkedHashMap<>());
            }
            LinkedHashMap<String, List<Long>> timeToMBList = new LinkedHashMap<>();
            cfToE.get(cf).forEach(enqueue -> {
                if (!timeToMBList.containsKey(enqueue.getTimeMinutes())) {
                    timeToMBList.put(enqueue.getTimeMinutes(), new ArrayList<>());
                }
                timeToMBList.get(enqueue.getTimeMinutes()).add(enqueue.serializedMB);
            });
            AtomicReference<String> previousTime = new AtomicReference<>();
            LinkedHashMap<String, Long> timeToAvg = cfToTimeToAvg.get(cf);
            timeToMBList.keySet().forEach(time -> {
                try {
                    if (previousTime.get() != null) {
                        AtomicLong totalMB = new AtomicLong();
                        timeToMBList.get(time).forEach(mb -> {
                            totalMB.addAndGet(mb);
                        });
                        long currentTimeMillis = sdfToMinutes.parse(time).getTime();
                        long previousTimeMillis = sdfToMinutes.parse(previousTime.get()).getTime();
                        long gapMinutes = TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - previousTimeMillis);
                        timeToAvg.put(time, (totalMB.longValue() / gapMinutes));
                    }
                    previousTime.set(time);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            });
        });
        return cfToTimeToAvg;
    }


    private static Map<String, List<Enqueue>> cfToE(File folder) {
        Map<String, List<Enqueue>> cfToE = new HashMap<>();
        Arrays.stream(folder.listFiles())
                .filter(f -> f.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        List<String> lines = IOUtils.readLines(new FileInputStream(file));
                        for (String line : lines) {
                            if (StringUtils.contains(line, "Enqueuing flush of Memtable-")) {
                                Enqueue e = new Enqueue(line);
                                if (e.serializedMB <= 1) {
                                    continue;
                                }
                                if (!cfToE.containsKey(e.getCf())) {
                                    cfToE.put(e.getCf(), new ArrayList<>());
                                }
                                cfToE.get(e.getCf()).add(e);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        cfToE.keySet().forEach(k -> {
            cfToE.get(k).sort((e1, e2) -> {
                try {
                    return (int) (e1.getTimeMillis() - e2.getTimeMillis());
                } catch (Exception ex) {
                    ex.printStackTrace();
                    return 0;
                }
            });
        });
        return cfToE;
    }


    private enum FlushEventType {
        Enqueue("Enqueuing flush of Memtable-"),
        Write("Writing Memtable-"),
        Complete("Completed flushing ");
        private final String pattern;


        FlushEventType(String pattern) {
            this.pattern = pattern;
        }


        public String getPattern() {
            return pattern;
        }


        public static Optional<FlushEventType> getByPattern(String line) {
            return Arrays.stream(values())
                    .filter(type -> StringUtils.contains(line, type.getPattern()))
                    .findFirst();
        }
    }


    private static class FlushEvent {
        private final FlushEventType type;
        private final String time;


        FlushEvent(String line) {
            this.time = substringBetween(line, "] ", ",");
            this.type = FlushEventType.getByPattern(line).get(); // must success
        }


        public FlushEventType getType() {
            return type;
        }


        public Long getTimeMillis() {
            return toMillis(getTime());
        }


        public String getTime() {
            return time;
        }
    }


    private static class Enqueue {
        private final String time;
        private final String cf;
        private final int ops;
        private final long serializedMB;


        Enqueue(String line) throws ParseException {
            this.time = substringBetween(line, "] ", ",");
            this.cf = substringBetween(line, "Enqueuing flush of Memtable-", "@");
            this.ops = NumberUtils.toInt(substringBetween(line, "serialized/live bytes, ", " ops)"));
            this.serializedMB = NumberUtils.toInt(substringBetween(substringAfter(line, "Enqueuing flush of Memtable-"), "(", "/")) / 1024 / 1024;
        }


        @Override
        public String toString() {
            return new ToStringBuilder(this)
                    .append("time", time)
                    .append("cf", cf)
                    .append("ops", ops)
                    .append("serializedMB", serializedMB)
                    .toString();
        }


        public String getTimeMinutes() {
            long millis = getTimeMillis();
            return sdfToMinutes.format(new Date(millis));
        }


        public Long getTimeMillis() {
            return toMillis(getTime());
        }


        public String getTime() {
            return time;
        }


        public String getCf() {
            return cf;
        }


    }


    private static final SimpleDateFormat sdfToMinutes = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:SS");


    private static long toMillis(String time) {
        try {
            Date date = sdf.parse(time);
            return date.getTime();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }


}


沒有留言:

張貼留言

別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...