Netty Example - server echo user input message


Reference
Maven
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
</dependency>
Description
這個例子跟 guide 上的不太相同, 除了原本的 echo 行為.
另外有做讓人 input message, 從 client 送給 server 做 echo.
MainClass
public class EchoServerMain {

    public static void main(String[] params) throws Exception {
        new Thread(() -> {
            try {
                new EchoServer(1234).run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        TimeUnit.SECONDS.sleep(1);
        new EchoClient("localhost", 1234).connectRunAndExit();
    }

    public static class EchoClient {

        private final String ip;
        private final int port;

        EchoClient(String ip, int port) {
            this.port = port;
            this.ip = ip;
        }

        public void connectRunAndExit() throws InterruptedException {
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(workerGroup);
                b.channel(NioSocketChannel.class);
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new EchoClientHandler());
                    }
                });

                ChannelFuture f = b.connect(ip, port).sync();

                Channel ch = f.channel();
                System.out.println("scan...");
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String msg = scanner.nextLine();
                    System.out.println("Input:" + msg);
                    ByteBuf msgBuffer = Unpooled.wrappedBuffer(msg.getBytes(CharsetUtil.UTF_8));
                    ch.writeAndFlush(msgBuffer);
                }


                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully().sync();
            }
        }
    }


    public static class EchoServer {
        private int port;

        public EchoServer(int port) {
            this.port = port;
        }

        public void run() throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(group)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new EchoServerHandler());
                            }
                        });

                ChannelFuture f = b.bind(port).sync();
                System.out.println("bind done");
                f.channel().closeFuture().sync();
                System.out.println("close done");
            } finally {
                group.shutdownGracefully().sync();
            }
        }
    }


}
EchoClientHandler
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        System.out.println("Client receive:" + ((ByteBuf)msg).toString(Charset.defaultCharset()));
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("triggered:" + evt);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server receive:" + msg);
        ctx.writeAndFlush(msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("connection active:" + ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("connection inactive:" + ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Output

RabbitMQ Example

Reference

Example

一個 thread 持續收, main thread 持續送,
在 4 core PC 上每秒約5000+ 個訊息
package amqp;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author <a href="https://github.com/shooeugenesea">isaac</a>
 */
public class ProducerConsumerMain {

    private final static AtomicInteger sentMsg = new AtomicInteger();
    private final static AtomicInteger receiveMsg = new AtomicInteger();
    private final static String MSG_5K;
    private final static String QUEUE_NAME = "hello";

    static {
        StringBuilder sb = new StringBuilder();
        for ( int i = 0; i < 5000; i++ ) {
            sb.append(String.valueOf(i));
        }
        MSG_5K = sb.toString();
    }

    public static void main(String[] params) throws IOException, TimeoutException {
        new Thread(){
            @Override
            public void run() {
                try {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("localhost");
                    Connection connection = factory.newConnection();
                    Channel channel = connection.createChannel();

                    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

                    Consumer consumer = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope,
                                                   AMQP.BasicProperties properties, byte[] body)
                                throws IOException {
                            String message = new String(body, "UTF-8");

                            receiveMsg.incrementAndGet();
//                            System.out.println(" [x] Received message '" + message.length() + "'");
                        }
                    };
                    channel.basicConsume(QUEUE_NAME, true, consumer);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }.start();

        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                System.out.println("receiveMsg:" + receiveMsg.getAndSet(0) + ", sentMsg:" + sentMsg.getAndSet(0));
            }
        }, 0, 1, TimeUnit.SECONDS);

        send();
    }

    private static void send() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String message = MSG_5K;
            for ( ;; ) {
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                sentMsg.incrementAndGet();
//            System.out.println(" [x] Sent '" + message + "'");
            }

//            channel.close();
//            connection.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}

Google Protobuf - Basic Example

Reference

Example

Install protobuf

  1. Download from Git: https://github.com/protocolbuffers/protobuf
  2. Install protobuf
  3. Setup protobuf folder to PATH
  4. Try command: protoc

Define proto

syntax = "proto2";

package examples;

option java_package = "examples";
option java_outer_classname = "AddressBookProtos";

message Person {
    required string name = 1;
    required int32 id = 2;
    optional string email = 3;

    enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
    }

    message PhoneNumber {
        required string number = 1;
        optional PhoneType type = 2 [default = HOME];
    }

    repeated PhoneNumber phones = 4;
}

message AddressBook {
    repeated Person people = 1;
}

Generate Java Code

Execute command: 

protoc -I /Users/liaoisaac/projects/study-practice/src/main/resources -java_out=/Users/liaoisaac/projects/study-practice/src/main/java/ /Users/liaoisaac/projects/study-practice/src/main/resources/addressbook.proto
Java code will be generated in "/Users/liaoisaac/projects/study-practice/src/main/java/"

Example with Maven Plugin

Introduce Maven

Define Maven plugin: protobuf-maven-plugin
(注意我把 output dir 改到我想要的 generated-java folder 了)



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>shooeugenesea</groupId>
 <artifactId>study-examples</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.11</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>com.google.protobuf</groupId>
   <artifactId>protobuf-java</artifactId>
   <version>3.3.0</version>
  </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>
    </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.7.0</version>
    <configuration>
     <source>1.8</source>
     <target>1.8</target>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.5.1</version>
    <configuration>
     <protocExecutable>/usr/local/bin/protoc</protocExecutable>
                    <outputDirectory>${basedir}/src/main/generated-java</outputDirectory>
    </configuration>
    <executions>
     <execution>
      <goals>
       <goal>compile</goal>
       <goal>test-compile</goal>
      </goals>
     </execution>
    </executions>
   </plugin>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.1</version>
    <configuration>
     <verbose>true</verbose>
     <fork>true</fork>
     <compilerVersion>1.8</compilerVersion>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.3</version>
    <configuration>
     <source>1.8</source>
     <target>1.8</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
設定好之後執行 mvn clean package 就可以根據 proto generate 出 java code

Read Write generated objects

package examples;

import java.io.*;

public class ReadWriteProtobufMain {

    public static void main(String[] params) throws IOException {
        AddressBookProtos.Person person = AddressBookProtos.Person.newBuilder()
                .setEmail("email@com")
                .setName("myname")
                .setId(123)
                .addPhones(AddressBookProtos.Person.PhoneNumber.newBuilder()
                        .setType(AddressBookProtos.Person.PhoneType.HOME)
                        .setNumber("12345678")
                        .build())
                .build();


        try (PipedOutputStream out = new PipedOutputStream();
             PipedInputStream in = new PipedInputStream(out)) {
            System.out.println("write Person:\n" + person);
            person.writeTo(out);
            out.close();

            AddressBookProtos.Person readPerson = AddressBookProtos.Person.newBuilder().mergeFrom(in).build();
            System.out.println("read Person:\n" + readPerson);
        }

    }

}

Output


別名演算法 Alias Method

 題目 每個伺服器支援不同的 TPM (transaction per minute) 當 request 來的時候, 系統需要馬上根據 TPM 的能力隨機找到一個適合的 server. 雖然稱為 "隨機", 但還是需要有 TPM 作為權重. 解法 別名演算法...