We use Cassandra 2.0.17.
Sometimes the system suffers from IO problems. We can use "iotop" to see the IO load of each process,
but sometimes we need more detail, such as the IO write load of each table.
The Cassandra log is a very good source for this information.
I wrote a program that scans the log and generates CSV files, which I then copy and paste into Google Sheets for further analysis, such as drawing charts.

This program generates the following files (I wrote it a long time ago, so please let me know if any file description is wrong):
  • output-compact-mb-xxx: Write load caused by compaction
  • output-compact-mb-agg-xxx: Write load caused by compaction, per minute
  • output-queue-xxx: Queue size of memtable flush events
  • output-xxx: Flushed memory size by column family
  • output-xxx-avg: Flushed memory size per minute by column family

This program scans files whose names match "system.log" and "system.log.*".

I mostly look at the output-xxx file. Its content looks like this:
column 1 - column family name
column 2 - flush event time stamp
column 3 - flush memory (MB)
mytable,2019-03-24 16:16:28,11


Here is the program. Please include commons-lang3 and commons-io as dependencies:
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.math.NumberUtils;


import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;


import static org.apache.commons.lang3.StringUtils.substringAfter;
import static org.apache.commons.lang3.StringUtils.substringBetween;


public class ScanLog {
    // Suffix inserted into the name of every generated CSV file; empty by default.
    private static final String fileSufix = "";
    // Folder that is scanned for log files; replace the placeholder with the real path before running.
    private static final String LOG_FOLDER = "{CassandraPathContainsSystemLog}\\cassandra\\";
    // Folder the CSV files are written to. File names are appended directly to this string,
    // so it has to end with a path separator. Replace the placeholder before running.
    private static final String OUTPUT_FOLDER = "\\OutputFoldlerYouWantGenerateCsv\\";


