Saturday, November 10, 2018

Kafka



Kafkacat utility:
jzeng@cloud-dev-one:~$ kafkacat -b 10.5.134.22:31090 -L
Metadata for all topics (from broker -1: 10.5.134.22:31090/bootstrap):
 3 brokers:
  broker 2 at 172.17.0.27:9092
  broker 1 at 172.17.0.26:9092
  broker 0 at 172.17.0.23:9092
 4 topics:
  topic "__confluent.support.metrics" with 1 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1
  topic "__consumer_offsets" with 50 partitions:
    partition 23, leader 2, replicas: 0,2,1, isrs: 2,1
    partition 41, leader 2, replicas: 0,2,1, isrs: 2,1
    partition 32, leader 1, replicas: 0,1,2, isrs: 2,1
    partition 8, leader 1, replicas: 0,1,2, isrs: 2,1
    partition 17, leader 2, replicas: 0,2,1, isrs: 2,1
    partition 44, leader 1, replicas: 0,1,2, isrs: 2,1
    partition 35, leader 2, replicas: 0,2,1, isrs: 2,1
……………
    partition 15, leader 1, replicas: 1,0,2, isrs: 2,1
    partition 42, leader 1, replicas: 1,2,0, isrs: 2,1
    partition 24, leader 1, replicas: 1,2,0, isrs: 2,1
    partition 33, leader 1, replicas: 1,0,2, isrs: 2,1
    partition 6, leader 1, replicas: 1,2,0, isrs: 2,1
    partition 0, leader 1, replicas: 1,2,0, isrs: 2,1
  topic "test1" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "test" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1


Zookeeper:

Let's consider another example where the goal is to store some configuration in a <K,V> format and make it available across a cluster of machines. The <K,V> data should be persistent (i.e., disk based), highly available/replicated, and fault tolerant. ZooKeeper is a natural fit for this use case.

Once znodes are created with the desired paths, you can use GET and SET to treat ZooKeeper as a distributed <K,V> store or hashmap.

Zookeeper is designed as a high-read, high-throughput system for small keys. It is not designed as a large data store holding very large values.
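A minimal sketch of using ZooKeeper as a small distributed <K,V> store through its Java client (the connection string and the /config/app.timeout znode are made-up examples; production code would also wait for the SyncConnected event before issuing operations):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZkKvExample {
    public static void main(String[] args) throws Exception {
        // Connect to the ensemble; the watcher is a no-op because we only do simple gets/sets here.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> { });

        // Create persistent znodes: the path acts as the "key", the znode data is the "value".
        if (zk.exists("/config", false) == null) {
            zk.create("/config", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        if (zk.exists("/config/app.timeout", false) == null) {
            zk.create("/config/app.timeout", "30".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // GET: read the value back.
        byte[] value = zk.getData("/config/app.timeout", false, null);
        System.out.println("app.timeout = " + new String(value));

        // SET: update the value (-1 means "any version"); all clients in the cluster see the new data.
        zk.setData("/config/app.timeout", "60".getBytes(), -1);

        zk.close();
    }
}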

For Kafka, Zookeeper’s role is:

1.     Broker registration, with a heartbeat mechanism to keep the list current.
2.     Maintaining the list of topics, alongside
·       Their configuration (partitions, replication factor, additional configurations, etc.)
·       The list of ISRs (in-sync replicas) for each partition
3.     Performing leader elections in case some brokers go down.
4.     Storing the Kafka cluster id (randomly generated at the first startup of the cluster).
5.     Storing ACLs, if security is enabled, for
·       Topics
·       Consumer Groups
·       Users
6.     Quota configuration, if enabled.


Zookeeper command line interface:

root@zookeeper:/bin# zookeeper-shell localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled
[zk: localhost:2181(CONNECTING) 0]
WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /
[schema_registry, cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 15] ls /schema_registry
[schema_registry_master, schema_id_counter]
[zk: localhost:2181(CONNECTED) 16] get /schema_registry/schema_registry_master
{"host":"schema-registry","port":8081,"master_eligibility":true,"scheme":"http","version":1}

Zookeeper 4-letter words:

conf (configuration)
cons (connections)
dump (leader only; does not work on followers)
envi (environment)
srvr (server)
stat (statistics)
wchs (watches)
mntr (monitoring: a list of variables for the health of the quorum)

For example:

root@zookeeper:/bin# echo "conf" | nc localhost 2181
clientPort=2181
dataDir=/var/lib/zookeeper/data/version-2
dataLogDir=/var/lib/zookeeper/log/version-2
tickTime=2000
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=0

zookeeper files:

root@zookeeper:/var/lib/zookeeper# ls
data  log

zoonavigator (Zookeeper UI):

SJCMACJ15JHTD8:kafka-single-node jzeng$ docker-compose -f zoonavigator.docker-compose.yml up -d



Maven:

Add the following to the top-level <project> element of pom.xml:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>assemble-all</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <mainClass>fully.qualified.MainClass</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
  </plugins>
</build>

Add the following <properties> element at the top level of pom.xml, and the following <dependency> elements to its ‘dependencies’ element:

<properties>
  <kafka.version>0.11.0.2</kafka.version>
  <kafka.scala.version>2.10</kafka.scala.version>
  <confluent.version>3.2.1</confluent.version>
  <avro.version>1.7.7</avro.version>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.1</version>
</dependency>

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>${avro.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
</dependency>

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>${confluent.version}</version>
</dependency>


SJCMACJ15JHTD8:Kafka-Producer jzeng$ pwd
/Users/jzeng/pan/kafka/java/samples/Kafka-Producer
SJCMACJ15JHTD8:Kafka-Producer jzeng$ mvn archetype:generate -DgroupId=com.pan.app -DartifactId=Kafka-Producer -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
SJCMACJ15JHTD8:Kafka-Producer jzeng$ mvn package
SJCMACJ15JHTD8:Kafka-Producer jzeng$ mvn install
………
[INFO] Installing /Users/jzeng/pan/kafka/java/samples/Kafka-Producer/target/Kafka-Producer-1.0-SNAPSHOT.jar to /Users/jzeng/.m2/repository/com/pan/app/Kafka-Producer/1.0-SNAPSHOT/Kafka-Producer-1.0-SNAPSHOT.jar
[INFO] Installing /Users/jzeng/pan/kafka/java/samples/Kafka-Producer/pom.xml to /Users/jzeng/.m2/repository/com/pan/app/Kafka-Producer/1.0-SNAPSHOT/Kafka-Producer-1.0-SNAPSHOT.pom
………

mvn dependency:resolve

Download source jars and Javadoc through Maven:

mvn dependency:sources
mvn dependency:resolve -Dclassifier=javadoc


Kafka

Depending on the specific hardware and its performance characteristics, a single broker can easily handle thousands of partitions and millions of messages per second.


Installation of Confluent

Install Docker CE for Mac:


Download Confluent from their website and unzip it.

Run docker-compose from the folder that contains the docker-compose.yml file (under the cp-all-in-one folder):

SJCMACJ15JHTD8:cp-all-in-one jzeng$ docker-compose up -d

Or

SJCMACJ15JHTD8:cp-all-in-one jzeng$ docker-compose start

Sample docker-compose.yml file:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'


docker network create confluent

docker run -d \
    --net=confluent \
    --name=zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    confluentinc/cp-zookeeper:5.0.0

docker run -d -P \
    --net=confluent \
    --name=kafka \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \
    -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:21090,EXTERNAL://192.168.64.2:9020 \
    -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092 \
    -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:5.0.0

docker run \
  --net=confluent \
  --rm confluentinc/cp-kafka:5.0.0 \
  kafka-topics --create --topic foo --partitions 1 --replication-factor 1 \
  --if-not-exists --zookeeper zookeeper:2181

docker run \
  --net=confluent \
  --rm \
  confluentinc/cp-kafka:5.0.0 \
  kafka-topics --describe --topic foo --zookeeper zookeeper:2181


docker run \
  --net=confluent \
  --rm \
  confluentinc/cp-kafka:5.0.0 \
  bash -c "seq 42 | kafka-console-producer --request-required-acks 1 \
  --broker-list kafka:9092 --topic foo && echo 'Produced 42 messages.'"

docker run \
  --net=confluent \
  --rm \
  confluentinc/cp-kafka:5.0.0 \
  kafka-console-consumer --bootstrap-server kafka:9092 --topic foo --from-beginning --max-messages 42




Login to docker machine:

docker-machine ssh


Kafka settings and data files:

jzeng@cloud-dev-one:~$ docker ps | grep  confluent
85bc6a431874        confluentinc/cp-enterprise-control-center:5.0.0   "/etc/confluent/dock…"   10 days ago         Up 10 days          0.0.0.0:29021->9021/tcp                                                                                                                                      control-center
63d3b0b70cb9        confluentinc/ksql-examples:5.0.0                  "bash -c 'echo Waiti…"   10 days ago         Up 10 days                                                                                                                                                                       ksql-datagen
0cef7d5aab21        confluentinc/cp-ksql-server:5.0.0                 "/etc/confluent/dock…"   10 days ago         Up 10 days          0.0.0.0:28088->8088/tcp                                                                                                                                      ksql-server
5c3aafb10e81        confluentinc/cp-kafka-connect:5.0.0               "/etc/confluent/dock…"   10 days ago         Up 10 days          9092/tcp, 0.0.0.0:28083->8083/tcp                                                                                                                            connect
5a8f48e0eec0        confluentinc/cp-kafka-rest:latest                 "/etc/confluent/dock…"   10 days ago         Up 10 days          0.0.0.0:28082->8082/tcp                                                                                                                                      rest-proxy
2119e8586cce        confluentinc/cp-schema-registry:5.0.0             "/etc/confluent/dock…"   10 days ago         Up 10 days          0.0.0.0:28081->8081/tcp                                                                                                                                      schema-registry
c587480e6dc9        confluentinc/cp-enterprise-kafka:5.0.0            "/etc/confluent/dock…"   10 days ago         Up 10 days          9092/tcp, 0.0.0.0:21090->21090/tcp                                                                                                                           broker
b99bf867d36a        confluentinc/cp-zookeeper:5.0.0                   "/etc/confluent/dock…"   2 weeks ago         Up 10 days          2181/tcp, 2888/tcp, 3888/tcp                                    

jzeng@cloud-dev-one:~$ docker exec -it 51c5b8e177ae /bin/bash

root@broker:/# /bin/cat /etc/kafka/server.properties
……
log.dirs=/var/lib/kafka
……

root@broker:/# ls /var/lib/kafka/data
NewTopic-0
NewTopic-1
Topic-03792e3e-3a57-4dc8-a807-8ab4f2246d21-0
Topic-03792e3e-3a57-4dc8-a807-8ab4f2246d21-1
Topic-0dd3d18e-6954-410e-87de-963029bcce32-0
Topic-0dd3d18e-6954-410e-87de-963029bcce32-1
Topic-20064df9-ef83-4853-bb0b-a6354473904a-0
Topic-20064df9-ef83-4853-bb0b-a6354473904a-1
Topic-2043e083-676d-4371-81a2-72870fdb20d5-0
Topic-2043e083-676d-4371-81a2-72870fdb20d5-1
Topic-403432ac-0e2e-4bec-a84a-f57976598fdd-0
Topic-403432ac-0e2e-4bec-a84a-f57976598fdd-1
Topic-4c29e523-5f11-48a0-b6ca-7c2d00a3c011-0
Topic-4c29e523-5f11-48a0-b6ca-7c2d00a3c011-1
Topic-89ad125b-8af4-44b5-b237-158a47a03fe1-0
Topic-89ad125b-8af4-44b5-b237-158a47a03fe1-1
Topic-939583f2-0cac-4d41-828a-a50881688a81-0
Topic-939583f2-0cac-4d41-828a-a50881688a81-1
Topic-9cf270f0-4706-419b-9ca2-e5a43c86aa68-0
Topic-9cf270f0-4706-419b-9ca2-e5a43c86aa68-1
Topic-Mon_Oct_01_17_07_49_PDT_2018-0
Topic-Mon_Oct_01_17_07_49_PDT_2018-1
Topic-Mon_Oct_01_17_07_49_PDT_2018-2
Topic-Mon_Oct_01_17_10_56_PDT_2018-0
Topic-Mon_Oct_01_17_10_56_PDT_2018-1
Topic-Mon_Oct_01_17_10_56_PDT_2018-2   <-- topic name and partition

root@broker:/# ls -l /var/lib/kafka/data/Topic-Mon_Oct_01_17_10_56_PDT_2018-1
total 8
-rw-r--r-- 1 root root 10485760 Oct  2 00:10 00000000000000000000.index
-rw-r--r-- 1 root root      200 Oct  2 00:10 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Oct  2 00:10 00000000000000000000.timeindex
-rw-r--r-- 1 root root        8 Oct  2 00:10 leader-epoch-checkpoint

root@broker:/# /bin/cat /var/lib/kafka/data/Topic-Mon_Oct_01_17_10_56_PDT_2018-1/00000000000000000000.log
???f2Sf2S???????????????SNAPPYwu8?f2???aش??YXwww.pan.cf07599d-d7fe-4e54-8439-cdf38c0cd4d8192.168.2.1
                                                                                                    extra1
                                                                                                          extra2
                                                                                                                extra3
                                                                                                                      extra4

Without Snappy:

?s?$f2<9?f2<9????????????????f2<9??????YXwww.pan.f6dd1983-d072-45f7-b2cf-932a15dc6109192.168.2.1
                                                                                                extra1
                                                                                                      extra2
                                                                                                            extra3
                                                                                                                  extra4





Dev1:


docker run \
  --net=cp-all-in-one_default \
  --rm \
  confluentinc/cp-enterprise-kafka:5.0.0 \
  kafka-topics --describe --topic foo --zookeeper zookeeper:2181

docker run \
  --net=cp-all-in-one_default \
  --rm \
  confluentinc/cp-enterprise-kafka:5.0.0 \
  bash -c "seq 4 | kafka-console-producer --request-required-acks 1 \
  --broker-list broker:9092 --topic foo && echo 'Produced 4 messages.'"


docker run \
  --net=cp-all-in-one_default \
  --rm \
  confluentinc/cp-enterprise-kafka:5.0.0 \
  kafka-console-consumer --bootstrap-server broker:9092 --topic foo --from-beginning --max-messages 42



Avro Schema Registry:

Avro has been chosen as the only data format supported by the Confluent Schema Registry.

Avro schema changes that remain fully compatible:

Only add fields with defaults.
Only remove fields that have defaults.
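A small sketch using the plain Avro Java API to check that rule (the page_visit schema here mirrors the one captured later in this post): version 2 only adds a field that has a default, so old and new schemas can read each other's data.

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class AvroCompatibilityCheck {
    public static void main(String[] args) {
        String v1 = "{\"type\":\"record\",\"name\":\"page_visit\",\"namespace\":\"example.avro\","
                + "\"fields\":[{\"name\":\"time\",\"type\":\"long\"},"
                + "{\"name\":\"site\",\"type\":\"string\"},{\"name\":\"ip\",\"type\":\"string\"}]}";
        // v2 only ADDS a field, and the new field has a default -> fully compatible.
        String v2 = "{\"type\":\"record\",\"name\":\"page_visit\",\"namespace\":\"example.avro\","
                + "\"fields\":[{\"name\":\"time\",\"type\":\"long\"},"
                + "{\"name\":\"site\",\"type\":\"string\"},{\"name\":\"ip\",\"type\":\"string\"},"
                + "{\"name\":\"additionalinfo\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

        Schema oldSchema = new Schema.Parser().parse(v1);
        Schema newSchema = new Schema.Parser().parse(v2);

        // Data written with v1 can be read with v2 (missing field filled from its default),
        // and data written with v2 can be read with v1 (the extra field is simply ignored).
        System.out.println(SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema).getType());
        System.out.println(SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, newSchema).getType());
    }
}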


List all of the schemas:

jzeng@cloud-dev-one:~$ curl -X GET http://localhost:18081/subjects
["avroTopic","avroTopic-value"]

Get all versions:

jzeng@cloud-dev-one:~$ curl -X GET http://localhost:18081/subjects/avroTopic-value/versions

Monitor traffic to schema registry:

SJCMACJ15JHTD8:~ jzeng$ sudo tcpdump -nnvXSs 0 -i en0 dst 10.5.134.22 and port 18081

When a new version of a schema is used, we can see the following traffic from the client to the schema registry (default port is 8081, but we are using port 18081):

16:03:50.047302 IP (tos 0x0, ttl 64, id 0, offset 0, flags [none], proto TCP (6), length 585)
    10.54.92.74.50067 > 10.5.134.22.18081: Flags [P.], cksum 0xf9d9 (correct), seq 3948769710:3948770243, ack 4201917593, win 4122, options [nop,nop,TS val 548686738 ecr 2862496893], length 533
      0x0000:  4500 0249 0000 0000 4006 8214 0a36 5c4a  E..I....@....6\J
      0x0010:  0a05 8616 c393 46a1 eb5d 71ae fa74 2c99  ......F..]q..t,.
      0x0020:  8018 101a f9d9 0000 0101 080a 20b4 4b92  ..............K.
      0x0030:  aa9e 3c7d 7b22 7363 6865 6d61 223a 227b  ..<}{"schema":"{
      0x0040:  5c22 7479 7065 5c22 3a5c 2272 6563 6f72  \"type\":\"recor
      0x0050:  645c 222c 5c22 6e61 6d65 5c22 3a5c 2270  d\",\"name\":\"p
      0x0060:  6167 655f 7669 7369 745c 222c 5c22 6e61  age_visit\",\"na
      0x0070:  6d65 7370 6163 655c 223a 5c22 6578 616d  mespace\":\"exam
      0x0080:  706c 652e 6176 726f 5c22 2c5c 2266 6965  ple.avro\",\"fie
      0x0090:  6c64 735c 223a 5b7b 5c22 6e61 6d65 5c22  lds\":[{\"name\"
      0x00a0:  3a5c 2274 696d 655c 222c 5c22 7479 7065  :\"time\",\"type
      0x00b0:  5c22 3a5c 226c 6f6e 675c 227d 2c7b 5c22  \":\"long\"},{\"
      0x00c0:  6e61 6d65 5c22 3a5c 2273 6974 655c 222c  name\":\"site\",
      0x00d0:  5c22 7479 7065 5c22 3a5c 2273 7472 696e  \"type\":\"strin
      0x00e0:  675c 227d 2c7b 5c22 6e61 6d65 5c22 3a5c  g\"},{\"name\":\
      0x00f0:  2269 705c 222c 5c22 7479 7065 5c22 3a5c  "ip\",\"type\":\
      0x0100:  2273 7472 696e 675c 227d 2c7b 5c22 6e61  "string\"},{\"na
      0x0110:  6d65 5c22 3a5c 2261 6464 6974 696f 6e61  me\":\"additiona
      0x0120:  6c69 6e66 6f5c 222c 5c22 7479 7065 5c22  linfo\",\"type\"
      0x0130:  3a5b 5c22 6e75 6c6c 5c22 2c5c 2273 7472  :[\"null\",\"str
      0x0140:  696e 675c 225d 2c5c 2264 6566 6175 6c74  ing\"],\"default
      0x0150:  5c22 3a6e 756c 6c7d 2c7b 5c22 6e61 6d65  \":null},{\"name
      0x0160:  5c22 3a5c 2261 6464 6974 696f 6e61 6c69  \":\"additionali
      0x0170:  6e66 6f32 5c22 2c5c 2274 7970 655c 223a  nfo2\",\"type\":
      0x0180:  5b5c 226e 756c 6c5c 222c 5c22 7374 7269  [\"null\",\"stri
      0x0190:  6e67 5c22 5d2c 5c22 6465 6661 756c 745c  ng\"],\"default\
      0x01a0:  223a 6e75 6c6c 7d2c 7b5c 226e 616d 655c  ":null},{\"name\
      0x01b0:  223a 5c22 6164 6469 7469 6f6e 616c 696e  ":\"additionalin
      0x01c0:  666f 335c 222c 5c22 7479 7065 5c22 3a5b  fo3\",\"type\":[
      0x01d0:  5c22 6e75 6c6c 5c22 2c5c 2273 7472 696e  \"null\",\"strin
      0x01e0:  675c 225d 2c5c 2264 6566 6175 6c74 5c22  g\"],\"default\"
      0x01f0:  3a6e 756c 6c7d 2c7b 5c22 6e61 6d65 5c22  :null},{\"name\"
      0x0200:  3a5c 2261 6464 6974 696f 6e61 6c69 6e66  :\"additionalinf
      0x0210:  6f34 5c22 2c5c 2274 7970 655c 223a 5b5c  o4\",\"type\":[\
      0x0220:  226e 756c 6c5c 222c 5c22 7374 7269 6e67  "null\",\"string
      0x0230:  5c22 5d2c 5c22 6465 6661 756c 745c 223a  \"],\"default\":
      0x0240:  6e75 6c6c 7d5d 7d22 7d                   null}]}"}



Consumer in long polling mode:
Long polling is a simple technique for reading data from a server. The client browser makes a normal request, but the server delays responding if it does not have any new data. Once new information becomes available, it is sent to the client, the client does something with the data and then starts a new long polling request. Thus the client always keeps one long polling request open to the server and gets new data as soon as it is available.


You don't have to enable it per se, since it is the default behavior of the Kafka consumer. What you need to tune in your configuration is fetch.max.wait.ms.
Two values are important for achieving long polling (see the consumer sketch below):
·       fetch.min.bytes: The broker will wait for this amount of data to accumulate BEFORE it sends the response to the consumer client.  Default value is 1 byte.
·       fetch.max.wait.ms: The maximum amount of time the broker will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes.  Default value is 500 ms.  To support long polling, increase this number so the consumer does not poll very frequently when there is no data to consume.

Look for more configuration options in Kafka Consumer Configs.
The Kafka users mailing list is also a good option for questions like this.
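A minimal sketch of these two settings in a Java consumer (the bootstrap server, group id, and topic name are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LongPollingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "long-polling-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Wait until at least 64 KB has accumulated on the broker ...
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 64 * 1024);
        // ... but never block a single fetch for more than 5 seconds.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 5000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        }
    }
}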

More Partitions Requires More Open File Handles:

Each partition maps to a directory in the file system in the broker. Within that log directory, there will be two files (one for the index and another for the actual data) per log segment. Currently, in Kafka, each broker opens a file handle of both the index and the data file of every log segment. So, the more partitions, the higher that one needs to configure the open file handle limit in the underlying operating system. This is mostly just a configuration issue. We have seen production Kafka clusters running with more than 30 thousand open file handles per broker.

Number of Partitions:

A rough formula for picking the number of partitions is based on throughput. You measure the throughput that you can achieve on a single partition for production (call it p) and for consumption (call it c). Let’s say your target throughput is t. Then you need at least max(t/p, t/c) partitions. For example, if p = 20 MB/s, c = 50 MB/s, and the target is t = 200 MB/s, you need at least max(200/20, 200/50) = 10 partitions.
In general, one can produce at tens of MB/sec on just a single partition, as shown in this benchmark. The consumer throughput is often application dependent, since it corresponds to how fast the consumer logic can process each message, so you really need to measure it.


As a rule of thumb, if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in the Kafka cluster and r is the replication factor.


Kafka fundamentals:

Pagecache-centric design:

Sequential disk access can in some cases be faster than random memory access! The performance of linear writes on a JBOD configuration with six 7200 rpm SATA drives in a RAID-5 array is about 600 MB/sec, but the performance of random writes is only about 100 KB/sec: a difference of over 6000x. Sequential disk access has the advantage that all operations are O(1) and reads do not block writes or each other. This has obvious performance advantages since the performance is completely decoupled from the data size: one server can now take full advantage of a number of cheap, low-rotational-speed 1+ TB SATA drives.

A modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. All disk reads and writes will go through this unified cache (pagecache). 

Rather than maintain as much as possible in-memory and flush it all out to the filesystem in a panic when we run out of space, we invert that. All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel's pagecache.


Kafka’s interaction with the local disk has two characteristics: 1) it appends new writes to an open file (i.e., the message log file), and 2) it usually reads contiguous areas from disk. These characteristics let Kafka rely on the OS disk cache to optimize its reads from and writes to disk. The OS cache is highly efficient, always enabled, and optimized for append calls and for multiple reads from a contiguous area on disk. A further advantage of using the OS-level cache is that Kafka’s processes use little JVM heap space, avoiding GC-related issues, and when a broker restarts it does not need any cache warm-up, since the OS cache is not cleared when the process terminates.


Batching messages:

Batching leads to larger network packets, larger sequential disk operations, contiguous memory blocks, and so on, all of which allows Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers. This simple optimization produces an orders-of-magnitude speed-up.

Two batch-related producer parameters:

batch.size: batch size in bytes.  Default is 16384 (16 KB).  Records sent to the same partition will be batched.
linger.ms: wait up to this many milliseconds if the total size of buffered records is less than ‘batch.size’.  Default is 0.

John’s comment: Batching should be handled in the producer.
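A sketch of a producer configured for batching (the broker address, topic, and values are just examples):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Collect up to 64 KB of records per partition before sending a batch ...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);
        // ... and wait up to 20 ms for a batch to fill instead of sending each record immediately.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                // Records with the same key go to the same partition and can share a batch.
                producer.send(new ProducerRecord<>("test", "key-" + (i % 10), "value-" + i));
            }
        } // close() flushes any batches that are still lingering
    }
}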

Zero-Copy:

Modern Unix operating systems offer a highly optimized code path for transferring data out of the pagecache to a socket; in Linux this is done with the sendfile system call.

The Java transferTo() method transfers data from a file channel to a given writable byte channel. Internally, it depends on the underlying operating system's support for zero copy; on UNIX and various flavors of Linux, this call is routed to the sendfile() system call.
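The broker does this internally; the snippet below is just an illustrative use of the same Java API, copying a segment file to a socket without the bytes passing through user-space buffers (the file name is taken from the listing above, the port is a placeholder):

import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopySend {
    public static void main(String[] args) throws Exception {
        try (FileChannel file = new FileInputStream("00000000000000000000.log").getChannel();
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long remaining = file.size();
            while (remaining > 0) {
                // transferTo maps to sendfile(2) on Linux: data moves from the pagecache
                // straight to the socket buffer, with no copy into the JVM heap.
                long sent = file.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}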


End-to-end Batch Compression:

A batch of messages can be clumped together, compressed, and sent to the server in this form. The batch will be written in compressed form, remain compressed in the log, and only be decompressed by the consumer.
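On the producer this is a single setting; a hedged sketch (snappy, matching the log dump shown earlier):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Whole batches are compressed with snappy on the producer; the broker stores them
        // compressed and only the consumer decompresses them.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // larger batches compress better

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "www.pan"));
        }
    }
}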


Partition Assignment Strategy

partition.assignment.strategy: the default is RangeAssignor, which works on a per-topic basis.

When subscribing to multiple topics, it is better to use RoundRobinAssignor if the number of partitions differs across topics (a consumer configuration sketch follows).
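A sketch of switching a consumer that subscribes to several topics over to the round-robin assignor (broker address, group id, and topic names are placeholders):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RoundRobinConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "multi-topic-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Default is RangeAssignor (per topic); RoundRobinAssignor spreads all partitions
        // of all subscribed topics evenly across the group's consumers.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test", "test1"));
        // ... poll loop as usual ...
    }
}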



The partition is split into segments

When Kafka writes to a partition, it writes to a segment — the active segment. If the segment’s size limit is reached, a new segment is opened and that becomes the new active segment.

On disk, a partition is a directory and each segment is an index file and a log file.

The data format on disk is exactly the same as what the broker receives from the producer over the network and sends to its consumers. This allows Kafka to efficiently transfer data with zero copy.


Consumer rebalance

Rebalancing is the process where a group of consumer instances (belonging to the same group) co-ordinate to own a mutually exclusive set of partitions of topics that the group is subscribed to. At the end of a successful rebalance operation for a consumer group, every partition for all subscribed topics will be owned by a single consumer instance within the group. The way rebalancing works is as follows. Every broker is elected as the coordinator for a subset of the consumer groups. The co-ordinator broker for a group is responsible for orchestrating a rebalance operation on consumer group membership changes or partition changes for the subscribed topics. It is also responsible for communicating the resulting partition ownership configuration to all consumers of the group undergoing a rebalance operation.

This happens when a new consumer is added, or an existing consumer fails (i.e., it fails to send heartbeats for a period of time).
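Consumers can observe these rebalances with a ConsumerRebalanceListener, for example to commit offsets before their partitions are taken away. A minimal sketch:

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;

    public LoggingRebalanceListener(KafkaConsumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before the coordinator reassigns these partitions; a good place to
        // commit whatever has been processed so far.
        System.out.println("Revoked: " + partitions);
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after the rebalance completes with this consumer's new assignment.
        System.out.println("Assigned: " + partitions);
    }
}

// Usage: consumer.subscribe(Collections.singletonList("test"), new LoggingRebalanceListener(consumer));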

Kafka Consumer Failover

If a consumer fails after processing a record but before sending the commit to the broker, some Kafka records could be reprocessed. In this scenario, Kafka implements at-least-once behavior, and you should make sure the message processing (record deliveries) is idempotent.
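A sketch of the usual at-least-once pattern: disable auto-commit and commit only after the records have been processed, so a crash in between causes re-delivery rather than loss (processing itself must then be idempotent):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // we commit manually

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // must tolerate seeing the same record twice
                }
                // Commit only after processing; a crash before this line re-delivers the batch.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.offset() + ": " + record.value());
    }
}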


Log Compaction and retention

Log compaction retains at least the last known value for each record key within a single topic partition.


Change the “cleanup.policy” attribute to switch from the default ‘delete’ to ‘compact’.

When its value is ‘delete’, the retention policy is controlled by the following two parameters:

retention.bytes: controls the maximum size a partition (which consists of log segments) can grow to before we discard old log segments to free up space.  Default value is -1 (no size limit).

retention.ms: controls the maximum time we retain a log before discarding old log segments to free up space.  Default value is 604800000 (7 days).
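A hedged sketch of overriding these retention settings on an existing topic with the AdminClient (topic name and values are placeholders; the next section shows how to set such properties at topic-creation time instead):

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");

            // Keep at most ~1 GB per partition and at most 1 day of data, whichever is hit first.
            Config config = new Config(Arrays.asList(
                    new ConfigEntry("retention.bytes", "1073741824"),
                    new ConfigEntry("retention.ms", "86400000")));

            adminClient.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}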



Set attributes to override a topic’s default properties:

final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

// override default property for topic
Map<String, String> customConfig = new HashMap<>();
customConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopic.configs(customConfig);

Create a new topic:

// Create the topic, which is an async call.
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

// Since the call is async, let's wait for it to complete.
KafkaFuture future = createTopicsResult.values().get(topicName);
future.get();
System.out.println("New topic " + topicName + " is created.");


Get how many partitions a topic has from consumer:

List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topicName);
int partitionCount = 0;
for (PartitionInfo partitionInfo : partitionInfoList) {
    System.out.println("Partition ID is " + partitionInfo.partition());
    partitionCount++;
}
System.out.println("-------Total partitions for topic " + Topics.getTopic() + " are " + partitionCount);

/**
 * Increase partitions for the current topic
 */
NewPartitions newPartitionRequest = NewPartitions.increaseTo(6);
adminClient.createPartitions(Collections.singletonMap(topicName, newPartitionRequest)).all().get();

/**
 * Read partition info back through adminClient
 */
DescribeTopicsResult topicResults = adminClient.describeTopics(Collections.singletonList(topicName));
TopicDescription topicDescription = topicResults.values().get(topicName).get();
List<TopicPartitionInfo> partitionInfoList = topicDescription.partitions();
int partitionCount = 0;
for (TopicPartitionInfo partitionInfo : partitionInfoList) {
    System.out.println("Partition ID (from adminClient) is " + partitionInfo.partition());
    partitionCount++;
}
System.out.println("-------Total partitions (from adminClient) for topic " + Topics.getTopic() + " are " + partitionCount);

/**
 * Get cluster info through adminClient
 */
DescribeClusterResult clusterResult = adminClient.describeCluster();
Collection<Node> nodes = clusterResult.nodes().get();
for (Node node : nodes) {
    System.out.println("Node host: " + node.host() + ", node rack: " + node.rack() + ", node port: " + node.port());
}


To enable automatic generation of AVRO classes from an AVRO schema, add the following two plugins and one dependency to pom.xml:

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro.version}</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
                <goal>protocol</goal>
                <goal>idl-protocol</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                <stringType>String</stringType>
                <createSetters>false</createSetters>
                <fieldVisibility>private</fieldVisibility>
            </configuration>
        </execution>
    </executions>
</plugin>

<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>build-helper-maven-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>add-source</goal>
            </goals>
            <configuration>
                <sources>
                    <source>target/generated-sources/avro</source>
                </sources>
            </configuration>
        </execution>
    </executions>
</plugin>

<dependency>
    <groupId>org.apache.servicemix.bundles</groupId>
    <artifactId>org.apache.servicemix.bundles.avro</artifactId>
    <version>1.8.2_1</version>
</dependency>


Tools

AVRO tools:


java -jar avro-tools-1.8.2.jar tojson {avro_file_name}
java -jar avro-tools-1.8.2.jar fromjson {json_file_name}


Kafka Manager:


Does not support Kafka 2.0 at this moment (Oct 9, 2018)


Kafka Topic UI from Landoop:


This does not work for me with the installation I had from Confluent.


Testing and JUnit:


import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
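
A hedged sketch of what those imports are typically used for: spinning up an in-process ZooKeeper (Curator TestingServer) and a single embedded Kafka broker for a JUnit test. Ports and temp dirs are chosen arbitrarily here, and the exact KafkaServerStartable constructors vary slightly across Kafka versions.

import java.nio.file.Files;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;

public class EmbeddedKafkaSketch {
    public static void main(String[] args) throws Exception {
        // In-process ZooKeeper on a random free port.
        TestingServer zookeeper = new TestingServer(InstanceSpec.getRandomPort());

        Properties brokerProps = new Properties();
        brokerProps.put("zookeeper.connect", zookeeper.getConnectString());
        brokerProps.put("broker.id", "0");
        brokerProps.put("listeners", "PLAINTEXT://localhost:19092");
        brokerProps.put("log.dirs", Files.createTempDirectory("kafka-test-logs").toString());
        brokerProps.put("offsets.topic.replication.factor", "1");

        // Single embedded broker; tests point their clients at localhost:19092.
        KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(brokerProps));
        broker.startup();

        // ... run test logic against the embedded cluster ...

        broker.shutdown();
        zookeeper.close();
    }
}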


Performance:

# generate messages (up to 10000) at a rate of 10 per second (the throughput)
bin/kafka-producer-perf-test.sh --topic {topic-name} --num-records 10000 --throughput 10 --payload-file {any-text-file-in-size-10k} --producer-props acks=1 bootstrap.servers={kafka-server-hostname}:{port} --payload-delimiter A

# generate 10 KB of random data
base64 /dev/urandom | head -c 10000 | egrep -ao "\\w" | tr -d '\n' > file10KB.txt

# consume them
bin/kafka-console-consumer.sh --bootstrap-server {kafka-server-hostname}:{port} --topic {topic-name}


Factors impacting Kafka performance:

Disk IO:

·       Format your drives as XFS (easiest, no tuning required)
·       If read/write throughput is the bottleneck: mount multiple disks via the config parameter log.dirs:
log.dirs=/disk1/kafka-logs,/disk2/kafka-logs,….
·       Kafka performance is constant with regard to the amount of data stored in Kafka, so make sure you expire data fast enough (default is one week)

Network:

·       Ensure your Kafka instances and Zookeeper instances are geographically close.
·       Do not put one broker in Europe and another broker in the US.
·       Network bandwidth is key: make sure you have enough bandwidth to handle many connections and TCP requests.

RAM:

·       Assign a MAX heap (-Xmx) of 4 GB and do not set -Xms:
export KAFKA_HEAP_OPTS="-Xmx4g"
·       Keep Java heap usage low over time, and increase it only if you have more partitions on your broker.
·       The remaining RAM will be used automatically for the Linux OS page cache.  This is used to buffer data to the disk, and it is what gives Kafka its amazing performance; any unused memory is automatically leveraged by the Linux OS and assigned to the page cache.
·       Make sure swapping is disabled for Kafka entirely (the default swappiness is 60):
vm.swappiness=0 or vm.swappiness=1

CPU:

·       If you enable SSL, Kafka has to use CPU to encrypt and decrypt every payload.
·       Make sure your producers and consumers (not Kafka itself) do the compression work (that is the default setting).
·       Monitor Garbage Collection over time to ensure the pauses are not too long.


OS:

·       Do not use Windows in production.
·       Increase the file descriptor limits (at least 100,000 as a starting point). Kafka opens 3 file descriptors for each topic-partition-segment that lives on the broker.
·       Make sure only Kafka is running on that machine, and no other services.


Others:

·       Use Java 8.
·       Set Kafka quotas:
It is still possible for some producers and consumers to produce/consume huge volumes of data and hence monopolize broker resources and cause network saturation. These producers/consumers can result in DoS (Denial of Service) for other producers/consumers, impacting other applications. This problem gets more severe in large multi-tenant clusters, where a small set of badly behaved clients can degrade the user experience for the well-behaved ones.
This problem can be addressed by using quotas to limit the broker resources and network bandwidth available to producers and consumers. In fact, when running Kafka as a service, quotas become even more useful for enforcing API limits according to an agreed-upon contract.


How Do Quotas work?



Quotas were introduced in Apache Kafka version 0.9. Quotas are enforced at the Kafka broker level, where each unique client id receives a fixed quota in bytes/second. This means each unique client can publish/fetch a maximum of X bytes/second per broker before it gets throttled.
Client byte rate is measured over multiple small windows (e.g. 30 windows of 1 second each) in order to detect and correct quota violations quickly.
It is important to note that Kafka brokers do not throw any error when they detect a quota violation. Instead, a broker attempts to slow down a client exceeding its quota by delaying the response; the amount of delay is computed so that the client does not violate its quota.
Since Kafka brokers do not throw any error on quota violation, there is no need for a retry mechanism on the client side. However, when looking into performance issues, quotas should be kept in mind and adjusted to get the required throughput.

Configuring Parameters for Enforcing Quotas



Quotas are enforced at the broker level and, by default, there is a fixed quota assigned to each unique client. These default quotas can be changed in the Kafka configuration file and require a Kafka broker restart. For example, the configuration parameters below can be used to set the producer/consumer quota to 50 MB/sec:
# Sets producer quota to 50 MB/sec
quota.producer.default=52428800
 
# Sets consumer quota to 50 MB/sec
quota.consumer.default=52428800

However, we may need to configure different quotas for different clients at run time, without restarting the Kafka brokers. Fortunately, Kafka provides a mechanism to override quotas at the client level without a restart. For example, the command below can be executed from the Kafka broker home directory to configure the client with id "test-client" with a producer quota of 10 MB/sec and a consumer quota of 20 MB/sec:
# Adds configuration for client with id test-client
./bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=10485760,consumer_byte_rate=20971520' --entity-name test-client --entity-type clients

Once updated, you can also verify your configuration using the describe command below:
# Describe configurations
./bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-name test-client --entity-type clients
 
#Output of above command will be as below
Configs for clients:test-client are producer_byte_rate=10485760,consumer_byte_rate=20971520

How to change Kafka configuration:

Change the server.properties file and restart Kafka on all brokers.





Debug and Logging:

Code to dump all attributes for a topic:
 
DescribeConfigsResult configResults =
        adminClient.describeConfigs(
                Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));

Map<ConfigResource, KafkaFuture<Config>> configs = configResults.values();
for (Map.Entry<ConfigResource, KafkaFuture<Config>> config : configs.entrySet()) {
    System.out.println("Topic config for " + config.getKey() + " : ");
    Collection<ConfigEntry> configEntries = config.getValue().get().entries();
    for (ConfigEntry configEntry : configEntries) {
        System.out.println("    " + configEntry.name() + "-->" + configEntry.value() + " (" + (configEntry.isDefault() ? "Default" : "Non-default") + ")");
    }
}
Results from above sample code:

Topic config for ConfigResource{type=TOPIC, name='Topic-2043e083-676d-4371-81a2-72870fdb20d5'} :
    compression.type-->producer (Default)
    leader.replication.throttled.replicas--> (Default)
    message.downconversion.enable-->true (Default)
    min.insync.replicas-->1 (Default)
    segment.jitter.ms-->0 (Default)
    cleanup.policy-->delete (Default)
    flush.ms-->9223372036854775807 (Default)
    follower.replication.throttled.replicas--> (Default)
    segment.bytes-->1073741824 (Default)
    retention.ms-->604800000 (Default)
    flush.messages-->9223372036854775807 (Default)
    message.format.version-->2.0-IV1 (Default)
    file.delete.delay.ms-->60000 (Default)
    max.message.bytes-->1000012 (Default)
    min.compaction.lag.ms-->0 (Default)
    message.timestamp.type-->CreateTime (Default)
    preallocate-->false (Default)
    min.cleanable.dirty.ratio-->0.5 (Default)
    index.interval.bytes-->4096 (Default)
    unclean.leader.election.enable-->false (Default)
    retention.bytes-->-1 (Default)
    delete.retention.ms-->86400000 (Default)
    segment.ms-->604800000 (Default)
    message.timestamp.difference.max.ms-->9223372036854775807 (Default)
    segment.index.bytes-->10485760 (Default)

Pulsar

Another Kafka-like messaging system.


Kafka Streams

KStream → KTable




