Scan the Cassandra log and paste it into a Google Sheet to see the flush pressure

Introduction

After a performance issue happens, we usually have very little information about what the system was doing at that time. Cassandra does, however, record its memtable flushes and compactions in system.log, so I wrote some code to parse those lines and write the result to CSV files. I can then paste the parsed result into a Google Sheet or Excel and draw charts of the flush pressure over time, for example the flush queue length, or the flushed and compacted MB per minute per column family. A sample of the input and output is sketched below.
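
For illustration, here is roughly the kind of system.log line the parser looks for and the CSV row it turns it into (column family, timestamp, flushed MB). The column family name, numbers, and timestamp are made up, and the exact wording of the log line varies between Cassandra versions:

INFO [OptionalTasks:1] 2014-02-20 12:29:27,706 ColumnFamilyStore.java (line 630) Enqueuing flush of Memtable-mycf@31415926(52428800/78643200 serialized/live bytes, 120000 ops)

mycf,2014-02-20 12:29:27,50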

Code

// imports assume Apache Commons IO and Apache Commons Lang 3 on the classpath
import static org.apache.commons.lang3.StringUtils.substringAfter;
import static org.apache.commons.lang3.StringUtils.substringBetween;

import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.math.NumberUtils;

public class ScanLog {

  // suffix appended to every output file name, e.g. to tag the node or the run
  private static final String fileSuffix = "10";
  private static final String LOG_FOLDER = "/cassandraLogPath";
  private static final String OUTPUT_FOLDER = "/outputpath";

  public static void main(String[] params) throws IOException, ParseException {
    File folder = new File(LOG_FOLDER);
    // compacted MB per column family, one row per compaction
    Map<String, List<CompactSize>> cf2CompactSizes = toCompactSizes(folder);
    FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-" + fileSuffix + ".csv"), toCompactSizesString(cf2CompactSizes), "UTF-8");
    // compacted MB aggregated per minute
    List<CompactSize> aggCompactSizes = aggregate(toCompactSizeList(folder));
    FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-agg-" + fileSuffix + ".csv"), toCompactSizesString(aggCompactSizes), "UTF-8");
    // flush queue length over time: memtables waiting to be written vs. being written
    List<FlushEvent> events = toEvents(folder);
    FileUtils.write(new File(OUTPUT_FOLDER + "output-queue-" + fileSuffix + ".csv"), toString(events), "UTF-8");
    // flushed MB per column family, raw and averaged per minute
    Map<String, List<Enqueue>> cfToE = cfToE(folder);
    Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = cfToTimeToAvg(cfToE);
    FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSuffix + ".csv"), toString(cfToE), "UTF-8");
    FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSuffix + "-avg.csv"), toAvgString(cfToTimeToAvg), "UTF-8");
  }

  private static List<CompactSize> aggregate(List<CompactSize> compactSizes) {
    // group compactions by minute
    LinkedHashMap<String, List<CompactSize>> minutes2CompactSizes = new LinkedHashMap<>();
    compactSizes.forEach(compactSize -> {
      String timeMinutes = compactSize.getTimeMinutes();
      if (!minutes2CompactSizes.containsKey(timeMinutes)) {
        minutes2CompactSizes.put(timeMinutes, new ArrayList<>());
      }
      minutes2CompactSizes.get(timeMinutes).add(compactSize);
    });
    // sum the compacted bytes within each minute
    LinkedHashMap<String, CompactSize> minutes2AggregatedSize = new LinkedHashMap<>();
    minutes2CompactSizes.values().forEach(sizes -> {
      sizes.forEach(size -> {
        if (!minutes2AggregatedSize.containsKey(size.getTimeMinutes())) {
          minutes2AggregatedSize.put(size.getTimeMinutes(), new CompactSize(size.getTime(), size.getCf(), size.getDataSize()));
        } else {
          CompactSize total = minutes2AggregatedSize.get(size.getTimeMinutes());
          CompactSize newTotal = new CompactSize(size.getTime(), size.getCf(), size.getDataSize() + total.getDataSize());
          minutes2AggregatedSize.put(size.getTimeMinutes(), newTotal);
        }
      });
    });
    List<CompactSize> result = new ArrayList<>(minutes2AggregatedSize.values());
    Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
    return result;
  }
  
  private static String toCompactSizesString(List<CompactSize> cf2compactSizes) {
    StringBuilder sb = new StringBuilder();
    cf2compactSizes.forEach(size -> {
      sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
    });
    return sb.toString();
  }

  private static String toCompactSizesString(Map<String, List<CompactSize>> cf2compactSizes) {
    StringBuilder sb = new StringBuilder();
    cf2compactSizes.keySet().forEach(cf -> {
      cf2compactSizes.get(cf).forEach(size -> {
        sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
      });
    });
    return sb.toString();
  }

  private static List<CompactSize> toCompactSizeList(File folder) {
    List<CompactSize> result = new ArrayList<>();
    Arrays.stream(folder.listFiles())
        .filter(file -> file.getName().contains("system.log"))
        .forEach(file -> {
          try {
            for (String line : FileUtils.readLines(file)) {
              if (CompactSize.isMyPattern(line)) {
                result.add(new CompactSize(line));
              }
            }
          } catch (Exception ex) {
            ex.printStackTrace();
          }
        });
    Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
    return result;
  }
  
  private static Map<String, List<CompactSize>> toCompactSizes(File folder) {
    Map<String, List<CompactSize>> cf2CompactSizes = new HashMap<>();
    Arrays.stream(folder.listFiles())
        .filter(file -> file.getName().contains("system.log"))
        .forEach(file -> {
          try {
            for (String line : FileUtils.readLines(file)) {
              if (CompactSize.isMyPattern(line)) {
                CompactSize compactSize = new CompactSize(line);
                if (!cf2CompactSizes.containsKey(compactSize.getCf())) {
                  cf2CompactSizes.put(compactSize.getCf(), new ArrayList<>());
                }
                cf2CompactSizes.get(compactSize.getCf()).add(compactSize);
              }
            }
          } catch (Exception ex) {
            ex.printStackTrace();
          }
        });
    cf2CompactSizes.values().forEach(compactSizes ->
        Collections.sort(compactSizes, Comparator.comparingLong(CompactSize::getTimeMillis)));
    return cf2CompactSizes;
  }
  
  private static class CompactSize {
    private final String time;
    private final String cf;
    private final long dataSize;

    CompactSize(String time, String cf, long dataSize) {
      this.time = time;
      this.cf = cf;
      this.dataSize = dataSize;
    }

    CompactSize(String line) {
      // the timestamp sits between the thread name ("[...] ") and its millisecond part (",...")
      this.time = substringBetween(line, "] ", ",");
      // the column family name is the directory under the keyspace data path
      this.cf = substringBetween(line, "/data/ng/db/data/wsg/", "/");
      // compacted size, e.g. "].  12,345,678 bytes to ..."; toLong so sizes over 2 GB do not overflow
      this.dataSize = NumberUtils.toLong(StringUtils.replace(substringBetween(line, "].  ", " bytes"), ",", ""));
    }

    public static boolean isMyPattern(String line) {
      return StringUtils.contains(line, "CompactionTask.java (line 302) Compacted")
          && StringUtils.contains(line, "[/data/ng/db/data/wsg/");
    }

    public long getDataSizeInMB() {
      return dataSize // bytes
          / 1024      // KB
          / 1024;     // MB
    }

    public long getDataSize() {
      return dataSize;
    }

    public String getCf() {
      return cf;
    }

    public String getTimeMinutes() {
      long millis = getTimeMillis();
      return sdfToMinutes.format(new Date(millis));
    }

    public Long getTimeMillis() {
      return toMillis(getTime());
    }

    public String getTime() {
      return time;
    }
  }
  
  private static String toString(List<FlushEvent> events) {
    StringBuilder sb = new StringBuilder();
    // running counters: memtables enqueued but not yet being written, and memtables currently being written
    AtomicInteger waitForWrite = new AtomicInteger();
    AtomicInteger writing = new AtomicInteger();
    events.forEach(e -> {
      switch (e.getType()) {
        case Enqueue:
          waitForWrite.incrementAndGet();
          break;
        case Write:
          writing.incrementAndGet();
          waitForWrite.decrementAndGet();
          break;
        case Complete:
          writing.decrementAndGet();
          break;
        default:
          throw new RuntimeException("Shouldn't happen");
      }
      sb.append(e.getTime() + "," + waitForWrite + "," + writing).append("\n");
    });
    return sb.toString();
  }
  
  private static List<FlushEvent> toEvents(File folder) {
    List<FlushEvent> events = new ArrayList<>();
    Arrays.stream(folder.listFiles())
        .filter(file -> file.getName().contains("system.log"))
        .forEach(file -> {
          try {
            for (String line : FileUtils.readLines(file)) {
              Optional<FlushEventType> type = FlushEventType.getByPattern(line);
              type.ifPresent(t -> events.add(new FlushEvent(line)));
            }
          } catch (Exception ex) {
            ex.printStackTrace();
          }
        });
    Collections.sort(events, Comparator.comparingLong(FlushEvent::getTimeMillis));
    return events;
  }

  private static String toAvgString(Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg) {
    StringBuilder sb = new StringBuilder();
    cfToTimeToAvg.keySet().forEach(cf -> {
      cfToTimeToAvg.get(cf).forEach((time, avg) -> {
        sb.append(cf + "," + time + "," + avg).append("\n");
      });
    });
    return sb.toString();
  }

  private static String toString(Map<String, List<Enqueue>> cfToE) {
    StringBuilder sb = new StringBuilder();
    cfToE.values().forEach(eList -> {
      eList.forEach(e -> {
        try {
          sb.append(e.getCf() + "," + e.getTime() + "," + e.serializedMB).append("\n");
        } catch (Exception ex) {
          ex.printStackTrace();
        }
      });
    });
    return sb.toString();
  }

  private static Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg(Map<String, List<Enqueue>> cfToE) {
    Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = new HashMap<>();
    cfToE.keySet().forEach(cf -> {
      if (!cfToTimeToAvg.containsKey(cf)) {
        cfToTimeToAvg.put(cf, new LinkedHashMap<>());
      }
      // group the flushed MB of this column family by minute
      LinkedHashMap<String, List<Long>> timeToMBList = new LinkedHashMap<>();
      cfToE.get(cf).forEach(enqueue -> {
        if (!timeToMBList.containsKey(enqueue.getTimeMinutes())) {
          timeToMBList.put(enqueue.getTimeMinutes(), new ArrayList<>());
        }
        timeToMBList.get(enqueue.getTimeMinutes()).add(enqueue.serializedMB);
      });
      // average: MB flushed in this minute divided by the gap (in minutes) since the previous minute that had flushes
      AtomicReference<String> previousTime = new AtomicReference<>();
      LinkedHashMap<String, Long> timeToAvg = cfToTimeToAvg.get(cf);
      timeToMBList.keySet().forEach(time -> {
        try {
          if (previousTime.get() != null) {
            AtomicLong totalMB = new AtomicLong();
            timeToMBList.get(time).forEach(mb -> {
              totalMB.addAndGet(mb);
            });
            long currentTimeMillis = sdfToMinutes.parse(time).getTime();
            long previousTimeMillis = sdfToMinutes.parse(previousTime.get()).getTime();
            long gapMinutes = TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - previousTimeMillis);
            timeToAvg.put(time, totalMB.longValue() / gapMinutes);
          }
          previousTime.set(time);
        } catch (Exception ex) {
          ex.printStackTrace();
        }
      });
    });
    return cfToTimeToAvg;
  }

  private static Map<String, List<Enqueue>> cfToE(File folder) {
    Map<String, List<Enqueue>> cfToE = new HashMap<>();
    Arrays.stream(folder.listFiles())
        .filter(f -> f.getName().contains("system.log"))
        .forEach(file -> {
          try {
            for (String line : FileUtils.readLines(file)) {
              if (StringUtils.contains(line, "Enqueuing flush of Memtable-")) {
                Enqueue e = new Enqueue(line);
                // ignore tiny flushes of 1 MB or less
                if (e.serializedMB <= 1) {
                  continue;
                }
                if (!cfToE.containsKey(e.getCf())) {
                  cfToE.put(e.getCf(), new ArrayList<>());
                }
                cfToE.get(e.getCf()).add(e);
              }
            }
          } catch (Exception ex) {
            ex.printStackTrace();
          }
        });
    // sort each column family's flushes by time
    cfToE.keySet().forEach(k -> {
      cfToE.get(k).sort((e1, e2) -> {
        try {
          // Long.compare avoids the overflow of casting the long difference to int
          return Long.compare(e1.getTimeMillis(), e2.getTimeMillis());
        } catch (Exception ex) {
          ex.printStackTrace();
          return 0;
        }
      });
    });
    return cfToE;
  }

  private enum FlushEventType {
    // the three lines Cassandra logs for every memtable flush
    Enqueue("Enqueuing flush of Memtable-"),
    Write("Writing Memtable-"),
    Complete("Completed flushing ");

    private final String pattern;

    FlushEventType(String pattern) {
      this.pattern = pattern;
    }

    public String getPattern() {
      return pattern;
    }

    public static Optional<FlushEventType> getByPattern(String line) {
      return Arrays.stream(values())
          .filter(type -> StringUtils.contains(line, type.getPattern()))
          .findFirst();
    }
  }
  
  private static class FlushEvent {
    private final FlushEventType type;
    private final String time;

    FlushEvent(String line) {
      this.time = substringBetween(line, "] ", ",");
      this.type = FlushEventType.getByPattern(line).get(); // always present: only matching lines reach here
    }

    public FlushEventType getType() {
      return type;
    }

    public Long getTimeMillis() {
      return toMillis(getTime());
    }

    public String getTime() {
      return time;
    }
  }
  
  private static class Enqueue {
    private final String time;
    private final String cf;
    private final int ops;
    private final long serializedMB;

    Enqueue(String line) throws ParseException {
      this.time = substringBetween(line, "] ", ",");
      this.cf = substringBetween(line, "Enqueuing flush of Memtable-", "@");
      this.ops = NumberUtils.toInt(substringBetween(line, "serialized/live bytes, ", " ops)"));
      // serialized bytes are the number between "(" and "/" after the memtable name; toLong to be safe with large memtables
      this.serializedMB = NumberUtils.toLong(substringBetween(substringAfter(line, "Enqueuing flush of Memtable-"), "(", "/")) / 1024 / 1024;
    }

    @Override
    public String toString() {
      return new ToStringBuilder(this)
          .append("time", time)
          .append("cf", cf)
          .append("ops", ops)
          .append("serializedMB", serializedMB)
          .toString();
    }

    public String getTimeMinutes() {
      long millis = getTimeMillis();
      return sdfToMinutes.format(new Date(millis));
    }

    public Long getTimeMillis() {
      return toMillis(getTime());
    }

    public String getTime() {
      return time;
    }

    public String getCf() {
      return cf;
    }
  }

  private static final SimpleDateFormat sdfToMinutes = new SimpleDateFormat("yyyy-MM-dd HH:mm");
  // "ss" (seconds), not "SS" (milliseconds), so a timestamp like "12:29:27" is parsed correctly
  private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

  private static long toMillis(String time) {
    try {
      Date date = sdf.parse(time);
      return date.getTime();
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }

}

Alias Method (別名演算法)

Problem: every server supports a different TPM (transactions per minute). When a request comes in, the system must immediately pick a suitable server at random according to its TPM capacity. Although this is called "random", the TPM still has to act as the weight. Solution: the alias method...
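
The post is cut off here, but since it names the alias method, here is a minimal sketch of it in Java; the class and method names (AliasMethod, next) are mine, not from the original post. Setup is O(n): the weights are scaled so they average to 1, then every under-full column is topped up by an over-full one, recording the donor as its alias. Each pick is then O(1): choose a column uniformly and flip a biased coin to keep it or jump to its alias.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

/** Weighted random selection: O(n) setup, O(1) per pick (Vose's alias method). */
public class AliasMethod {
  private final double[] probability; // chance of keeping column i
  private final int[] alias;          // fallback index when column i is not kept
  private final Random random = new Random();

  public AliasMethod(double[] weights) { // e.g. each server's TPM
    int n = weights.length;
    probability = new double[n];
    alias = new int[n];

    double sum = 0;
    for (double w : weights) sum += w;

    // scale the weights so the average column is exactly 1
    double[] scaled = new double[n];
    Deque<Integer> small = new ArrayDeque<>();
    Deque<Integer> large = new ArrayDeque<>();
    for (int i = 0; i < n; i++) {
      scaled[i] = weights[i] * n / sum;
      if (scaled[i] < 1.0) small.push(i); else large.push(i);
    }

    // pair each under-full column with an over-full one
    while (!small.isEmpty() && !large.isEmpty()) {
      int s = small.pop();
      int l = large.pop();
      probability[s] = scaled[s];
      alias[s] = l;
      scaled[l] = scaled[l] + scaled[s] - 1.0; // column l donated (1 - scaled[s])
      if (scaled[l] < 1.0) small.push(l); else large.push(l);
    }
    // whatever is left is exactly full (up to rounding)
    while (!large.isEmpty()) probability[large.pop()] = 1.0;
    while (!small.isEmpty()) probability[small.pop()] = 1.0;
  }

  /** Returns an index with probability proportional to its weight. */
  public int next() {
    int column = random.nextInt(probability.length);
    return random.nextDouble() < probability[column] ? column : alias[column];
  }
}

For the server-selection example, new AliasMethod(new double[] {100, 300, 600}).next() would return index 2 about 60% of the time, matching that server's share of the total TPM.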