Introduction
After a performance issue happens, we usually have little information about the state of the system at that time.
Cassandra writes flush events to its log, so I wrote code to parse that log and write the results to output files.
I can then paste the parsed results into Google Sheets or Excel to draw charts such as:
Cassandra writes flush events to its log, so I wrote code to parse that log and write the results to output files.
I can then paste the parsed results into Google Sheets or Excel to draw charts such as:
Code
public class ScanLog { private static final String fileSufix = "10"; private static final String LOG_FOLDER = "/cassandraLogPath"; private static final String OUTPUT_FOLDER = "/outputpath"; public static void main(String[] params) throws IOException, ParseException { File folder = new File(LOG_FOLDER); Map<String,List<CompactSize>> cf2CompactSizes = toCompactSizes(folder); FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-" + fileSufix + ".csv"), toCompactSizesString(cf2CompactSizes), "UTF-8"); List<CompactSize> aggCompactSizes = aggregate(toCompactSizeList(folder)); FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-agg-" + fileSufix + ".csv"), toCompactSizesString(aggCompactSizes), "UTF-8"); List<FlushEvent> events = toEvents(folder); FileUtils.write(new File(OUTPUT_FOLDER + "output-queue-" + fileSufix + ".csv"), toString(events), "UTF-8"); Map<String, List<Enqueue>> cfToE = cfToE(folder); Map<String, LinkedHashMap<String,Long>> cfToTimeToAvg = cfToTimeToAvg(cfToE); FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSufix + ".csv"), toString(cfToE), "UTF-8"); FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSufix + "-avg.csv"), toAvgString(cfToTimeToAvg), "UTF-8"); } private static List<CompactSize> aggregate(List<CompactSize> compactSizes) { LinkedHashMap<String,List<CompactSize>> minutes2CompactSizes = new LinkedHashMap<>(); compactSizes.forEach(compactSize -> { String timeMinutes = compactSize.getTimeMinutes(); if (!minutes2CompactSizes.containsKey(timeMinutes)) { minutes2CompactSizes.put(timeMinutes, new ArrayList<>()); } minutes2CompactSizes.get(timeMinutes).add(compactSize); }); LinkedHashMap<String,CompactSize> minutes2AggregatedSize = new LinkedHashMap<>(); minutes2CompactSizes.entrySet().forEach(entry -> { String minutes = entry.getKey(); entry.getValue().forEach(size -> { if (!minutes2AggregatedSize.containsKey(size.getTimeMinutes())) { minutes2AggregatedSize.put(size.getTimeMinutes(),new 
CompactSize(size.getTime(),size.getCf(),size.getDataSize())); } else { CompactSize total = minutes2AggregatedSize.get(size.getTimeMinutes()); CompactSize newTotal = new CompactSize(size.getTime(),size.getCf(), size.getDataSize() + total.getDataSize()); minutes2AggregatedSize.put(size.getTimeMinutes(), newTotal); } }); }); List<CompactSize> result = new ArrayList<>(); minutes2AggregatedSize.entrySet().forEach(entry -> { result.add(entry.getValue()); }); Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis)); return result; } private static String toCompactSizesString(List<CompactSize> cf2compactSizes) { StringBuilder sb = new StringBuilder(); cf2compactSizes.forEach(size -> { sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n"); }); return sb.toString(); } private static String toCompactSizesString(Map<String,List<CompactSize>> cf2compactSizes) { StringBuilder sb = new StringBuilder(); cf2compactSizes.keySet().forEach(cf -> { cf2compactSizes.get(cf).forEach(size -> { sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n"); }); }); return sb.toString(); } private static List<CompactSize> toCompactSizeList(File folder) { List<CompactSize> result = new ArrayList<>(); Arrays.stream(folder.listFiles()) .filter(file -> file.getName().contains("system.log")) .forEach(file -> { try { for (String line: FileUtils.readLines(file)) { if ( CompactSize.isMyPattern(line) ) { CompactSize compactSize = new CompactSize(line); result.add(compactSize); } } } catch (Exception ex) { ex.printStackTrace(); } }); Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis)); return result; } private static Map<String,List<CompactSize>> toCompactSizes(File folder) { Map<String,List<CompactSize>> cf2CompactSizes = new HashMap<>(); Arrays.stream(folder.listFiles()) .filter(file -> file.getName().contains("system.log")) .forEach(file -> { try { for (String line: 
FileUtils.readLines(file)) { if ( CompactSize.isMyPattern(line) ) { CompactSize compactSize = new CompactSize(line); if (!cf2CompactSizes.containsKey(compactSize.getCf())) { cf2CompactSizes.put(compactSize.getCf(), new ArrayList<>()); } cf2CompactSizes.get(compactSize.getCf()).add(compactSize); } } } catch (Exception ex) { ex.printStackTrace(); } }); cf2CompactSizes.values().forEach(compactSizes -> Collections.sort(compactSizes, Comparator.comparingLong(CompactSize::getTimeMillis))); return cf2CompactSizes; } private static class CompactSize { private final String time; private final String cf; private final long dataSize; CompactSize(String time, String cf, long dataSize) { this.time = time; this.cf = cf; this.dataSize = dataSize; } CompactSize(String line) { this.time = substringBetween(line, "] ",","); this.cf = substringBetween(line, "/data/ng/db/data/wsg/", "/"); this.dataSize = NumberUtils.toInt(StringUtils.replace(substringBetween(line, "]. ", " bytes"),",","")); } public static boolean isMyPattern(String line) { return StringUtils.contains(line, "CompactionTask.java (line 302) Compacted") && StringUtils.contains(line, "[/data/ng/db/data/wsg/"); } public long getDataSizeInMB() { return dataSize // bytes / 1024 // KB / 1024; // MB } public long getDataSize() { return dataSize; } public String getCf() { return cf; } public String getTimeMinutes() { long millis = getTimeMillis(); return sdfToMinutes.format(new Date(millis)); } public Long getTimeMillis() { return toMillis(getTime()); } public String getTime() { return time; } } private static String toString(List<FlushEvent> events) { StringBuilder sb = new StringBuilder(); AtomicInteger waitForWrite = new AtomicInteger(); AtomicInteger writing = new AtomicInteger(); events.forEach(e -> { switch ( e.getType() ) { case Enqueue: waitForWrite.incrementAndGet(); break; case Write: writing.incrementAndGet(); waitForWrite.decrementAndGet(); break; case Complete: writing.decrementAndGet(); break; default: throw new 
RuntimeException("Shouldn't happen"); } sb.append(e.getTime() + "," + waitForWrite + "," + writing).append("\n"); }); return sb.toString(); } private static List<FlushEvent> toEvents(File folder) { List<FlushEvent> events = new ArrayList<>(); Arrays.stream(folder.listFiles()) .filter(file -> file.getName().contains("system.log")) .forEach(file -> { try { for (String line: FileUtils.readLines(file)) { Optional<FlushEventType> type = FlushEventType.getByPattern(line); type.ifPresent(t -> { events.add(new FlushEvent(line)); }); } } catch (Exception ex) { ex.printStackTrace(); } }); Collections.sort(events, Comparator.comparingLong(FlushEvent::getTimeMillis)); return events; } private static String toAvgString(Map<String, LinkedHashMap<String,Long>> cfToTimeToAvg) { StringBuilder sb = new StringBuilder(); cfToTimeToAvg.keySet().forEach(cf -> { cfToTimeToAvg.get(cf).forEach((time,avg) -> { sb.append(cf + "," + time + "," + avg).append("\n"); }); }); return sb.toString(); } private static String toString(Map<String, List<Enqueue>> cfToE) { StringBuilder sb = new StringBuilder(); cfToE.values().stream().forEach(eList -> { eList.stream().forEach(e -> { try { sb.append(e.getCf() + "," + e.getTime() + "," + e.serializedMB).append("\n"); } catch (Exception ex) { ex.printStackTrace(); } }); }); return sb.toString(); } private static Map<String, LinkedHashMap<String,Long>> cfToTimeToAvg(Map<String, List<Enqueue>> cfToE) { Map<String, LinkedHashMap<String,Long>> cfToTimeToAvg = new HashMap<>(); cfToE.keySet().forEach(cf -> { if (!cfToTimeToAvg.containsKey(cf)) { cfToTimeToAvg.put(cf, new LinkedHashMap<>()); } LinkedHashMap<String,List<Long>> timeToMBList = new LinkedHashMap<>(); cfToE.get(cf).forEach(enqueue -> { if (!timeToMBList.containsKey(enqueue.getTimeMinutes())) { timeToMBList.put(enqueue.getTimeMinutes(), new ArrayList<>()); } timeToMBList.get(enqueue.getTimeMinutes()).add(enqueue.serializedMB); }); AtomicReference<String> previousTime = new AtomicReference<>(); 
LinkedHashMap<String,Long> timeToAvg = cfToTimeToAvg.get(cf); timeToMBList.keySet().forEach(time -> { try { if (previousTime.get() != null) { AtomicLong totalMB = new AtomicLong(); timeToMBList.get(time).forEach(mb -> { totalMB.addAndGet(mb); }); long currentTimeMillis = sdfToMinutes.parse(time).getTime(); long previousTimeMillis = sdfToMinutes.parse(previousTime.get()).getTime(); long gapMinutes = TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - previousTimeMillis); timeToAvg.put(time, (totalMB.longValue() / gapMinutes)); } previousTime.set(time); } catch (Exception ex) { ex.printStackTrace(); } }); }); return cfToTimeToAvg; } private static Map<String, List<Enqueue>> cfToE(File folder) { Map<String,List<Enqueue>> cfToE = new HashMap<>(); Arrays.stream(folder.listFiles()) .filter(f -> f.getName().contains("system.log")) .forEach(file -> { try { List<String> lines = IOUtils.readLines(new FileInputStream(file)); for (String line: lines) { if (StringUtils.contains(line, "Enqueuing flush of Memtable-")) { Enqueue e = new Enqueue(line); if (e.serializedMB <= 1) { continue; } if (!cfToE.containsKey(e.getCf())) { cfToE.put(e.getCf(), new ArrayList<>()); } cfToE.get(e.getCf()).add(e); } } } catch (Exception ex) { ex.printStackTrace(); } }); cfToE.keySet().forEach(k -> { cfToE.get(k).sort((e1,e2) -> { try { return (int)(e1.getTimeMillis()-e2.getTimeMillis()); } catch (Exception ex) { ex.printStackTrace(); return 0; } }); }); return cfToE; } private enum FlushEventType { Enqueue("Enqueuing flush of Memtable-"), Write("Writing Memtable-"), Complete("Completed flushing "); private final String pattern; FlushEventType(String pattern) { this.pattern = pattern; } public String getPattern() { return pattern; } public static Optional<FlushEventType> getByPattern(String line) { return Arrays.stream(values()) .filter(type -> StringUtils.contains(line, type.getPattern())) .findFirst(); } } private static class FlushEvent { private final FlushEventType type; private final String 
time; FlushEvent(String line) { this.time = substringBetween(line, "] ",","); this.type = FlushEventType.getByPattern(line).get(); // must success } public FlushEventType getType() { return type; } public Long getTimeMillis() { return toMillis(getTime()); } public String getTime() { return time; } } private static class Enqueue { private final String time; private final String cf; private final int ops; private final long serializedMB; Enqueue(String line) throws ParseException { this.time = substringBetween(line, "] ",","); this.cf = substringBetween(line, "Enqueuing flush of Memtable-", "@"); this.ops = NumberUtils.toInt(substringBetween(line, "serialized/live bytes, ", " ops)")); this.serializedMB = NumberUtils.toInt(substringBetween(substringAfter(line, "Enqueuing flush of Memtable-"), "(", "/")) / 1024 / 1024; } @Override public String toString() { return new ToStringBuilder(this) .append("time",time) .append("cf",cf) .append("ops",ops) .append("serializedMB", serializedMB) .toString(); } public String getTimeMinutes() { long millis = getTimeMillis(); return sdfToMinutes.format(new Date(millis)); } public Long getTimeMillis() { return toMillis(getTime()); } public String getTime() { return time; } public String getCf() { return cf; } } private static final SimpleDateFormat sdfToMinutes = new SimpleDateFormat("yyyy-MM-dd HH:mm"); private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:SS"); private static long toMillis(String time) { try { Date date = sdf.parse(time); return date.getTime(); } catch (Exception ex) { throw new RuntimeException(ex); } } }