
Spring Data Reactive Cassandra CRUD



Cassandra commands
sudo docker run --name test-cassandra -p 9042:9042 -d cassandra:latest
sudo docker run --name test-cassandra-2 --link test-cassandra:cassandra -d cassandra:latest
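To confirm the second node has joined the ring (nodetool ships inside the official image; the node may take a minute to bootstrap):
sudo docker exec test-cassandra nodetool status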

docker exec -it test-cassandra /bin/bash

cqlsh> CREATE KEYSPACE test WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 } AND DURABLE_WRITES = false;

cqlsh> use test;

cqlsh:test> CREATE TABLE person ( id text PRIMARY KEY, name text );
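On the Spring side, a minimal application.properties sketch for connecting to this keyspace (an assumption on my part: property names follow Spring Boot 2.3+ with the spring-boot-starter-data-cassandra-reactive starter; adjust contact point and datacenter to your setup):
spring.data.cassandra.keyspace-name=test
spring.data.cassandra.contact-points=127.0.0.1
spring.data.cassandra.port=9042
spring.data.cassandra.local-datacenter=datacenter1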

Person.java
@Data
@Table
public class Person {

    @PrimaryKey private String id;
    private String name;

}

ReactivePersonRepository.java
public interface ReactivePersonRepository extends ReactiveCassandraRepository<Person, String> {}
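Derived query methods also work on the reactive repository. A hypothetical extension (not part of the original code): since name is not part of the primary key, Cassandra requires ALLOW FILTERING, which Spring Data exposes via the @AllowFiltering annotation:
public interface ReactivePersonRepository extends ReactiveCassandraRepository<Person, String> {

    // Hypothetical finder; needs @AllowFiltering because name is a
    // non-key, non-indexed column.
    @AllowFiltering
    Flux<Person> findByName(String name);

}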

ReactivePersonService.java
@Service
public class ReactivePersonService {

    @Autowired
    private ReactivePersonRepository reactivePersonRepository;

    public Mono<Person> save(Person person) {
        return reactivePersonRepository.save(person);
    }

    public Mono<Person> findById(String id) {
        return reactivePersonRepository.findById(id);
    }

    public Flux<Person> findAll() {
        return reactivePersonRepository.findAll();
    }

    public Mono<Void> deleteById(String id) {
        return reactivePersonRepository.deleteById(id);
    }

    public Mono<Void> deleteAll() {
        return reactivePersonRepository.deleteAll();
    }

}

ReactivePersonController.java
@RestController
@RequestMapping("/api/v2/")
public class ReactivePersonController {

    @Autowired
    private ReactivePersonService reactivePersonService;

    @PostMapping(value = "/person", consumes = "application/json")
    public Mono<Person> createPerson(@RequestBody Person person) {
        System.out.println("create person" + person);
        person.setId(UUID.randomUUID().toString());
        return reactivePersonService.save(person);
    }

    @GetMapping(value = "/person")
    public Mono<Person> getPerson(@RequestParam String id) {
        return reactivePersonService.findById(id);
    }

    @GetMapping(value = "/persons")
    public Flux<Person> getAllPersons() {
        return reactivePersonService.findAll();
    }

    @DeleteMapping("/person/{id}")
    public Mono<Void> deletePerson(@PathVariable String id) {
        return reactivePersonService.deleteById(id);
    }

    @DeleteMapping("/persons")
    public Mono<Void> deleteAll() {
        return reactivePersonService.deleteAll();
    }

}
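With the application running locally, the endpoints can be exercised with curl (sample data; {id} is a placeholder for the id returned by the create call):
curl -X POST -H 'Content-Type: application/json' -d '{"name":"alice"}' http://localhost:8080/api/v2/person
curl 'http://localhost:8080/api/v2/person?id={id}'
curl http://localhost:8080/api/v2/persons
curl -X DELETE http://localhost:8080/api/v2/person/{id}
curl -X DELETE http://localhost:8080/api/v2/persons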

Convert CQL query result file hex to ASCII

Under Cassandra 2.0.17, I want to query a table with cqlsh.

First, I put the query in a file: ~/query.cql
select * from "myData" LIMIT 50000;

Then I use cqlsh to execute the file and redirect the output:
cqlsh -k mykeyspace -f ~/query.cql > ~/query.out

or run the query inline:

cqlsh -k mykeyspace -e 'select * from "myData" LIMIT 50000;' > ~/query.out

The query result looks as follows:
 key                                                                        | 5f6261636b7570546167
----------------------------------------------------------------------------+----------------------
 0x36346265303431622d376335652d346530332d393964352d373135613939306636666532 |  bbb
 0x61666637623538372d643537632d346662662d613265662d666262363164626163363138 |  aaa

The table contains many hex strings in its column names and values, so I convert the hex to ASCII with the code below; for example, the column name 5f6261636b7570546167 decodes to _backupTag.
(I am not sure that hex tokens always start with 5f or 0x, but in my case these two prefixes are the only ones I need to handle.)
// Requires commons-io (FileUtils) and commons-lang3 (StringUtils).
public void convert() throws IOException {
    StringBuilder copy = new StringBuilder();
    File file = new File("C:\\query.out");
    for (String line : FileUtils.readLines(file, "UTF-8")) {
        if (line.startsWith("--")) continue; // skip the header divider
        String[] splitted = StringUtils.split(line, " ");
        if (splitted.length < 2) continue;
        for (int i = 0; i < splitted.length; i++) {
            if (splitted[i].startsWith("5f") || splitted[i].startsWith("0x")) {
                // Drop the first two characters, then decode the rest.
                // For "0x..." this strips the literal prefix; for "5f..."
                // it also drops the leading '_' of the decoded name.
                splitted[i] = hexToAscii(splitted[i].substring(2));
            }
        }
        copy.append(Arrays.stream(splitted).filter(s -> !s.equals("|")).collect(Collectors.joining(","))).append("\n");
    }
    FileUtils.write(new File("C:\\query.copy"), copy.toString(), "UTF-8");
}


// Decodes a hex string two characters at a time; if the input is not
// valid hex, it is returned unchanged.
private static String hexToAscii(String hexStr) {
    StringBuilder output = new StringBuilder();

    try {
        for (int i = 0; i < hexStr.length(); i += 2) {
            String str = hexStr.substring(i, i + 2);
            output.append((char) Integer.parseInt(str, 16));
        }
    } catch (NumberFormatException ex) {
        return hexStr;
    }

    return output.toString();
}
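A quick sanity check for hexToAscii, using the values from the sample output above (a hypothetical main just for testing):
public static void main(String[] args) {
    // Prints "_backupTag" — the hex column name from the query result
    System.out.println(hexToAscii("5f6261636b7570546167"));
    // Prints "64be041b-7c5e-4e03-99d5-715a990f6fe2" — a row key with its 0x prefix already stripped
    System.out.println(hexToAscii("36346265303431622d376335652d346530332d393964352d373135613939306636666532"));
}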

Scan Cassandra 2.0.17 logs to check write load


We use Cassandra 2.0.17.
Sometimes the system suffers IO problems. "iotop" shows the IO load per process,
but sometimes we need more detail, such as the io-write load for each table.
The Cassandra log is a very good source for this.
I wrote a program that scans the log and generates CSV files, which I then copy-paste into Google Sheets for further analysis, such as drawing a chart.

The program generates these files (I wrote it a long time ago; please let me know if any description is wrong):
  • output-compact-mb-xxx: write load from compaction
  • output-compact-mb-agg-xxx: write load per minute from compaction
  • output-queue-xxx: queue size of memtable-flush events
  • output-xxx: flushed memtable size by column family
  • output-xxx-avg: flushed memtable size per minute by column family

The program scans files matching the patterns "system.log" and "system.log.*".

I check the output-xxx file most often; its content looks like this:
column 1 - column family name
column 2 - flush event timestamp
column 3 - flushed memory (MB)
mytable,2019-03-24 16:16:28,11


Here is the program. It needs commons-lang3 and commons-io on the classpath; if you build with Maven, the coordinates look like this (version numbers are indicative):
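<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.6</version>
</dependency>

The program itself: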
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.math.NumberUtils;


import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;


import static org.apache.commons.lang3.StringUtils.substringAfter;
import static org.apache.commons.lang3.StringUtils.substringBetween;


public class ScanLog {
    private static final String fileSuffix = "";
    private static final String LOG_FOLDER = "{CassandraPathContainsSystemLog}\\cassandra\\";
    private static final String OUTPUT_FOLDER = "{OutputFolderWhereYouWantTheCsv}\\";


    public static void main(String[] params) throws IOException, ParseException {
        File folder = new File(LOG_FOLDER);
        Map<String, List<CompactSize>> cf2CompactSizes = toCompactSizes(folder);
        System.out.println("cf2CompactSizes.size:" + cf2CompactSizes.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-" + fileSufix + ".csv"), toCompactSizesString(cf2CompactSizes), "UTF-8");


        // Compact avg per minutes
        List<CompactSize> aggCompactSizes = aggregate(toCompactSizeList(folder));
        System.out.println("aggCompactSizes.size:" + aggCompactSizes.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-compact-mb-agg-" + fileSufix + ".csv"), toCompactSizesString(aggCompactSizes), "UTF-8");


        // Overall Flush
        List<FlushEvent> events = toEvents(folder);
        System.out.println("events.size:" + events.size());
        FileUtils.write(new File(OUTPUT_FOLDER + "output-queue-" + fileSufix + ".csv"), toString(events), "UTF-8");


        // Flush by CF
        Map<String, List<Enqueue>> cfToE = cfToE(folder);
        System.out.println("cfToE.size:" + cfToE.size());
        Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = cfToTimeToAvg(cfToE);
        System.out.println("cfToTimeToAvg.size:" + cfToTimeToAvg.size());


        FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSufix + ".csv"), toString(cfToE), "UTF-8");
        FileUtils.write(new File(OUTPUT_FOLDER + "output-" + fileSufix + "-avg.csv"), toAvgString(cfToTimeToAvg), "UTF-8");
    }


    private static List<CompactSize> aggregate(List<CompactSize> compactSizes) {
        LinkedHashMap<String, List<CompactSize>> minutes2CompactSizes = new LinkedHashMap<>();
        compactSizes.forEach(compactSize -> {
            String timeMinutes = compactSize.getTimeMinutes();
            if (!minutes2CompactSizes.containsKey(timeMinutes)) {
                minutes2CompactSizes.put(timeMinutes, new ArrayList<>());
            }
            minutes2CompactSizes.get(timeMinutes).add(compactSize);
        });
        LinkedHashMap<String, CompactSize> minutes2AggregatedSize = new LinkedHashMap<>();
        minutes2CompactSizes.entrySet().forEach(entry -> {
            String minutes = entry.getKey();
            entry.getValue().forEach(size -> {
                if (!minutes2AggregatedSize.containsKey(size.getTimeMinutes())) {
                    minutes2AggregatedSize.put(size.getTimeMinutes(), new CompactSize(size.getTime(), size.getCf(), size.getDataSize()));
                } else {
                    CompactSize total = minutes2AggregatedSize.get(size.getTimeMinutes());
                    CompactSize newTotal = new CompactSize(size.getTime(), size.getCf(), size.getDataSize() + total.getDataSize());
                    minutes2AggregatedSize.put(size.getTimeMinutes(), newTotal);
                }
            });
        });
        List<CompactSize> result = new ArrayList<>();
        minutes2AggregatedSize.entrySet().forEach(entry -> {
            result.add(entry.getValue());
        });
        Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
        return result;
    }


    private static String toCompactSizesString(List<CompactSize> cf2compactSizes) {
        StringBuilder sb = new StringBuilder();
        cf2compactSizes.forEach(size -> {
            sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
        });
        return sb.toString();
    }


    private static String toCompactSizesString(Map<String, List<CompactSize>> cf2compactSizes) {
        StringBuilder sb = new StringBuilder();
        cf2compactSizes.keySet().forEach(cf -> {
            cf2compactSizes.get(cf).forEach(size -> {
                sb.append(size.getCf() + "," + size.getTime() + "," + size.getDataSizeInMB()).append("\n");
            });
        });
        return sb.toString();
    }


    private static List<CompactSize> toCompactSizeList(File folder) {
        List<CompactSize> result = new ArrayList<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file, "UTF-8")) {
                            if (CompactSize.isMyPattern(line)) {
                                CompactSize compactSize = new CompactSize(line);
                                result.add(compactSize);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        Collections.sort(result, Comparator.comparingLong(CompactSize::getTimeMillis));
        return result;
    }


    private static Map<String, List<CompactSize>> toCompactSizes(File folder) {
        Map<String, List<CompactSize>> cf2CompactSizes = new HashMap<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file, "UTF-8")) {
                            if (CompactSize.isMyPattern(line)) {
                                CompactSize compactSize = new CompactSize(line);
                                if (!cf2CompactSizes.containsKey(compactSize.getCf())) {
                                    cf2CompactSizes.put(compactSize.getCf(), new ArrayList<>());
                                }
                                cf2CompactSizes.get(compactSize.getCf()).add(compactSize);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        cf2CompactSizes.values().forEach(compactSizes ->
                Collections.sort(compactSizes, Comparator.comparingLong(CompactSize::getTimeMillis)));
        return cf2CompactSizes;
    }


    private static class CompactSize {
        private final String time;
        private final String cf;
        private final long dataSize;


        CompactSize(String time, String cf, long dataSize) {
            this.time = time;
            this.cf = cf;
            this.dataSize = dataSize;
        }


        CompactSize(String line) {
            this.time = substringBetween(line, "] ", ",");
            this.cf = substringBetween(line, "/data/ng/db/data/wsg/", "/");
            this.dataSize = NumberUtils.toInt(StringUtils.replace(substringBetween(line, "].  ", " bytes"), ",", ""));
        }


        public static boolean isMyPattern(String line) {
            // The data path below is specific to our deployment; adjust it
            // to your own Cassandra data directory.
            return StringUtils.contains(line, "CompactionTask.java (line 302) Compacted")
                    && StringUtils.contains(line, "[/data/ng/db/data/wsg/");
        }


        public long getDataSizeInMB() {
            return dataSize // bytes
                    / 1024  // KB
                    / 1024; // MB
        }


        public long getDataSize() {
            return dataSize;
        }


        public String getCf() {
            return cf;
        }


        public String getTimeMinutes() {
            long millis = getTimeMillis();
            return sdfToMinutes.format(new Date(millis));
        }


        public Long getTimeMillis() {
            return toMillis(getTime());
        }


        public String getTime() {
            return time;
        }
    }


    private static String toString(List<FlushEvent> events) {
        StringBuilder sb = new StringBuilder();
        AtomicInteger waitForWrite = new AtomicInteger();
        AtomicInteger writing = new AtomicInteger();
        events.forEach(e -> {
            switch (e.getType()) {
                case Enqueue:
                    waitForWrite.incrementAndGet();
                    break;
                case Write:
                    writing.incrementAndGet();
                    waitForWrite.decrementAndGet();
                    break;
                case Complete:
                    writing.decrementAndGet();
                    break;
                default:
                    throw new RuntimeException("Shouldn't happen");
            }
            sb.append(e.getTime() + "," + waitForWrite + "," + writing).append("\n");
        });
        return sb.toString();
    }


    private static List<FlushEvent> toEvents(File folder) {
        List<FlushEvent> events = new ArrayList<>();
        Arrays.stream(folder.listFiles())
                .filter(file -> file.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        for (String line : FileUtils.readLines(file, "UTF-8")) {
                            Optional<FlushEventType> type = FlushEventType.getByPattern(line);
                            type.ifPresent(t -> {
                                events.add(new FlushEvent(line));
                            });
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        Collections.sort(events, Comparator.comparingLong(FlushEvent::getTimeMillis));
        return events;
    }


    private static String toAvgString(Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg) {
        StringBuilder sb = new StringBuilder();
        cfToTimeToAvg.keySet().forEach(cf -> {
            cfToTimeToAvg.get(cf).forEach((time, avg) -> {
                sb.append(cf + "," + time + "," + avg).append("\n");
            });
        });
        return sb.toString();
    }


    private static String toString(Map<String, List<Enqueue>> cfToE) {
        StringBuilder sb = new StringBuilder();
        cfToE.values().stream().forEach(eList -> {
            eList.stream().forEach(e -> {
                try {
                    sb.append(e.getCf() + "," + e.getTime() + "," + e.serializedMB).append("\n");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            });
        });
        return sb.toString();
    }


    private static Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg(Map<String, List<Enqueue>> cfToE) {
        Map<String, LinkedHashMap<String, Long>> cfToTimeToAvg = new HashMap<>();
        cfToE.keySet().forEach(cf -> {
            if (!cfToTimeToAvg.containsKey(cf)) {
                cfToTimeToAvg.put(cf, new LinkedHashMap<>());
            }
            LinkedHashMap<String, List<Long>> timeToMBList = new LinkedHashMap<>();
            cfToE.get(cf).forEach(enqueue -> {
                if (!timeToMBList.containsKey(enqueue.getTimeMinutes())) {
                    timeToMBList.put(enqueue.getTimeMinutes(), new ArrayList<>());
                }
                timeToMBList.get(enqueue.getTimeMinutes()).add(enqueue.serializedMB);
            });
            AtomicReference<String> previousTime = new AtomicReference<>();
            LinkedHashMap<String, Long> timeToAvg = cfToTimeToAvg.get(cf);
            timeToMBList.keySet().forEach(time -> {
                try {
                    if (previousTime.get() != null) {
                        AtomicLong totalMB = new AtomicLong();
                        timeToMBList.get(time).forEach(mb -> {
                            totalMB.addAndGet(mb);
                        });
                        long currentTimeMillis = sdfToMinutes.parse(time).getTime();
                        long previousTimeMillis = sdfToMinutes.parse(previousTime.get()).getTime();
                        long gapMinutes = TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - previousTimeMillis);
                        timeToAvg.put(time, (totalMB.longValue() / gapMinutes));
                    }
                    previousTime.set(time);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            });
        });
        return cfToTimeToAvg;
    }


    private static Map<String, List<Enqueue>> cfToE(File folder) {
        Map<String, List<Enqueue>> cfToE = new HashMap<>();
        Arrays.stream(folder.listFiles())
                .filter(f -> f.getName().contains("system.log"))
                .forEach(file -> {
                    try {
                        List<String> lines = FileUtils.readLines(file, "UTF-8");
                        for (String line : lines) {
                            if (StringUtils.contains(line, "Enqueuing flush of Memtable-")) {
                                Enqueue e = new Enqueue(line);
                                if (e.serializedMB <= 1) {
                                    continue;
                                }
                                if (!cfToE.containsKey(e.getCf())) {
                                    cfToE.put(e.getCf(), new ArrayList<>());
                                }
                                cfToE.get(e.getCf()).add(e);
                            }
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                });
        cfToE.keySet().forEach(k -> {
            cfToE.get(k).sort((e1, e2) -> {
                try {
                    // Long.compare avoids truncating the millisecond
                    // difference to int, which could overflow.
                    return Long.compare(e1.getTimeMillis(), e2.getTimeMillis());
                } catch (Exception ex) {
                    ex.printStackTrace();
                    return 0;
                }
            });
        });
        return cfToE;
    }


    private enum FlushEventType {
        Enqueue("Enqueuing flush of Memtable-"),
        Write("Writing Memtable-"),
        Complete("Completed flushing ");
        private final String pattern;


        FlushEventType(String pattern) {
            this.pattern = pattern;
        }


        public String getPattern() {
            return pattern;
        }


        public static Optional<FlushEventType> getByPattern(String line) {
            return Arrays.stream(values())
                    .filter(type -> StringUtils.contains(line, type.getPattern()))
                    .findFirst();
        }
    }


    private static class FlushEvent {
        private final FlushEventType type;
        private final String time;


        FlushEvent(String line) {
            this.time = substringBetween(line, "] ", ",");
            this.type = FlushEventType.getByPattern(line).get(); // guaranteed present: the caller already matched a pattern
        }


        public FlushEventType getType() {
            return type;
        }


        public Long getTimeMillis() {
            return toMillis(getTime());
        }


        public String getTime() {
            return time;
        }
    }


    private static class Enqueue {
        private final String time;
        private final String cf;
        private final int ops;
        private final long serializedMB;


        Enqueue(String line) throws ParseException {
            this.time = substringBetween(line, "] ", ",");
            this.cf = substringBetween(line, "Enqueuing flush of Memtable-", "@");
            this.ops = NumberUtils.toInt(substringBetween(line, "serialized/live bytes, ", " ops)"));
            this.serializedMB = NumberUtils.toInt(substringBetween(substringAfter(line, "Enqueuing flush of Memtable-"), "(", "/")) / 1024 / 1024;
        }


        @Override
        public String toString() {
            return new ToStringBuilder(this)
                    .append("time", time)
                    .append("cf", cf)
                    .append("ops", ops)
                    .append("serializedMB", serializedMB)
                    .toString();
        }


        public String getTimeMinutes() {
            long millis = getTimeMillis();
            return sdfToMinutes.format(new Date(millis));
        }


        public Long getTimeMillis() {
            return toMillis(getTime());
        }


        public String getTime() {
            return time;
        }


        public String getCf() {
            return cf;
        }


    }


    private static final SimpleDateFormat sdfToMinutes = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    // "ss" (seconds), not "SS" (milliseconds): log timestamps look like "2019-03-24 16:16:28"
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    private static long toMillis(String time) {
        try {
            Date date = sdf.parse(time);
            return date.getTime();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }


}


Alias Method

Problem: each server supports a different TPM (transactions per minute). When a request arrives, the system must immediately pick a suitable server at random according to TPM capacity. The pick is called "random", but TPM still acts as the weight. Solution: the Alias Method...
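Since the post is cut off here, the following is a minimal sketch of the idea (Vose's variant of the alias method; the weights would be the servers' TPM values, and all names are illustrative):
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

public class AliasMethod {
    private final double[] prob;  // chance of keeping column i on a draw
    private final int[] alias;    // fallback index when column i is not kept
    private final Random random = new Random();

    // O(n) setup: scale weights so the average is 1, then pair each
    // under-full column with an over-full one.
    public AliasMethod(double[] weights) {
        int n = weights.length;
        prob = new double[n];
        alias = new int[n];
        double sum = 0;
        for (double w : weights) sum += w;
        double[] scaled = new double[n];
        Deque<Integer> small = new ArrayDeque<>();
        Deque<Integer> large = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            scaled[i] = weights[i] * n / sum;
            if (scaled[i] < 1.0) small.push(i); else large.push(i);
        }
        while (!small.isEmpty() && !large.isEmpty()) {
            int s = small.pop(), l = large.pop();
            prob[s] = scaled[s];
            alias[s] = l;
            scaled[l] += scaled[s] - 1.0;  // the over-full column donates the remainder
            if (scaled[l] < 1.0) small.push(l); else large.push(l);
        }
        // Whatever remains is full, up to rounding error.
        while (!large.isEmpty()) prob[large.pop()] = 1.0;
        while (!small.isEmpty()) prob[small.pop()] = 1.0;
    }

    // O(1) per draw: pick a column uniformly, then keep it or take its alias.
    public int next() {
        int i = random.nextInt(prob.length);
        return random.nextDouble() < prob[i] ? i : alias[i];
    }
}

For example, new AliasMethod(new double[]{100, 300, 600}).next() returns index 2 about 60% of the time, matching a server rated at 600 TPM out of a 1000 TPM total.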