    public static void main(String[] params) throws IOException, ParseException {
        File folder = new File(LOG_FOLDER);
        Map<String, List<CompactSize>> cf2CompactSizes = toCompactSizes(folder);
        System.out.println("cf2CompactSizes.size:" + cf2CompactSizes.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-" + fileSufix + ".csv"), toCompactSizesString(cf2CompactSizes), "UTF-8");


        // Compact avg per minutes
        List<CompactSize> aggCompactSizes = aggregate(toCompactSizeList(folder));
        System.out.println("aggCompactSizes.size:" + aggCompactSizes.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-agg-" + fileSufix + ".csv"), toCompactSizesString(aggCompactSizes), "UTF-8");


        // Overall Flush
        List<FlushEvent> events = toEvents(folder);
        System.out.println("events.size:" + events.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-queue-" + fileSufix + ".csv"), toString(events), "UTF-8");


        // Flush by CF
        Map<String, List<Enqueue>> cfToE = cfToE(folder);
        System.out.println("cfToE.size:" + cfToE.size());
        Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = cfToTimeToAvg(cfToE);
        System.out.println("cfToTimeToAvg.size:" + cfToTimeToAvg.size());


        FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSufix + ".csv"), toString(cfToE), "UTF-8");
        FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSufix + "-avg.csv"), toAvgString(cfToTimeToAvg), "UTF-8");
    }


    private static List<CompactSize> aggregate(List<CompactSize> compactSizes) {
        LinkedHashMap<String, List<CompactSize>> minutes2CompactSizes = new LinkedHashMap<>();
        compactSizes.forEach(compactSize -> {
            String timeMinutes = compactSize.getTimeMinutes();
            if (!minutes2CompactSizes.containsKey(timeMinutes)) {
                minutes2CompactSizes.put(timeMinutes, new ArrayList<>());
            }
            minutes2CompactSizes.get(timeMinutes).add(compactSize);
        });
        LinkedHashMap<String, CompactSize> minutes2AggregatedSize = new LinkedHashMap<>();
        minutes2CompactSizes.entrySet().forEach(entry -> {
            String minutes = entry.getKey();
            entry.getValue().forEach(size -> {
                if (!minutes2AggregatedSize.containsKey(size.getTimeMinutes())) {
                    minutes2AggregatedSize.put(size.getTimeMinutes(), new CompactSize(size.getTime(), size.getCf(), size.getDataSize()));
                } else {
                    CompactSize total = minutes2AggregatedSize.get(size.getTimeMinutes());
                    CompactSize newTotal = new CompactSize(size.getTime(), size.getCf(), size.getDataSize() + total.getDataSize());
                    minutes2AggregatedSize.put(size.getTimeMinutes(), newTotal);
                }
            });
        });
        List<CompactSize> result = new ArrayList<>();
        minutes2AggregatedSize.entrySet().forEach(entry -> {
            result.add(entry.getValue());
        });
        Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
        return result;
    }


    private static String toCompactSizesString(List<CompactSize> cf2compactSizes) {
        StringBuilder sb = new StringBuilder();
        cf2compactSizes.forEach(size -> {
            sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
        });
        return sb.toString();
    }


    private static String toCompactSizesString(Map<String, List<CompactSize>> cf2compactSizes) {
        StringBuilder sb = new StringBuilder();
        cf2compactSizes.keySet().forEach(cf -> {
            cf2compactSizes.get(cf).forEach(size -> {
                sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
            });
        });
        return sb.toString();
    }


    private static List<CompactSize> toCompactSizeList(File folder) {
        List<CompactSize> result = new ArrayList<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file)) {
                            if (CompactSize.isMyPattern(line)) {
                                CompactSize compactSize = new CompactSize(line);
                                result.add(compactSize);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
        return result;
    }


    private static Map<String, List<CompactSize>> toCompactSizes(File folder) {
        Map<String, List<CompactSize>> cf2CompactSizes = new HashMap<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file)) {
                            if (CompactSize.isMyPattern(line)) {
                                CompactSize compactSize = new CompactSize(line);
                                if (!cf2CompactSizes.containsKey(compactSize.getCf())) {
                                    cf2CompactSizes.put(compactSize.getCf(), new ArrayList<>());
                                }
                                cf2CompactSizes.get(compactSize.getCf()).add(compactSize);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        cf2CompactSizes.values().forEach(compactSizes ->
                Collections.sort(compactSizes, Comparator.comparingLong(CompactSize::getTimeMillis)));
        return cf2CompactSizes;
    }


    private static class CompactSize {
        private final String time;
        private final String cf;
        private final long dataSize;


        CompactSize(String time, String cf, long dataSize) {
            this.time = time;
            this.cf = cf;
            this.dataSize = dataSize;
        }


        CompactSize(String line) {
            this.time = substringBetween(line, "] ", ",");
            this.cf = substringBetween(line, "/data/ng/db/data/wsg/", "/");
            this.dataSize = NumberUtils.toInt(StringUtils.replace(substringBetween(line, "].  ", " bytes"), ",", ""));
        }


        public static boolean isMyPattern(String line) {
            return StringUtils.contains(line, "CompactionTask.java (line 302) Compacted")
                    && StringUtils.contains(line, "[/data/ng/db/data/wsg/");
        }


        public long getDataSizeInMB() {
            return dataSize // bytes
                    / 1024  // KB
                    / 1024; // MB
        }


        public long getDataSize() {
            return dataSize;
        }


        public String getCf() {
            return cf;
        }


        public String getTimeMinutes() {
            long millis = getTimeMillis();
            return sdfToMinutes.format(new Date(millis));
        }


        public Long getTimeMillis() {
            return toMillis(getTime());
        }


        public String getTime() {
            return time;
        }
    }


    private static String toString(List<FlushEvent> events) {
        StringBuilder sb = new StringBuilder();
        AtomicInteger waitForWrite = new AtomicInteger();
        AtomicInteger writing = new AtomicInteger();
        events.forEach(e -> {
            switch (e.getType()) {
                case Enqueue:
                    waitForWrite.incrementAndGet();
                    break;
                case Write:
                    writing.incrementAndGet();
                    waitForWrite.decrementAndGet();
                    break;
                case Complete:
                    writing.decrementAndGet();
                    break;
                default:
                    throw new RuntimeException("Shouldn't happen");
            }
            sb.append(e.getTime() + "," + waitForWrite + "," + writing).append("\n");
        });
        return sb.toString();
    }


    private static List<FlushEvent> toEvents(File folder) {
        List<FlushEvent> events = new ArrayList<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file)) {
                            Optional<FlushEventType> type = FlushEventType.getByPattern(line);
                            type.ifPresent(t -> {
                                events.add(new FlushEvent(line));
                            });
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        Collections.sort(events, Comparator.comparingLong(FlushEvent::getTimeMillis));
        return events;
    }


    private static String toAvgString(Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg) {
        StringBuilder sb = new StringBuilder();
        cfToTimeToAvg.keySet().forEach(cf -> {
            cfToTimeToAvg.get(cf).forEach((time, avg) -> {
                sb.append(cf + "," + time + "," + avg).append("\n");
            });
        });
        return sb.toString();
    }


    private static String toString(Map<String, List<Enqueue>> cfToE) {
        StringBuilder sb = new StringBuilder();
        cfToE.values().stream().forEach(eList -> {
            eList.stream().forEach(e -> {
                try {
                    sb.append(e.getCf() + "," + e.getTime() + "," + e.serializedMB).append("\n");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            });
        });
        return sb.toString();
    }


    private static Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg(Map<String, List<Enqueue>> cfToE) {
        Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = new HashMap<>();
        cfToE.keySet().forEach(cf -> {
            if (!cfToTimeToAvg.containsKey(cf)) {
                cfToTimeToAvg.put(cf, new LinkedHashMap<>());
            }
            LinkedHashMap<String, List<Long>> timeToMBList = new LinkedHashMap<>();
            cfToE.get(cf).forEach(enqueue -> {
                if (!timeToMBList.containsKey(enqueue.getTimeMinutes())) {
                    timeToMBList.put(enqueue.getTimeMinutes(), new ArrayList<>());
                }
                timeToMBList.get(enqueue.getTimeMinutes()).add(enqueue.serializedMB);
            });
            AtomicReference<String> previousTime = new AtomicReference<>();
            LinkedHashMap<String, Long> timeToAvg = cfToTimeToAvg.get(cf);
            timeToMBList.keySet().forEach(time -> {
                try {
                    if (previousTime.get() != null) {
                        AtomicLong totalMB = new AtomicLong();
                        timeToMBList.get(time).forEach(mb -> {
                            totalMB.addAndGet(mb);
                        });
                        long currentTimeMillis = sdfToMinutes.parse(time).getTime();
                        long previousTimeMillis = sdfToMinutes.parse(previousTime.get()).getTime();
                        long gapMinutes = TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - previousTimeMillis);
                        timeToAvg.put(time, (totalMB.longValue() / gapMinutes));
                    }
                    previousTime.set(time);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            });
        });
        return cfToTimeToAvg;
    }


    private static Map<String, List<Enqueue>> cfToE(File folder) {
        Map<String, List<Enqueue>> cfToE = new HashMap<>();
        Arrays.stream(folder.listFiles())
                .filter(f -> f.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        List<String> lines = IOUtils.readLines(new FileInputStream(file));
                        for (String line : lines) {
                            if (StringUtils.contains(line, "Enqueuing flush of Memtable-")) {
                                Enqueue e = new Enqueue(line);
                                if (e.serializedMB <= 1) {
                                    continue;
                                }
                                if (!cfToE.containsKey(e.getCf())) {
                                    cfToE.put(e.getCf(), new ArrayList<>());
                                }
                                cfToE.get(e.getCf()).add(e);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        cfToE.keySet().forEach(k -> {
            cfToE.get(k).sort((e1, e2) -> {
                try {
                    return (int) (e1.getTimeMillis() - e2.getTimeMillis());
                } catch (Exception ex) {
                    ex.printStackTrace();
                    return 0;
                }
            });
        });
        return cfToE;
    }


    private enum FlushEventType {
        Enqueue("Enqueuing flush of Memtable-"),
        Write("Writing Memtable-"),
        Complete("Completed flushing ");
        private final String pattern;


        FlushEventType(String pattern) {
            this.pattern = pattern;
        }


        public String getPattern() {
            return pattern;
        }


        public static Optional<FlushEventType> getByPattern(String line) {
            return Arrays.stream(values())
                    .filter(type -> StringUtils.contains(line, type.getPattern()))
                    .findFirst();
        }
    }


    private static class FlushEvent {
        private final FlushEventType type;
        private final String time;


        FlushEvent(String line) {
            this.time = substringBetween(line, "] ", ",");
            this.type = FlushEventType.getByPattern(line).get(); // must success
        }


        public FlushEventType getType() {
            return type;
        }


        public Long getTimeMillis() {
            return toMillis(getTime());
        }


        public String getTime() {
            return time;
        }
    }


    private static class Enqueue {
        private final String time;
        private final String cf;
        private final int ops;
        private final long serializedMB;


        Enqueue(String line) throws ParseException {
            this.time = substringBetween(line, "] ", ",");
            this.cf = substringBetween(line, "Enqueuing flush of Memtable-", "@");
            this.ops = NumberUtils.toInt(substringBetween(line, "serialized/live bytes, ", " ops)"));
            this.serializedMB = NumberUtils.toInt(substringBetween(substringAfter(line, "Enqueuing flush of Memtable-"), "(", "/")) / 1024 / 1024;
        }


        @Override
        public String toString() {
            return new ToStringBuilder(this)
                    .append("time", time)
                    .append("cf", cf)
                    .append("ops", ops)
                    .append("serializedMB", serializedMB)
                    .toString();
        }


        public String getTimeMinutes() {
            long millis = getTimeMillis();
            return sdfToMinutes.format(new Date(millis));
        }


        public Long getTimeMillis() {
            return toMillis(getTime());
        }


        public String getTime() {
            return time;
        }


        public String getCf() {
            return cf;
        }


    }


    private static final SimpleDateFormat sdfToMinutes = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:SS");


    private static long toMillis(String time) {
        try {
            Date date = sdf.parse(time);
            return date.getTime();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }


}