We use Cassandra 2.0.17.
Sometimes the system runs into IO problems. We can use "iotop" to see the IO load of each process, but sometimes we need more detail, such as the write load for each table.
The Cassandra log is a good source for this.
I wrote a program that scans the log and generates CSV files, which I then copy-paste into Google Sheets for further analysis, such as drawing a chart.
The program generates the following files (I wrote it a long time ago, so please let me know if any of the descriptions are wrong):
- output-compact-mb-xxx: Write load from compaction
- output-compact-mb-agg-xxx: Write load per minute from compaction
- output-queue-xxx: Queue size of memtable flush events
- output-xxx: Flushed memtable size by column family
- output-xxx-avg: Flushed memtable size per minute by column family
The program scans files named "system.log" and "system.log.*".
I often check the output-xxx file; its content looks like this:
column 1 - column family name
column 2 - flush event timestamp
column 3 - flushed memtable size (MB)
mytable,2019-03-24 16:16:28,11
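For reference, the log lines the program matches look roughly like the two below. These are reconstructions based on the substrings the parser looks for, not lines copied from a real log, so the exact wording in your system.log may differ slightly:

INFO [FlushWriter:1] 2019-03-24 16:16:28,123 ColumnFamilyStore.java (line 633) Enqueuing flush of Memtable-mytable@123456789(11534336/23068672 serialized/live bytes, 200000 ops)
INFO [CompactionExecutor:1] 2019-03-24 16:17:01,456 CompactionTask.java (line 302) Compacted 4 sstables to [/data/ng/db/data/wsg/mytable/wsg-mytable-ic-1234,]. 734,003,200 bytes to 367,001,600 (~50% of original) in 60,000ms

From the first line the program extracts the timestamp, the column family name ("mytable"), and the serialized bytes (11534336, about 11 MB, which matches the example row above); from the second it extracts the timestamp, the column family, and the compacted byte count.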
Here is the program. It needs commons-lang3 and commons-io on the classpath (see the build commands after the listing):
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.math.NumberUtils;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.commons.lang3.StringUtils.substringAfter;
import static org.apache.commons.lang3.StringUtils.substringBetween;
public class ScanLog {
    private static final String fileSuffix = "";
    private static final String LOG_FOLDER = "{CassandraPathContainsSystemLog}\\cassandra\\";
    private static final String OUTPUT_FOLDER = "\\OutputFolderYouWantGenerateCsv\\";
    public static void main(String[] params) throws IOException, ParseException {
        File folder = new File(LOG_FOLDER);
        // Compaction write load per column family
        Map<String, List<CompactSize>> cf2CompactSizes = toCompactSizes(folder);
        System.out.println("cf2CompactSizes.size:" + cf2CompactSizes.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-" + fileSuffix + ".csv"), toCompactSizesString(cf2CompactSizes), "UTF-8");
        // Compaction write load aggregated per minute
        List<CompactSize> aggCompactSizes = aggregate(toCompactSizeList(folder));
        System.out.println("aggCompactSizes.size:" + aggCompactSizes.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-agg-" + fileSuffix + ".csv"), toCompactSizesString(aggCompactSizes), "UTF-8");
        // Overall flush queue depth
        List<FlushEvent> events = toEvents(folder);
        System.out.println("events.size:" + events.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-queue-" + fileSuffix + ".csv"), toString(events), "UTF-8");
        // Flush sizes by column family, raw and averaged per minute
        Map<String, List<Enqueue>> cfToE = cfToE(folder);
        System.out.println("cfToE.size:" + cfToE.size());
        Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = cfToTimeToAvg(cfToE);
        System.out.println("cfToTimeToAvg.size:" + cfToTimeToAvg.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSuffix + ".csv"), toString(cfToE), "UTF-8");
        FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSuffix + "-avg.csv"), toAvgString(cfToTimeToAvg), "UTF-8");
    }
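    // Groups compaction events into one-minute buckets and sums their sizes,
    // so the agg CSV shows the total compacted MB per minute.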
    private static List<CompactSize> aggregate(List<CompactSize> compactSizes) {
        // Group compaction events by minute
        LinkedHashMap<String, List<CompactSize>> minutes2CompactSizes = new LinkedHashMap<>();
        compactSizes.forEach(compactSize -> {
            String timeMinutes = compactSize.getTimeMinutes();
            if (!minutes2CompactSizes.containsKey(timeMinutes)) {
                minutes2CompactSizes.put(timeMinutes, new ArrayList<>());
            }
            minutes2CompactSizes.get(timeMinutes).add(compactSize);
        });
        // Sum the sizes within each minute bucket
        LinkedHashMap<String, CompactSize> minutes2AggregatedSize = new LinkedHashMap<>();
        minutes2CompactSizes.values().forEach(sizes -> {
            sizes.forEach(size -> {
                if (!minutes2AggregatedSize.containsKey(size.getTimeMinutes())) {
                    minutes2AggregatedSize.put(size.getTimeMinutes(), new CompactSize(size.getTime(), size.getCf(), size.getDataSize()));
                } else {
                    CompactSize total = minutes2AggregatedSize.get(size.getTimeMinutes());
                    CompactSize newTotal = new CompactSize(size.getTime(), size.getCf(), size.getDataSize() + total.getDataSize());
                    minutes2AggregatedSize.put(size.getTimeMinutes(), newTotal);
                }
            });
        });
        List<CompactSize> result = new ArrayList<>(minutes2AggregatedSize.values());
        Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
        return result;
    }
    private static String toCompactSizesString(List<CompactSize> cf2compactSizes) {
        StringBuilder sb = new StringBuilder();
        cf2compactSizes.forEach(size -> {
            sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
        });
        return sb.toString();
    }
    private static String toCompactSizesString(Map<String, List<CompactSize>> cf2compactSizes) {
        StringBuilder sb = new StringBuilder();
        cf2compactSizes.keySet().forEach(cf -> {
            cf2compactSizes.get(cf).forEach(size -> {
                sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
            });
        });
        return sb.toString();
    }
    private static List<CompactSize> toCompactSizeList(File folder) {
        List<CompactSize> result = new ArrayList<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file, "UTF-8")) {
                            if (CompactSize.isMyPattern(line)) {
                                result.add(new CompactSize(line));
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
        return result;
    }
    private static Map<String, List<CompactSize>> toCompactSizes(File folder) {
        Map<String, List<CompactSize>> cf2CompactSizes = new HashMap<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file, "UTF-8")) {
                            if (CompactSize.isMyPattern(line)) {
                                CompactSize compactSize = new CompactSize(line);
                                if (!cf2CompactSizes.containsKey(compactSize.getCf())) {
                                    cf2CompactSizes.put(compactSize.getCf(), new ArrayList<>());
                                }
                                cf2CompactSizes.get(compactSize.getCf()).add(compactSize);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        cf2CompactSizes.values().forEach(compactSizes ->
                Collections.sort(compactSizes, Comparator.comparingLong(CompactSize::getTimeMillis)));
        return cf2CompactSizes;
    }
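    // One "Compacted ..." line from system.log: when it happened,
    // which column family, and the byte count the compaction reported.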
    private static class CompactSize {
        private final String time;
        private final String cf;
        private final long dataSize;
        CompactSize(String time, String cf, long dataSize) {
            this.time = time;
            this.cf = cf;
            this.dataSize = dataSize;
        }
        CompactSize(String line) {
            this.time = substringBetween(line, "] ", ",");
            this.cf = substringBetween(line, "/data/ng/db/data/wsg/", "/");
            // Strip the thousands separators before parsing; toLong so sizes over 2 GB do not overflow
            this.dataSize = NumberUtils.toLong(StringUtils.replace(substringBetween(line, "]. ", " bytes"), ",", ""));
        }
        public static boolean isMyPattern(String line) {
            // The data path is specific to our deployment; adjust it for yours
            return StringUtils.contains(line, "CompactionTask.java (line 302) Compacted")
                    && StringUtils.contains(line, "[/data/ng/db/data/wsg/");
        }
        public long getDataSizeInMB() {
            return dataSize // bytes
                    / 1024 // KB
                    / 1024; // MB
        }
        public long getDataSize() {
            return dataSize;
        }
        public String getCf() {
            return cf;
        }
        public String getTimeMinutes() {
            return sdfToMinutes.format(new Date(getTimeMillis()));
        }
        public Long getTimeMillis() {
            return toMillis(getTime());
        }
        public String getTime() {
            return time;
        }
    }
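    // Replays the flush events in time order, tracking how many memtables are queued
    // for flushing (waitForWrite) and how many are currently being written (writing);
    // each event emits one CSV row with both counters.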
    private static String toString(List<FlushEvent> events) {
        StringBuilder sb = new StringBuilder();
        AtomicInteger waitForWrite = new AtomicInteger();
        AtomicInteger writing = new AtomicInteger();
        events.forEach(e -> {
            switch (e.getType()) {
                case Enqueue:
                    waitForWrite.incrementAndGet();
                    break;
                case Write:
                    writing.incrementAndGet();
                    waitForWrite.decrementAndGet();
                    break;
                case Complete:
                    writing.decrementAndGet();
                    break;
                default:
                    throw new RuntimeException("Shouldn't happen");
            }
            sb.append(e.getTime() + "," + waitForWrite + "," + writing).append("\n");
        });
        return sb.toString();
    }
    private static List<FlushEvent> toEvents(File folder) {
        List<FlushEvent> events = new ArrayList<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file, "UTF-8")) {
                            FlushEventType.getByPattern(line)
                                    .ifPresent(t -> events.add(new FlushEvent(line)));
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        Collections.sort(events, Comparator.comparingLong(FlushEvent::getTimeMillis));
        return events;
    }
    private static String toAvgString(Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg) {
        StringBuilder sb = new StringBuilder();
        cfToTimeToAvg.keySet().forEach(cf -> {
            cfToTimeToAvg.get(cf).forEach((time, avg) -> {
                sb.append(cf + "," + time + "," + avg).append("\n");
            });
        });
        return sb.toString();
    }
    private static String toString(Map<String, List<Enqueue>> cfToE) {
        StringBuilder sb = new StringBuilder();
        cfToE.values().forEach(eList -> {
            eList.forEach(e -> {
                sb.append(e.getCf() + "," + e.getTime() + "," + e.serializedMB).append("\n");
            });
        });
        return sb.toString();
    }
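    // For each column family: bucket the flushed MB by minute, then divide each bucket's
    // total by the gap (in minutes) since the previous bucket, giving an approximate
    // MB-per-minute rate. The first bucket has no predecessor, so it is skipped.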
    private static Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg(Map<String, List<Enqueue>> cfToE) {
        Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = new HashMap<>();
        cfToE.keySet().forEach(cf -> {
            if (!cfToTimeToAvg.containsKey(cf)) {
                cfToTimeToAvg.put(cf, new LinkedHashMap<>());
            }
            // Bucket this column family's flushed MB by minute
            LinkedHashMap<String, List<Long>> timeToMBList = new LinkedHashMap<>();
            cfToE.get(cf).forEach(enqueue -> {
                if (!timeToMBList.containsKey(enqueue.getTimeMinutes())) {
                    timeToMBList.put(enqueue.getTimeMinutes(), new ArrayList<>());
                }
                timeToMBList.get(enqueue.getTimeMinutes()).add(enqueue.serializedMB);
            });
            AtomicReference<String> previousTime = new AtomicReference<>();
            LinkedHashMap<String, Long> timeToAvg = cfToTimeToAvg.get(cf);
            timeToMBList.keySet().forEach(time -> {
                try {
                    if (previousTime.get() != null) {
                        AtomicLong totalMB = new AtomicLong();
                        timeToMBList.get(time).forEach(totalMB::addAndGet);
                        long currentTimeMillis = sdfToMinutes.parse(time).getTime();
                        long previousTimeMillis = sdfToMinutes.parse(previousTime.get()).getTime();
                        long gapMinutes = TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - previousTimeMillis);
                        timeToAvg.put(time, totalMB.longValue() / gapMinutes);
                    }
                    previousTime.set(time);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            });
        });
        return cfToTimeToAvg;
    }
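    // Collects "Enqueuing flush of Memtable-..." events per column family,
    // skipping tiny flushes, and sorts each list chronologically.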
    private static Map<String, List<Enqueue>> cfToE(File folder) {
        Map<String, List<Enqueue>> cfToE = new HashMap<>();
        Arrays.stream(folder.listFiles())
                .filter(f -> f.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file, "UTF-8")) {
                            if (StringUtils.contains(line, "Enqueuing flush of Memtable-")) {
                                Enqueue e = new Enqueue(line);
                                // Flushes of 1 MB or less are noise for this analysis; skip them
                                if (e.serializedMB <= 1) {
                                    continue;
                                }
                                if (!cfToE.containsKey(e.getCf())) {
                                    cfToE.put(e.getCf(), new ArrayList<>());
                                }
                                cfToE.get(e.getCf()).add(e);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        // Long.compare avoids the int overflow that casting a millisecond difference to int can cause
        cfToE.values().forEach(list ->
                list.sort((e1, e2) -> Long.compare(e1.getTimeMillis(), e2.getTimeMillis())));
        return cfToE;
    }
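    // The three flush-related log messages we care about: a memtable queued
    // for flushing, a write in progress, and a completed flush.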
    private enum FlushEventType {
        Enqueue("Enqueuing flush of Memtable-"),
        Write("Writing Memtable-"),
        Complete("Completed flushing ");
        private final String pattern;
        FlushEventType(String pattern) {
            this.pattern = pattern;
        }
        public String getPattern() {
            return pattern;
        }
        public static Optional<FlushEventType> getByPattern(String line) {
            return Arrays.stream(values())
                    .filter(type -> StringUtils.contains(line, type.getPattern()))
                    .findFirst();
        }
    }
    private static class FlushEvent {
        private final FlushEventType type;
        private final String time;
        FlushEvent(String line) {
            this.time = substringBetween(line, "] ", ",");
            this.type = FlushEventType.getByPattern(line).get(); // safe: toEvents() only creates events for lines that matched a pattern
        }
        public FlushEventType getType() {
            return type;
        }
        public Long getTimeMillis() {
            return toMillis(getTime());
        }
        public String getTime() {
            return time;
        }
    }
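    // One "Enqueuing flush of Memtable-..." line: timestamp, column family,
    // operation count, and the serialized size converted to MB.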
    private static class Enqueue {
        private final String time;
        private final String cf;
        private final int ops;
        private final long serializedMB;
        Enqueue(String line) {
            this.time = substringBetween(line, "] ", ",");
            this.cf = substringBetween(line, "Enqueuing flush of Memtable-", "@");
            this.ops = NumberUtils.toInt(substringBetween(line, "serialized/live bytes, ", " ops)"));
            // toLong so memtables over 2 GB do not overflow before the division down to MB
            this.serializedMB = NumberUtils.toLong(substringBetween(substringAfter(line, "Enqueuing flush of Memtable-"), "(", "/")) / 1024 / 1024;
        }
        @Override
        public String toString() {
            return new ToStringBuilder(this)
                    .append("time", time)
                    .append("cf", cf)
                    .append("ops", ops)
                    .append("serializedMB", serializedMB)
                    .toString();
        }
        public String getTimeMinutes() {
            return sdfToMinutes.format(new Date(getTimeMillis()));
        }
        public Long getTimeMillis() {
            return toMillis(getTime());
        }
        public String getTime() {
            return time;
        }
        public String getCf() {
            return cf;
        }
    }
    // sdfToMinutes is used for the per-minute bucketing; sdf parses log timestamps such as "2019-03-24 16:16:28"
    // (note lowercase "ss" for seconds; "SS" would mean milliseconds and would garble the parsed times)
    private static final SimpleDateFormat sdfToMinutes = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static long toMillis(String time) {
        try {
            return sdf.parse(time).getTime();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}
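To build and run it, something like the following should work (a minimal sketch; the jar file names are illustrative and depend on the versions you download; on Linux use ":" instead of ";" as the classpath separator):

javac -cp commons-io-2.4.jar;commons-lang3-3.4.jar ScanLog.java
java -cp .;commons-io-2.4.jar;commons-lang3-3.4.jar ScanLog

Remember to set LOG_FOLDER and OUTPUT_FOLDER at the top of the class before compiling.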