Init MongoDB data in Docker

 1. Given the init script ./mongo/docker-entrypoint-initdb.d/initdb.sh in the project folder:

```
#!/bin/bash
echo '=====================================>'
mongo --eval 'db.getSiblingDB("testqq").createUser({"user": "admin", "pwd": "admin", roles: [{"role": "readWrite", "db": "testqq"}]});'
mongo --eval 'db.getSiblingDB("testqq").users.insert({"username" : "admin", "password" : "admin", "email" : "admin@gmail.com"});'
echo '<======================================'
```


2. Given docker-compose.yml

```
services:
  mongo:
    image: mongo:4.4
    ports:
      - "27017:27017"
    volumes:
      - "./mongo/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d"
```
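As an alternative sketch: the official mongo image also documents MONGO_INITDB_* environment variables that create a root user and select the default database the init scripts connect to, so part of the script above could be replaced by configuration (the credential values below are placeholders):

```
services:
  mongo:
    image: mongo:4.4
    ports:
      - "27017:27017"
    environment:
      - MONGO_INITDB_ROOT_USERNAME=root
      - MONGO_INITDB_ROOT_PASSWORD=example
      - MONGO_INITDB_DATABASE=testqq
    volumes:
      - "./mongo/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d"
```

Note that setting the root credentials also enables access control, unlike the compose file above, so clients then need to authenticate.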


3. Start docker-compose

```
$ docker-compose up
```


4. Check the container logs; the following output should appear

```
mongo_1  | /usr/local/bin/docker-entrypoint.sh: running /docker-entrypoint-initdb.d/initdb.sh
mongo_1  | =====================================>
mongo_1  | MongoDB shell version v4.4.3
mongo_1  | connecting to: mongodb://127.0.0.1:27017/?compressors=disabled&gssapiServiceName=mongodb
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.786+00:00"},"s":"I",  "c":"NETWORK",  "id":22943,   "ctx":"listener","msg":"Connection accepted","attr":{"remote":"127.0.0.1:46062","connectionId":2,"connectionCount":1}}
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.787+00:00"},"s":"I",  "c":"NETWORK",  "id":51800,   "ctx":"conn2","msg":"client metadata","attr":{"remote":"127.0.0.1:46062","client":"conn2","doc":{"application":{"name":"MongoDB Shell"},"driver":{"name":"MongoDB Internal Client","version":"4.4.3"},"os":{"type":"Linux","name":"Ubuntu","architecture":"x86_64","version":"18.04"}}}}
mongo_1  | Implicit session: session { "id" : UUID("14d8434a-7c74-4509-a5de-d55a4c63bdf4") }
mongo_1  | MongoDB server version: 4.4.3
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.835+00:00"},"s":"I",  "c":"STORAGE",  "id":20320,   "ctx":"conn2","msg":"createCollection","attr":{"namespace":"admin.system.users","uuidDisposition":"generated","uuid":{"uuid":{"$uuid":"e8923719-dd5a-4a94-a417-bb32a7ca4ebc"}},"options":{}}}
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.854+00:00"},"s":"I",  "c":"INDEX",    "id":20345,   "ctx":"conn2","msg":"Index build: done building","attr":{"buildUUID":null,"namespace":"admin.system.users","index":"_id_","commitTimestamp":{"$timestamp":{"t":0,"i":0}}}}
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.854+00:00"},"s":"I",  "c":"INDEX",    "id":20345,   "ctx":"conn2","msg":"Index build: done building","attr":{"buildUUID":null,"namespace":"admin.system.users","index":"user_1_db_1","commitTimestamp":{"$timestamp":{"t":0,"i":0}}}}
mongo_1  | Successfully added user: {
mongo_1  |     "user" : "admin",
mongo_1  |     "roles" : [
mongo_1  |         {
mongo_1  |             "role" : "readWrite",
mongo_1  |             "db" : "testqq"
mongo_1  |         }
mongo_1  |     ]
mongo_1  | }
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.859+00:00"},"s":"I",  "c":"NETWORK",  "id":22944,   "ctx":"conn2","msg":"Connection ended","attr":{"remote":"127.0.0.1:46062","connectionId":2,"connectionCount":0}}
mongo_1  | MongoDB shell version v4.4.3
mongo_1  | connecting to: mongodb://127.0.0.1:27017/?compressors=disabled&gssapiServiceName=mongodb
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.926+00:00"},"s":"I",  "c":"NETWORK",  "id":22943,   "ctx":"listener","msg":"Connection accepted","attr":{"remote":"127.0.0.1:46064","connectionId":3,"connectionCount":1}}
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.927+00:00"},"s":"I",  "c":"NETWORK",  "id":51800,   "ctx":"conn3","msg":"client metadata","attr":{"remote":"127.0.0.1:46064","client":"conn3","doc":{"application":{"name":"MongoDB Shell"},"driver":{"name":"MongoDB Internal Client","version":"4.4.3"},"os":{"type":"Linux","name":"Ubuntu","architecture":"x86_64","version":"18.04"}}}}
mongo_1  | Implicit session: session { "id" : UUID("b0df77be-4789-4cb7-a1d0-6b64717b0207") }
mongo_1  | MongoDB server version: 4.4.3
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.937+00:00"},"s":"I",  "c":"STORAGE",  "id":20320,   "ctx":"conn3","msg":"createCollection","attr":{"namespace":"testqq.users","uuidDisposition":"generated","uuid":{"uuid":{"$uuid":"fb550551-8a86-4f98-9a93-b324e057084d"}},"options":{}}}
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.950+00:00"},"s":"I",  "c":"INDEX",    "id":20345,   "ctx":"conn3","msg":"Index build: done building","attr":{"buildUUID":null,"namespace":"testqq.users","index":"_id_","commitTimestamp":{"$timestamp":{"t":0,"i":0}}}}
mongo_1  | WriteResult({ "nInserted" : 1 })
mongo_1  | {"t":{"$date":"2021-01-25T16:48:42.957+00:00"},"s":"I",  "c":"NETWORK",  "id":22944,   "ctx":"conn3","msg":"Connection ended","attr":{"remote":"127.0.0.1:46064","connectionId":3,"connectionCount":0}}
mongo_1  | <======================================
```


5. Open a mongo shell in the container and check the database

```
cds % docker exec -it cds_mongo_1 bash
root@186eb2a0f48d:/# mongo
MongoDB shell version v4.4.3
connecting to: mongodb://127.0.0.1:27017/?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("59729a8b-ecb1-439a-ab9f-63cc06f30e54") }
MongoDB server version: 4.4.3
Welcome to the MongoDB shell.
For interactive help, type "help".
For more comprehensive documentation, see
        https://docs.mongodb.com/
Questions? Try the MongoDB Developer Community Forums
        https://community.mongodb.com
---
The server generated these startup warnings when booting:
        2021-01-25T16:52:03.600+00:00: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine. See http://dochub.mongodb.org/core/prodnotes-filesystem
        2021-01-25T16:52:04.496+00:00: Access control is not enabled for the database. Read and write access to data and configuration is unrestricted
---
---
        Enable MongoDB's free cloud-based monitoring service, which will then receive and display
        metrics about your deployment (disk utilization, CPU, operation statistics, etc).

        The monitoring data will be available on a MongoDB website with a unique URL accessible to you
        and anyone you share the URL with. MongoDB may use this information to make product
        improvements and to suggest MongoDB products and deployment options to you.

        To enable free monitoring, run the following command: db.enableFreeMonitoring()
        To permanently disable this reminder, run the following command: db.disableFreeMonitoring()
---
> db.getSiblingDB('testqq').getUsers()
[
        {
                "_id" : "testqq.admin",
                "userId" : UUID("886d3d0f-7457-4724-8ae6-d9494382bce4"),
                "user" : "admin",
                "db" : "testqq",
                "roles" : [
                        {
                                "role" : "readWrite",
                                "db" : "testqq"
                        }
                ],
                "mechanisms" : [
                        "SCRAM-SHA-1",
                        "SCRAM-SHA-256"
                ]
        }
]
>
```

Start Kafka with docker-compose

 1. docker-compose.yml

```
services:
  zookeeper-server:
    image: bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka-server:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
```

2. Start Kafka

```
docker-compose up -d
```

3. Stop Kafka

```
docker-compose down
```
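One caveat with the compose file above: KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 works for clients on the host, but a container that follows that advertised address will try its own localhost. A hedged sketch of a dual-listener variant (the INTERNAL/EXTERNAL listener names are arbitrary labels, and this relies on the bitnami image's KAFKA_CFG_ passthrough to the broker's listener settings):

```
  kafka-server:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9093,EXTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-server:9093,EXTERNAL://localhost:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
```

With this layout, containers connect to kafka-server:9093 while host clients keep using localhost:9092.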

Kafka client: assign partitions and reset offset

 1. Producer sends messages

```
private static void sendMessages(Properties kafkaProps, String topic) {
    for (int i = 0; i < MSG_COUNT; i++) {
        // note: a producer is created (and closed) per message to keep the sample short;
        // in real code create one KafkaProducer and reuse it across the loop
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "mykey", "myval" + i);
            // get(10, SECONDS) blocks for the broker ack so the assigned offset can be printed
            System.out.println("offset:" + producer.send(record).get(10, TimeUnit.SECONDS).offset());
        } catch (Throwable ex) {
            ex.printStackTrace();
        }
    }
}
```
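The kafkaProps argument is not defined in the snippet; a minimal sketch of what it might contain, built with plain java.util.Properties so it compiles without the Kafka client jar on the classpath (the acks choice is an assumption, not from the original post):

```java
import java.util.Properties;

public class ProducerPropsSketch {
    // Hypothetical reconstruction of the kafkaProps passed to sendMessages above
    static Properties kafkaProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // String key and value, matching KafkaProducer<String, String>
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // assumed: wait for full acknowledgement before get() returns
        return props;
    }

    public static void main(String[] args) {
        System.out.println(kafkaProps("localhost:9092").getProperty("bootstrap.servers"));
    }
}
```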


2. Assign topic partitions to the consumer

```
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerProps("localhost:9092"))) {
    consumer.assign(consumer.partitionsFor(TOPIC).stream()
            .map(p -> new TopicPartition(TOPIC, p.partition()))
            .collect(Collectors.toList()));
    resetOffset(consumer); // will NOT trigger java.lang.IllegalStateException: No current assignment for partition
```
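The kafkaConsumerProps helper is likewise not shown; a minimal sketch of what it might return, again using only java.util.Properties (the group id and commit settings are assumptions):

```java
import java.util.Properties;

public class ConsumerPropsSketch {
    // Hypothetical reconstruction of the kafkaConsumerProps(...) helper used above
    static Properties kafkaConsumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "test-group");          // assumed group id
        props.put("enable.auto.commit", "false");     // manual control fits the offset-reset demo
        props.put("auto.offset.reset", "earliest");   // read from the start when no committed offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(kafkaConsumerProps("localhost:9092").getProperty("group.id"));
    }
}
```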


3. Consume messages

```
private static int consumeAllMessages(KafkaConsumer<String, String> consumer) {
    int total = 0;
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        if (records.count() > 0) {
            total += records.count();
            System.out.println("receive:" + records.count() + " messages.");
        } else {
            System.out.println("receive no message");
            break;
        }
    }
    return total;
}
```


4. Reset offset

```
private static void resetOffset(KafkaConsumer<String, String> consumer) {
    consumer.partitionsFor(TOPIC).forEach(p -> {
        System.out.println("Seek offset 0 in partition " + p.partition());
        consumer.seek(new TopicPartition(TOPIC, p.partition()), 0);
    });
}
```


5. Consume all messages again, this time from offset 0, using the same consumeAllMessages as step 3



Full code: 

Kafka consumer: subscribe and reset offset later

1. Producer sends messages

```
private static void sendMessages(Properties kafkaProps, String topic) {
    for (int i = 0; i < MSG_COUNT; i++) {
        // note: a producer is created (and closed) per message to keep the sample short;
        // in real code create one KafkaProducer and reuse it across the loop
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "mykey", "myval" + i);
            // get(10, SECONDS) blocks for the broker ack so the assigned offset can be printed
            System.out.println("offset:" + producer.send(record).get(10, TimeUnit.SECONDS).offset());
        } catch (Throwable ex) {
            ex.printStackTrace();
        }
    }
}
```


2. Subscribe to the topic

```
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerProps("localhost:9092"))) {
    consumer.subscribe(Arrays.asList(TOPIC));
```


3. Consume messages

```
private static int consumeAllMessages(KafkaConsumer<String, String> consumer) {
    int total = 0;
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        if (records.count() > 0) {
            total += records.count();
            System.out.println("receive:" + records.count() + " messages.");
        } else {
            System.out.println("receive no message");
            break;
        }
    }
    return total;
}
```


4. Reset offset

```
private static void resetOffset(KafkaConsumer<String, String> consumer) {
    // with subscribe(), partitions are assigned only after poll(); seeking before the
    // first poll would throw IllegalStateException: No current assignment for partition.
    // Here step 3 has already polled, so the seek is safe.
    consumer.partitionsFor(TOPIC).forEach(p -> {
        System.out.println("Seek offset 0 in partition " + p.partition());
        consumer.seek(new TopicPartition(TOPIC, p.partition()), 0);
    });
}
```


5. Consume all messages again after the reset, using the same consumeAllMessages as step 3



Full code: ConsumerSubscribeAndAssignOffsetLaterMain.java



Alias Method (別名演算法)

 Problem: each server supports a different TPM (transactions per minute). When a request arrives, the system must immediately pick a suitable server at random according to its TPM capacity. Although the pick is called "random", the TPM still acts as a weight. Solution: the Alias Method...
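A sketch of how the Alias Method answers this (the class and the weight values below are illustrative, not from the original post): an O(n) preprocessing step, here Vose's construction, builds a probability table and an alias table, after which every weighted pick costs O(1) — one uniform column choice plus one biased coin flip.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

public class AliasMethod {
    private final double[] prob;  // probability of keeping the sampled column
    private final int[] alias;    // fallback index when the coin flip fails
    private final Random rnd = new Random();

    // Vose's O(n) table construction from arbitrary positive weights (e.g. each server's TPM)
    public AliasMethod(double[] weights) {
        int n = weights.length;
        prob = new double[n];
        alias = new int[n];
        double sum = 0;
        for (double w : weights) sum += w;
        double[] scaled = new double[n];
        Deque<Integer> small = new ArrayDeque<>(), large = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            scaled[i] = weights[i] * n / sum;          // normalize so the average column is 1.0
            (scaled[i] < 1.0 ? small : large).push(i);
        }
        while (!small.isEmpty() && !large.isEmpty()) {
            int s = small.pop(), l = large.pop();
            prob[s] = scaled[s];
            alias[s] = l;                              // the large column donates its excess to s
            scaled[l] += scaled[s] - 1.0;
            (scaled[l] < 1.0 ? small : large).push(l);
        }
        while (!large.isEmpty()) prob[large.pop()] = 1.0;
        while (!small.isEmpty()) prob[small.pop()] = 1.0;  // guards against rounding leftovers
    }

    // O(1) weighted pick: one uniform column, one biased coin flip
    public int next() {
        int col = rnd.nextInt(prob.length);
        return rnd.nextDouble() < prob[col] ? col : alias[col];
    }

    public static void main(String[] args) {
        // three servers with TPM capacities 100, 200, 300 -> picked with probability 1/6, 2/6, 3/6
        AliasMethod picker = new AliasMethod(new double[]{100, 200, 300});
        int[] counts = new int[3];
        for (int i = 0; i < 60000; i++) counts[picker.next()]++;
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]);
    }
}
```

Routing a request then reduces to servers[picker.next()], and the tables only need rebuilding when a server's TPM changes.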