Kafkacat utility:
jzeng@cloud-dev-one:~$ kafkacat -b 10.5.134.22:31090 -L
Metadata for all topics (from broker -1: 10.5.134.22:31090/bootstrap):
 3 brokers:
  broker 2 at 172.17.0.27:9092
  broker 1 at 172.17.0.26:9092
  broker 0 at 172.17.0.23:9092
 4 topics:
  topic "__confluent.support.metrics" with 1 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1
  topic "__consumer_offsets" with 50 partitions:
    partition 23, leader 2, replicas: 0,2,1, isrs: 2,1
    partition 41, leader 2, replicas: 0,2,1, isrs: 2,1
    partition 32, leader 1, replicas: 0,1,2, isrs: 2,1
    partition 8, leader 1, replicas: 0,1,2, isrs: 2,1
    partition 17, leader 2, replicas: 0,2,1, isrs: 2,1
    partition 44, leader 1, replicas: 0,1,2, isrs: 2,1
    partition 35, leader 2, replicas: 0,2,1, isrs: 2,1
    ……………
    partition 15, leader 1, replicas: 1,0,2, isrs: 2,1
    partition 42, leader 1, replicas: 1,2,0, isrs: 2,1
    partition 24, leader 1, replicas: 1,2,0, isrs: 2,1
    partition 33, leader 1, replicas: 1,0,2, isrs: 2,1
    partition 6, leader 1, replicas: 1,2,0, isrs: 2,1
    partition 0, leader 1, replicas: 1,2,0, isrs: 2,1
  topic "test1" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "test" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
Zookeeper:
Let's consider another example where the goal is to store some configuration in a <K,V> format and make it available across a cluster of machines. The <K,V> store should be persistent (i.e., disk based), highly available/replicated, and fault tolerant. ZooKeeper is a natural fit for this use case.
Once zNodes are created with the desired path, you can use GET and SET to use ZooKeeper as a distributed <K,V> store or hashmap.
ZooKeeper is designed as a high-read, high-throughput system for small keys. It is not designed as a large data store that holds very large values.
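As a minimal sketch (not from the original notes), this is how the GET/SET usage looks with the plain ZooKeeper Java client; the znode path /config, the key name, and the values are made-up examples:
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;

public class ZkKVExample {
    public static void main(String[] args) throws Exception {
        // Connect to a local ZooKeeper; the lambda is a no-op watcher.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

        // Create the parent and the key znode if they do not exist yet (PERSISTENT = disk backed).
        if (zk.exists("/config", false) == null) {
            zk.create("/config", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        String key = "/config/max.retries";   // hypothetical key
        if (zk.exists(key, false) == null) {
            zk.create(key, "5".getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // GET: read the current value (and its version).
        Stat stat = new Stat();
        String value = new String(zk.getData(key, false, stat), StandardCharsets.UTF_8);
        System.out.println("GET " + key + " = " + value);

        // SET: overwrite the value; passing the version gives optimistic concurrency control.
        zk.setData(key, "10".getBytes(StandardCharsets.UTF_8), stat.getVersion());

        zk.close();
    }
}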
For Kafka, Zookeeper's role is:
1. Broker registration, with a heartbeat mechanism to keep the list current.
2. Maintaining the list of topics, along with:
   · their configuration (partitions, replication factor, additional configurations, etc.)
   · the list of ISRs (in-sync replicas) for each partition
3. Performing leader elections when brokers go down.
4. Storing the Kafka cluster id (randomly generated at the first startup of the cluster).
5. Storing ACLs, if security is enabled, for:
   · topics
   · consumer groups
   · users
6. Quota configuration, if enabled.
Zookeeper command line
interface:
root@zookeeper:/bin# zookeeper-shell localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled
[zk: localhost:2181(CONNECTING) 0]
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /
[schema_registry, cluster, controller_epoch, controller, brokers,
zookeeper, admin, isr_change_notification, consumers,
log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 15] ls /schema_registry
[schema_registry_master, schema_id_counter]
[zk: localhost:2181(CONNECTED) 16] get /schema_registry/schema_registry_master
{"host":"schema-registry","port":8081,"master_eligibility":true,"scheme":"http","version":1}
Zookeeper 4-letter words:
conf (configuration)
cons (connections)
dump (leader only, not working for followers)
envi (environment)
srvr (server)
stat (statistics)
wchs (watches)
mntr (monitoring a list of variables for the health of the quorum)
For example:
root@zookeeper:/bin# echo "conf" | nc localhost 2181
clientPort=2181
dataDir=/var/lib/zookeeper/data/version-2
dataLogDir=/var/lib/zookeeper/log/version-2
tickTime=2000
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=0
Zookeeper files:
root@zookeeper:/var/lib/zookeeper# ls
data log
zoonavigator (Zookeeper UI):
SJCMACJ15JHTD8:kafka-single-node jzeng$ docker-compose -f
zoonavigator.docker-compose.yml up -d
Maven:
Add the following to the top-level <project> element of pom.xml:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>assemble-all</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <mainClass>fully.qualified.MainClass</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
  </plugins>
</build>
Add the following <properties> block to the top-level <project> element, and the following entries to the <dependencies> element of pom.xml:
<properties>
  <kafka.version>0.11.0.2</kafka.version>
  <kafka.scala.version>2.10</kafka.scala.version>
  <confluent.version>3.2.1</confluent.version>
  <avro.version>1.7.7</avro.version>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.1</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>${avro.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>${confluent.version}</version>
</dependency>
SJCMACJ15JHTD8:Kafka-Producer jzeng$ pwd
/Users/jzeng/pan/kafka/java/samples/Kafka-Producer
SJCMACJ15JHTD8:Kafka-Producer
jzeng$ mvn archetype:generate -DgroupId=com.pan.app -DartifactId=Kafka-Producer
-DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
SJCMACJ15JHTD8:Kafka-Producer
jzeng$ mvn package
SJCMACJ15JHTD8:Kafka-Producer
jzeng$ mvn install
………
[INFO] Installing
/Users/jzeng/pan/kafka/java/samples/Kafka-Producer/target/Kafka-Producer-1.0-SNAPSHOT.jar
to /Users/jzeng/.m2/repository/com/pan/app/Kafka-Producer/1.0-SNAPSHOT/Kafka-Producer-1.0-SNAPSHOT.jar
[INFO] Installing
/Users/jzeng/pan/kafka/java/samples/Kafka-Producer/pom.xml to
/Users/jzeng/.m2/repository/com/pan/app/Kafka-Producer/1.0-SNAPSHOT/Kafka-Producer-1.0-SNAPSHOT.pom
………
mvn dependency:resolve
Download source jars and Javadoc through Maven:
mvn dependency:sources
mvn dependency:resolve -Dclassifier=javadoc
Kafka Confluent: https://docs.confluent.io/
Depending on the specific hardware and its performance characteristics, a single broker can easily handle thousands of partitions and millions of messages per second.
Installation of Confluent
Install Docker CE for Mac.
Download Confluent from their website and unzip it.
Run docker-compose from the folder that has the docker-compose.yml file (under the cp-all-in-one folder):
SJCMACJ15JHTD8:cp-all-in-one jzeng$ docker-compose up -d
Or
SJCMACJ15JHTD8:cp-all-in-one jzeng$ docker-compose start
Sample docker-compose.yml file:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:5.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
docker network create confluent
docker run -d \
--net=confluent \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:5.0.0
docker run -d -P \
  --net=confluent \
  --name=kafka \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \
  -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:21090,EXTERNAL://192.168.64.2:9020 \
  -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092 \
  -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka:5.0.0
docker run \
  --net=confluent \
  --rm confluentinc/cp-kafka:5.0.0 \
  kafka-topics --create --topic foo --partitions 1 --replication-factor 1 \
  --if-not-exists --zookeeper zookeeper:2181
docker run \
--net=confluent \
--rm \
confluentinc/cp-kafka:5.0.0 \
kafka-topics --describe --topic foo --zookeeper zookeeper:2181
docker run \
  --net=confluent \
  --rm \
  confluentinc/cp-kafka:5.0.0 \
  bash -c "seq 42 | kafka-console-producer --request-required-acks 1 \
  --broker-list kafka:9092 --topic foo && echo 'Produced 42 messages.'"
docker run \
--net=confluent \
--rm \
confluentinc/cp-kafka:5.0.0 \
kafka-console-consumer --bootstrap-server kafka:9092 --topic foo --from-beginning --max-messages 42
Login to docker machine:
docker-machine ssh
Kafka settings and data files:
jzeng@cloud-dev-one:~$ docker ps | grep confluent
85bc6a431874  confluentinc/cp-enterprise-control-center:5.0.0  "/etc/confluent/dock…"  10 days ago  Up 10 days  0.0.0.0:29021->9021/tcp             control-center
63d3b0b70cb9  confluentinc/ksql-examples:5.0.0                 "bash -c 'echo Waiti…"  10 days ago  Up 10 days                                      ksql-datagen
0cef7d5aab21  confluentinc/cp-ksql-server:5.0.0                "/etc/confluent/dock…"  10 days ago  Up 10 days  0.0.0.0:28088->8088/tcp             ksql-server
5c3aafb10e81  confluentinc/cp-kafka-connect:5.0.0              "/etc/confluent/dock…"  10 days ago  Up 10 days  9092/tcp, 0.0.0.0:28083->8083/tcp   connect
5a8f48e0eec0  confluentinc/cp-kafka-rest:latest                "/etc/confluent/dock…"  10 days ago  Up 10 days  0.0.0.0:28082->8082/tcp             rest-proxy
2119e8586cce  confluentinc/cp-schema-registry:5.0.0            "/etc/confluent/dock…"  10 days ago  Up 10 days  0.0.0.0:28081->8081/tcp             schema-registry
c587480e6dc9  confluentinc/cp-enterprise-kafka:5.0.0           "/etc/confluent/dock…"  10 days ago  Up 10 days  9092/tcp, 0.0.0.0:21090->21090/tcp  broker
b99bf867d36a  confluentinc/cp-zookeeper:5.0.0                  "/etc/confluent/dock…"  2 weeks ago  Up 10 days  2181/tcp, 2888/tcp, 3888/tcp
jzeng@cloud-dev-one:~$
docker exec -it 51c5b8e177ae /bin/bash
root@broker:/# /bin/cat /etc/kafka/server.properties
……
log.dirs=/var/lib/kafka
……
root@broker:/# ls /var/lib/kafka/data
NewTopic-0
NewTopic-1
Topic-03792e3e-3a57-4dc8-a807-8ab4f2246d21-0
Topic-03792e3e-3a57-4dc8-a807-8ab4f2246d21-1
Topic-0dd3d18e-6954-410e-87de-963029bcce32-0
Topic-0dd3d18e-6954-410e-87de-963029bcce32-1
Topic-20064df9-ef83-4853-bb0b-a6354473904a-0
Topic-20064df9-ef83-4853-bb0b-a6354473904a-1
Topic-2043e083-676d-4371-81a2-72870fdb20d5-0
Topic-2043e083-676d-4371-81a2-72870fdb20d5-1
Topic-403432ac-0e2e-4bec-a84a-f57976598fdd-0
Topic-403432ac-0e2e-4bec-a84a-f57976598fdd-1
Topic-4c29e523-5f11-48a0-b6ca-7c2d00a3c011-0
Topic-4c29e523-5f11-48a0-b6ca-7c2d00a3c011-1
Topic-89ad125b-8af4-44b5-b237-158a47a03fe1-0
Topic-89ad125b-8af4-44b5-b237-158a47a03fe1-1
Topic-939583f2-0cac-4d41-828a-a50881688a81-0
Topic-939583f2-0cac-4d41-828a-a50881688a81-1
Topic-9cf270f0-4706-419b-9ca2-e5a43c86aa68-0
Topic-9cf270f0-4706-419b-9ca2-e5a43c86aa68-1
Topic-Mon_Oct_01_17_07_49_PDT_2018-0
Topic-Mon_Oct_01_17_07_49_PDT_2018-1
Topic-Mon_Oct_01_17_07_49_PDT_2018-2
Topic-Mon_Oct_01_17_10_56_PDT_2018-0
Topic-Mon_Oct_01_17_10_56_PDT_2018-1
Topic-Mon_Oct_01_17_10_56_PDT_2018-2   <-- topic name and partition
root@broker:/# ls -l /var/lib/kafka/data/Topic-Mon_Oct_01_17_10_56_PDT_2018-1
total 8
-rw-r--r-- 1 root root 10485760 Oct 2 00:10 00000000000000000000.index
-rw-r--r-- 1 root root      200 Oct 2 00:10 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Oct 2 00:10 00000000000000000000.timeindex
-rw-r--r-- 1 root root        8 Oct 2 00:10 leader-epoch-checkpoint
root@broker:/# /bin/cat /var/lib/kafka/data/Topic-Mon_Oct_01_17_10_56_PDT_2018-1/00000000000000000000.log
???f2Sf2S???????????????SNAPPYwu8?f2???aش??YXwww.pan.cf07599d-d7fe-4e54-8439-cdf38c0cd4d8192.168.2.1
extra1
extra2
extra3
extra4
Without Snappy:
?s?$f2<9?f2<9????????????????f2<9??????YXwww.pan.f6dd1983-d072-45f7-b2cf-932a15dc6109192.168.2.1
extra1
extra2
extra3
extra4
Dev1:
docker run \
--net=cp-all-in-one_default \
--rm \
confluentinc/cp-enterprise-kafka:5.0.0 \
kafka-topics --describe --topic foo --zookeeper zookeeper:2181
docker run \
  --net=cp-all-in-one_default \
  --rm \
  confluentinc/cp-enterprise-kafka:5.0.0 \
  bash -c "seq 4 | kafka-console-producer --request-required-acks 1 \
  --broker-list broker:9092 --topic foo && echo 'Produced 4 messages.'"
docker run \
--net=cp-all-in-one_default \
--rm \
confluentinc/cp-enterprise-kafka:5.0.0 \
kafka-console-consumer --bootstrap-server broker:9092 --topic foo --from-beginning --max-messages 42
Avro Schema Registry:
Avro was chosen as the only data format supported by the Confluent Schema Registry.
AVRO Schema Fully Compatible (both backward and forward compatible):
· Only add fields with defaults.
· Only remove fields that have defaults.
List all of the schemas (subjects):
jzeng@cloud-dev-one:~$ curl -X GET http://localhost:18081/subjects
["avroTopic","avroTopic-value"]
Get all versions of a subject:
jzeng@cloud-dev-one:~$ curl -X GET http://localhost:18081/subjects/avroTopic-value/versions
Monitor traffic to the schema registry:
SJCMACJ15JHTD8:~ jzeng$ sudo tcpdump -nnvXSs 0 -i en0 dst 10.5.134.22 and port 18081
When a new version of a schema is used, we can see the following traffic from the client to the schema registry (default port 8081, but we are using port 18081):
16:03:50.047302 IP (tos 0x0, ttl 64, id 0, offset 0, flags [none],
proto TCP (6), length 585)
10.54.92.74.50067 >
10.5.134.22.18081: Flags [P.], cksum 0xf9d9 (correct), seq
3948769710:3948770243, ack 4201917593, win 4122, options [nop,nop,TS val
548686738 ecr 2862496893], length 533
0x0000: 4500 0249 0000 0000 4006 8214 0a36 5c4a E..I....@....6\J
0x0010: 0a05 8616 c393 46a1 eb5d 71ae fa74 2c99 ......F..]q..t,.
0x0020: 8018 101a f9d9 0000 0101 080a 20b4 4b92 ..............K.
0x0030: aa9e 3c7d 7b22 7363 6865 6d61 223a 227b ..<}{"schema":"{
0x0040: 5c22 7479 7065 5c22 3a5c 2272 6563 6f72 \"type\":\"recor
0x0050: 645c 222c 5c22 6e61 6d65 5c22 3a5c 2270 d\",\"name\":\"p
0x0060: 6167 655f 7669 7369 745c 222c 5c22 6e61 age_visit\",\"na
0x0070: 6d65 7370 6163 655c 223a 5c22 6578 616d mespace\":\"exam
0x0080: 706c 652e 6176 726f 5c22 2c5c 2266 6965 ple.avro\",\"fie
0x0090: 6c64 735c 223a 5b7b 5c22 6e61 6d65 5c22 lds\":[{\"name\"
0x00a0: 3a5c 2274 696d 655c 222c 5c22 7479 7065 :\"time\",\"type
0x00b0: 5c22 3a5c 226c 6f6e 675c 227d 2c7b 5c22 \":\"long\"},{\"
0x00c0: 6e61 6d65 5c22 3a5c 2273 6974 655c 222c name\":\"site\",
0x00d0: 5c22 7479 7065 5c22 3a5c 2273 7472 696e \"type\":\"strin
0x00e0: 675c 227d 2c7b 5c22 6e61 6d65 5c22 3a5c g\"},{\"name\":\
0x00f0: 2269 705c 222c 5c22 7479 7065 5c22 3a5c "ip\",\"type\":\
0x0100: 2273 7472 696e 675c 227d 2c7b 5c22 6e61 "string\"},{\"na
0x0110: 6d65 5c22 3a5c 2261 6464 6974 696f 6e61 me\":\"additiona
0x0120: 6c69 6e66 6f5c 222c 5c22 7479 7065 5c22 linfo\",\"type\"
0x0130: 3a5b 5c22 6e75 6c6c 5c22 2c5c 2273 7472 :[\"null\",\"str
0x0140: 696e 675c 225d 2c5c 2264 6566 6175 6c74 ing\"],\"default
0x0150: 5c22 3a6e 756c 6c7d 2c7b 5c22 6e61 6d65 \":null},{\"name
0x0160: 5c22 3a5c 2261 6464 6974 696f 6e61 6c69 \":\"additionali
0x0170: 6e66 6f32 5c22 2c5c 2274 7970 655c 223a nfo2\",\"type\":
0x0180: 5b5c 226e 756c 6c5c 222c 5c22 7374 7269 [\"null\",\"stri
0x0190: 6e67 5c22 5d2c 5c22 6465 6661 756c 745c ng\"],\"default\
0x01a0: 223a 6e75 6c6c 7d2c 7b5c 226e 616d 655c ":null},{\"name\
0x01b0: 223a 5c22 6164 6469 7469 6f6e 616c 696e ":\"additionalin
0x01c0: 666f 335c 222c 5c22 7479 7065 5c22 3a5b fo3\",\"type\":[
0x01d0: 5c22 6e75 6c6c 5c22 2c5c 2273 7472 696e \"null\",\"strin
0x01e0: 675c 225d 2c5c 2264 6566 6175 6c74 5c22 g\"],\"default\"
0x01f0: 3a6e 756c 6c7d 2c7b 5c22 6e61 6d65 5c22 :null},{\"name\"
0x0200: 3a5c 2261 6464 6974 696f 6e61 6c69 6e66 :\"additionalinf
0x0210: 6f34 5c22 2c5c 2274 7970 655c 223a 5b5c o4\",\"type\":[\
0x0220: 226e 756c 6c5c 222c 5c22 7374 7269 6e67 "null\",\"string
0x0230: 5c22 5d2c 5c22 6465 6661 756c 745c 223a \"],\"default\":
0x0240: 6e75 6c6c 7d5d 7d22 7d null}]}"}
Consumer in long polling mode:
Long polling is a simple technique for reading data from a server. The client makes a normal request, but the server delays responding if it does not have any new data. Once new information becomes available, it is sent to the client; the client does something with the data and then starts a new long-polling request. Thus the client always keeps one long-polling request open to the server and gets new data as soon as it is available.
You don't have to enable it per se, since it is the default behavior of the Kafka consumer. What you need to tune is the fetch configuration.
Two values are important to achieve long polling (see the consumer sketch below):
· fetch.min.bytes: The broker will wait for this amount of data to accumulate BEFORE it sends the response to the consumer client. Default value is 1 byte.
· fetch.max.wait.ms (named fetch.wait.max.ms in the old consumer): The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes. Default value is 500 ms. To support long polling, increase this number so the consumer does not poll the broker very frequently when there is no data to consume.
The Kafka user mailing list is also a good option for questions like this.
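A minimal consumer sketch with those two settings, assuming a 2.x kafka-clients jar; the broker address, group id, topic name, and the 1 KB / 5 second values are illustrative, not from the original setup:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class LongPollingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "long-poll-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Broker holds the fetch until at least 1 KB is available ...
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        // ... or until 5 seconds have passed, whichever comes first.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 5000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // poll() blocks at most 10s client-side; the long poll happens in the broker fetch.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
            }
        }
    }
}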
More Partitions Require More Open File Handles:
Each partition maps
to a directory in the file system in the broker. Within that log directory,
there will be two files (one for the index and another for the actual data) per
log segment. Currently, in Kafka, each broker opens a file handle of both the
index and the data file of every log segment. So, the more partitions, the
higher that one needs to configure the open file handle limit in the underlying
operating system. This is mostly just a configuration issue. We have seen
production Kafka clusters running with more than 30
thousand open file handles per broker.
Number of Partitions:
A rough formula for picking the number of partitions is based on throughput. You measure the throughput that you can achieve on a single partition for production (call it p) and consumption (call it c). Let's say your target throughput is t. Then you need at least max(t/p, t/c) partitions. For example, if a single partition can absorb p = 10 MB/sec of produce traffic, a consumer can process c = 20 MB/sec, and the target is t = 100 MB/sec, you need at least max(100/10, 100/20) = 10 partitions.
In general, one can produce at tens of MB/sec on just a single partition, as shown in this benchmark. The consumer throughput is often application dependent, since it corresponds to how fast the consumer logic can process each message, so you really need to measure it.
As a rule of thumb, if you care about latency, it's probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor.
Kafka fundamentals:
Pagecache-centric design:
Sequential disk access can in some cases be faster than random memory access!
The performance of linear writes on a JBOD configuration with six 7200rpm SATA drives in a RAID-5 array is about 600 MB/sec, but the performance of random writes is only about 100 KB/sec, a difference of over 6000x. Sequential disk access has the advantage that all operations are O(1) and reads do not block writes or each other. This has obvious performance advantages, since the performance is completely decoupled from the data size: one server can now take full advantage of a number of cheap, low-rotational-speed 1+ TB SATA drives.
A modern OS will happily divert all free memory
to disk caching with little performance penalty when the memory is reclaimed.
All disk reads and writes will go through this unified cache (pagecache).
Rather than maintain as much as possible in-memory and flush it
all out to the filesystem in a panic when we run out of space, we invert that.
All data is immediately written to a persistent log on the filesystem without
necessarily flushing to disk. In effect this just means that it is transferred
into the kernel's pagecache.
Kafka's interaction with the local disk has two characteristics: 1) it appends new writes to an open file (i.e., the message log file), and 2) it usually reads contiguous areas from disk. These characteristics let Kafka rely on the OS disk cache to optimize its reads from and writes to disk. The OS cache is highly efficient, always enabled, and optimized for appends and for repeated reads from a contiguous area on disk. A further advantage of using the OS-level cache is that Kafka's processes use little JVM heap space, avoiding GC-related issues; and when a broker restarts, it does not need to run any cache warm-up, since the OS cache is not cleared when the process terminates.
Batching messages:
Batching leads to larger network packets, larger sequential disk operations, contiguous memory blocks, and so on, all of which allows Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers. This simple optimization produces orders-of-magnitude speed-ups.
Two batch-related producer parameters:
batch.size: batch size in bytes. Default is 16384 (16 KB). Records sent to the same partition will be batched.
linger.ms: wait up to this many milliseconds if the total bytes of buffered records is less than 'batch.size'. Default is 0.
John's comment: batching should be handled in the producer. A configuration sketch follows below.
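A producer sketch showing the two parameters; the broker address, topic, keys, and the 32 KB / 20 ms values are arbitrary choices for illustration:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // batch up to 32 KB per partition
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);         // wait up to 20 ms for the batch to fill

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // Records with the same key land in the same partition and can share a batch.
                producer.send(new ProducerRecord<>("test", "key-" + (i % 4), "value-" + i));
            }
            producer.flush(); // push out any partially filled batches
        }
    }
}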
Zero-Copy:
Modern Unix operating systems offer a highly optimized code path for transferring data out of pagecache to a socket; in Linux this is done with the sendfile system call.
The Java transferTo() method transfers data from a file channel to a given writable byte channel. Internally, it depends on the underlying operating system's support for zero copy; in UNIX and various flavors of Linux, this call is routed to the sendfile() system call.
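Not Kafka source code, just an illustrative sketch of the transferTo() call itself; the file path and socket address are made up:
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopySend {
    public static void main(String[] args) throws Exception {
        try (FileChannel file = new FileInputStream("/tmp/segment.log").getChannel();
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long remaining = file.size();
            while (remaining > 0) {
                // transferTo() lets the kernel copy pagecache pages straight to the socket
                // (sendfile on Linux) without a round trip through user space.
                long sent = file.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}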
End-to-end Batch Compression:
A batch of messages can be clumped together, compressed, and sent to the server in this form. The batch will be written in compressed form, will remain compressed in the log, and will only be decompressed by the consumer.
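As a sketch of the end-to-end part: the producer picks a codec, while the topic-level compression.type stays at its default "producer" so the broker keeps the batch exactly as the producer compressed it (the topic name and codec here are illustrative, not from the original setup):
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.Properties;

public class CompressedTopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic("compressed-topic", 3, (short) 1);
            // "producer" (the default, as the topic config dump later in these notes shows) tells
            // the broker to retain whatever codec the producer used, so batches stay compressed in
            // the log and are only decompressed by the consumer.
            newTopic.configs(Collections.singletonMap(TopicConfig.COMPRESSION_TYPE_CONFIG, "producer"));
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
        }
        // On the producer side, set compression.type=snappy (or gzip/lz4) to compress whole batches.
    }
}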
Partition Assignment Strategy
partition.assignment.strategy: the default is RangeAssignor, which works on a per-topic basis.
When subscribing to multiple topics, it is better to use RoundRobinAssignor if the number of partitions per topic is uneven (see the sketch below).
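A hedged sketch of how that is set on the Java consumer; the broker address and group id are placeholders:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import java.util.Properties;

public class AssignmentStrategyConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "multi-topic-group");
        // Default is RangeAssignor (per-topic ranges); RoundRobinAssignor spreads all
        // subscribed topic-partitions evenly across the group's consumers.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
        return props;
    }
}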
The partition is
split into segments
When Kafka writes to
a partition, it writes to a segment — the active segment. If the segment’s size
limit is reached, a new segment is opened and that becomes the new active
segment.
On disk, a partition
is a directory and each segment is an index file and a log file.
The data format on
disk is exactly the same as what the broker receives from the producer over the
network and sends to its consumers. This allows Kafka to efficiently transfer
data with zero
copy.
Consumer rebalance
Rebalancing is the process where a group of consumer
instances (belonging to the same group) co-ordinate to own a mutually exclusive
set of partitions of topics that the group is subscribed to. At the end of a
successful rebalance operation for a consumer group, every partition for all
subscribed topics will be owned by a single consumer instance within the group.
The way rebalancing works is as follows. Every broker is elected as the
coordinator for a subset of the consumer groups. The co-ordinator broker for a
group is responsible for orchestrating a rebalance operation on consumer group
membership changes or partition changes for the subscribed topics. It is also
responsible for communicating the resulting partition ownership configuration
to all consumers of the group undergoing a rebalance operation.
This happens when a new consumer is added or an existing consumer fails (i.e., it has failed to send heartbeats for a period of time).
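A minimal sketch of how an application can observe rebalances with a ConsumerRebalanceListener; the topic name and log messages are illustrative:
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Collections;

public class RebalanceLogger {
    // Attach a listener so the application can see partitions move during a rebalance
    // (e.g. to commit offsets in onPartitionsRevoked before ownership changes).
    public static void subscribeWithListener(KafkaConsumer<String, String> consumer, String topic) {
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Rebalance: giving up " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Rebalance: now own " + partitions);
            }
        });
    }
}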
Kafka Consumer Failover
If a consumer fails after processing a record but before sending the commit to the broker, some Kafka records could be reprocessed. In this scenario Kafka implements at-least-once behavior, and you should make sure the messages (record deliveries) are processed idempotently.
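A sketch of the at-least-once pattern with auto-commit disabled and a commit only after processing, assuming a 2.x client; broker address, group id, and topic are placeholders, and process() stands in for whatever (idempotent) handling the application does:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit only after processing

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // must be idempotent: a crash here means the record is re-delivered
                }
                consumer.commitSync(); // commit only after the whole batch was processed
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}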
Log Compaction and Retention
Log compaction retains at least the last known value for each record key within a single topic partition.
Change the "cleanup.policy" attribute to switch from the default 'delete' to 'compact'.
When its value is 'delete', the retention policy is controlled by the following two parameters:
retention.bytes: controls the maximum size a partition (which consists of log segments) can grow to before old log segments are discarded to free up space. Default value is -1 (unlimited).
retention.ms: controls the maximum time a log is retained before old log segments are discarded to free up space. Default value is 604800000 (7 days).
Set attributes to overwrite topic’s default
properties:
final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
// Override default property for the topic
Map<String, String> customConfig = new HashMap<>();
customConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopic.configs(customConfig);
Create a new topic:
// Create topic, which is an async call.
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
// Since the call is async, let's wait for it to complete.
KafkaFuture future = createTopicsResult.values().get(topicName);
future.get();
System.out.println("New topic " + topicName + " is created.");
Get how many partitions a topic has from consumer:
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topicName);
int partitionCount = 0;
for (PartitionInfo partitionInfo : partitionInfoList) {
    System.out.println("Partition ID is " + partitionInfo.partition());
    partitionCount++;
}
System.out.println("-------Total partitions for topic " + Topics.getTopic() + " are " + partitionCount);
/**
 * Increase partitions for the current topic
 */
NewPartitions newPartitionRequest = NewPartitions.increaseTo(6);
adminClient.createPartitions(Collections.singletonMap(topicName, newPartitionRequest)).all().get();
/**
 * Read partition info back through adminClient
 */
DescribeTopicsResult topicResults = adminClient.describeTopics(Collections.singletonList(topicName));
TopicDescription topicDescription = topicResults.values().get(topicName).get();
List<TopicPartitionInfo> partitionInfoList = topicDescription.partitions();
int partitionCount = 0;
for (TopicPartitionInfo partitionInfo : partitionInfoList) {
    System.out.println("Partition ID (from adminClient) is " + partitionInfo.partition());
    partitionCount++;
}
System.out.println("-------Total partitions (from adminClient) for topic " + Topics.getTopic() + " are " + partitionCount);
/**
 * Get cluster info through adminClient
 */
DescribeClusterResult clusterResult = adminClient.describeCluster();
Collection<Node> nodes = clusterResult.nodes().get();
for (Node node : nodes) {
    System.out.println("Node host: " + node.host() + ", node rack: " + node.rack() + ", node port: " + node.port());
}
To enable automatic AVRO class generation from an AVRO schema, add the following two plugins and one dependency to pom.xml:
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
        <goal>protocol</goal>
        <goal>idl-protocol</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
        <stringType>String</stringType>
        <createSetters>false</createSetters>
        <fieldVisibility>private</fieldVisibility>
      </configuration>
    </execution>
  </executions>
</plugin>
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>build-helper-maven-plugin</artifactId>
  <version>3.0.0</version>
  <executions>
    <execution>
      <id>add-source</id>
      <phase>generate-sources</phase>
      <goals>
        <goal>add-source</goal>
      </goals>
      <configuration>
        <sources>
          <source>target/generated-sources/avro</source>
        </sources>
      </configuration>
    </execution>
  </executions>
</plugin>

<dependency>
  <groupId>org.apache.servicemix.bundles</groupId>
  <artifactId>org.apache.servicemix.bundles.avro</artifactId>
  <version>1.8.2_1</version>
</dependency>
Tools
AVRO tools:
java -jar avro-tools-1.8.2.jar tojson {avro_file_name}
java -jar avro-tools-1.8.2.jar fromjson {json_file_name}
Kafka Manager:
Does not support Kafka 2.0 at
this moment (Oct 9, 2018)
Kafka Topic UI from Landoop:
This did not work for me with the installation I had from Confluent.
Testing and JUnit:
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
Performance:
# generate messages (up to 10000) at a rate of 10 per second (the throughput)
bin/kafka-producer-perf-test.sh --topic {topic-name} --num-records 10000 --throughput 10 \
  --payload-file {any-text-file-in-size-10k} \
  --producer-props acks=1 bootstrap.servers={kafka-server-hostname}:{port} \
  --payload-delimiter A

# generate 10 KB of random data
base64 /dev/urandom | head -c 10000 | egrep -ao "\\w" | tr -d '\n' > file10KB.txt

# consume them
bin/kafka-console-consumer.sh --bootstrap-server {kafka-server-hostname}:{port} --topic {topic-name}
Factors impacting Kafka performance:
Disk I/O:
· Format your drives as XFS (easiest, no tuning required).
· If read/write throughput is the bottleneck, mount multiple disks with the config parameter log.dirs:
log.dirs=/disk1/kafka-logs,/disk2/kafka-logs,….
· Kafka performance is constant with regard to the amount of data stored in Kafka, so make sure you expire data fast enough (default is one week).
Network:
· Ensure your Kafka instances and Zookeeper instances are geographically close.
· Do not put one broker in Europe and another broker in the US.
· Network bandwidth is key: make sure you have enough bandwidth to handle many connections and TCP requests.
RAM:
· Assign a MAX heap (-Xmx) of 4 GB and do not set -Xms:
export KAFKA_HEAP_OPTS="-Xmx4g"
· Keep Java heap usage low over time, and increase it only if you have more partitions in your broker.
· The remaining RAM will be used automatically for the Linux OS page cache. This is used to buffer data to the disk and is what gives Kafka its amazing performance; any unused memory will automatically be leveraged by the Linux OS and assigned to the page cache.
· Make sure swapping is disabled for Kafka entirely (the default vm.swappiness is 60):
vm.swappiness=0 or vm.swappiness=1
CPU:
· If you enable SSL, Kafka has to use CPU to encrypt and decrypt every payload.
· Make sure your producers and consumers (not Kafka itself) do the compression work (that is the default setting).
· Monitor Garbage Collection over time to ensure the pauses are not too long.
OS:
· Do not use Windows in production.
· Increase the file descriptor limits (at least 100,000 as a starting point). Kafka opens 3 file descriptors for each topic-partition-segment that lives on the broker.
· Make sure only Kafka is running on that machine, and no other services.
Others:
· Use Java 8.
· Set Kafka quotas:
It is still possible for some producers and consumers to produce/consume huge volumes of data, monopolize broker resources, and cause network saturation. These producers/consumers can result in DoS (Denial of Service) for other producers/consumers, thus impacting other applications. This problem gets more severe in large multi-tenant clusters, where a small set of badly behaved clients can degrade the user experience for the well-behaved ones.
This problem can be addressed by having quotas that limit the broker resources and network bandwidth available to producers and consumers. In fact, when running Kafka as a service, quotas become even more useful to enforce API limits according to an agreed-upon contract.
How Do Quotas Work?
Client byte rate is measured over multiple small windows (e.g. 30 windows of 1 second each) in order to detect and correct quota violations quickly.
It is important to note that Kafka brokers do not throw any error when they detect a quota violation. Instead, the broker attempts to slow down a client exceeding its quota by delaying the response. The amount of delay is computed by the broker in such a way that the client does not violate its quota.
Since Kafka brokers do not throw any error on quota violation, there is no need for a retry mechanism on the client side. However, when looking into performance issues, quotas should be kept in mind and adjusted to get the required throughput.
Configuring Parameters for Enforcing Quotas
# Sets producer quota to 50 MB/sec
quota.producer.default=52428800
# Sets consumer quota to 50 MB/sec
quota.consumer.default=52428800
However, we may need to configure different quotas for different clients at run time without any restart of Kafka brokers. Fortunately, Kafka provides a mechanism to override quotas at the client level without any restart. For example, the commands below can be executed from the Kafka broker home directory to configure the client with id "test-client" with a producer quota of 10 MB/sec and a consumer quota of 20 MB/sec:
# Adds configuration for client with id test-client
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=10485760,consumer_byte_rate=20971520' --entity-name test-client --entity-type clients
Once updated, you can also verify your configuration using the describe command below:
# Describe configurations
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-client --entity-type clients
# Output of the above command will be as below
Configs for clients:test-client are producer_byte_rate=10485760,consumer_byte_rate=20971520
How to change Kafka configuration:
Change the server.properties file and restart Kafka for all brokers.
Debug and Logging:
Code to dump all attributes for a topic:
DescribeConfigsResult configResults = adminClient.describeConfigs(
        Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));
Map<ConfigResource, KafkaFuture<Config>> configs = configResults.values();
for (Map.Entry<ConfigResource, KafkaFuture<Config>> config : configs.entrySet()) {
    System.out.println("Topic config for " + config.getKey() + " : ");
    Collection<ConfigEntry> configEntries = config.getValue().get().entries();
    for (ConfigEntry configEntry : configEntries) {
        System.out.println("    " + configEntry.name() + "-->" + configEntry.value()
                + " (" + (configEntry.isDefault() ? "Default" : "Non-default") + ")");
    }
}
Results from the above sample code:
Topic config for ConfigResource{type=TOPIC,
name='Topic-2043e083-676d-4371-81a2-72870fdb20d5'} :
compression.type-->producer (Default)
leader.replication.throttled.replicas--> (Default)
message.downconversion.enable-->true (Default)
min.insync.replicas-->1 (Default)
segment.jitter.ms-->0 (Default)
cleanup.policy-->delete (Default)
flush.ms-->9223372036854775807 (Default)
follower.replication.throttled.replicas--> (Default)
segment.bytes-->1073741824 (Default)
retention.ms-->604800000 (Default)
flush.messages-->9223372036854775807 (Default)
message.format.version-->2.0-IV1 (Default)
file.delete.delay.ms-->60000 (Default)
max.message.bytes-->1000012 (Default)
min.compaction.lag.ms-->0 (Default)
message.timestamp.type-->CreateTime (Default)
preallocate-->false (Default)
min.cleanable.dirty.ratio-->0.5 (Default)
index.interval.bytes-->4096 (Default)
unclean.leader.election.enable-->false (Default)
retention.bytes-->-1 (Default)
delete.retention.ms-->86400000 (Default)
segment.ms-->604800000 (Default)
message.timestamp.difference.max.ms-->9223372036854775807 (Default)
segment.index.bytes-->10485760 (Default)
Pulsar
Another Kafka-like system.
Kafka Streams
KStream → KTable