Kafka is a distributed system and data is read from and written to the partition leader. The leader can be on any broker in a cluster. When a client (producer or consumer) starts, it will request metadata about which broker is the leader for a partition. This request for metadata can come from any broker. The metadata that is returned will include the available endpoints for the lead broker of that partition. The client will use those endpoints to connect to the broker to read or write data as required.
1. Architecture Design
1.1. Architecture Diagram
1.2. Kafka Broker Disks Sizing
Because Kafka persists messages to log segment files, excellent disk I/O performance is a requirement. Writing Kafka logs to multiple disks improves performance through parallelism. Since Kafka requires that an entire partition fit on a single disk, this is an upper bound on the amount of data that can be stored by Kafka for a given topic partition. Hence, the number of topics, the number of partitions per topic, the size of the records, etc. all affect the log retention limits. So, if you have a high volume partition, allocate it to a dedicated drive. For resilience, also be sure to allocate enough disk space cluster-wide for partition replicas.
Kafka does not degrade gracefully if it runs out of disk space. Make sure your sizing estimates are generous, monitor disk utilization, and take corrective action well before disk exhaustion occurs! In a default deployment Kafka can consume all disk space available on the disks it accesses, which not only causes Kafka to fail, but also the other applications and services on the node.
Based on the anticipated data rate, you can calculate how much disk space may be required and how to trade off the following variables to meet your needs and keep Kafka healthy.
Disk Size - Multi-TB hard drives have more capacity compared to SSDs, but trade off read and write performance.
Number of Partitions - Using more partitions increases throughput per topic through parallelism. It also allows more drives to be used, one per partition, which increases capacity. Note that Kafka only preserves message order per partition, not over a whole, multi-partition topic. Topic messages can be partitioned randomly or by hashing on a key.
Replication Factor - The replication factor provides resiliency for a partition. Replicas are spread across available brokers, therefore your replication factor is always <= the number of brokers in your cluster. It must be accounted for when determining disk requirements because there is no difference between how much space a leader partition and a replica partition uses. The total number of partitions for a given a topic is the number of partitions * the replication factor.
Dedicated Drives - Using dedicated drives for the Kafka brokers not only improves performance, by removing other activity from the drive, but should the drive become full, then other services that need drive space will not be adversely affected.
Do not use RAID for fault tolerance. Kafka provides fault tolerance guarantees at the application layer (with topic replication factor) making fault tolerance at the hard disk level redundant.
To estimate disk requirements for your need, per broker, begin by estimating the projected usage. Here is an example.
|Average topic message throughput (MT)||500 msg/sec||For each topic, how many messages on average per second.|
|Average message size (MS)||5 KB||Note that Kafka works best with messages under 1MB in size.|
|Broker message retention length (RL)||24 hours (86400 seconds)||The Kafka cluster retains all published messages whether or not they have been consumed for a configurable period of time.|
For example if the log retention is set to 24 hours, then for 24 hours after a message is published it is available for consumption, after which it will be discarded to free up space.
|Number of topics (NT)||1|
|Average replication factor (NR)||2|
|Number of Kafka Brokers (KB)||3|
1.3. Kafka Broker Memory (RAM) Sizing
Assuming message stays in memory in the worst case for 1 hour (3600 seconds)
2. System Requirements
|Number of VMs||3|
|Operating System||CentOS 7 x64|
|Privileges||ROOT access prefered|
3. Hardware Specifications (Per VM)
|OS Disk Size||40 GB SSD|
|Data Disk Size||250 GB SSD|
4. Preparing the VMs
- Verify the MAC address and product_uuid are unique for every node. You can get the MAC address of the network interfaces using the command
ip link | grep link/ether
- The product_uuid can be checked by using the command
4.1. Install prerequisites.
4.2. Install JDK 8.
4.3. Setting up JAVA_HOME by adding the following line at the bottom of
4.4. Get the updated JAVA_HOME into current shell.
4.5. Verify the Java version.
4.6. Synchronize server time with Google NTP server.
4.7. Start and enable chronyd service.
4.8. Display time synchronization status.
4.9. Start and enable NTP service.
4.10. Add the following lines to your
/etc/hosts file in ALL nodes.
4.11. Tweaking the system for high concurrancy and security.
4.12. Reload all sysctl variables without rebooting the server.
4.13. Configure firewall for ZooKeeper.
4.14. Enable ZooKeeper firewall rules.
4.15. Configure firewall for Kafka.
4.16. Enable Kafka firewall rules.
4.17. Restart the
4.18. Verify the changes.
5. Creating XFS File System based on LVM for DATA
The XFS filesystem is a high performance journalling filesystem and it is the default file system for RedHat/CentOS Linux 7. XFS supports a maximum file system size of 500 TB and a maximum file size of 16 TB.
5.1. In order to continue further, you must attach a new disk to your server. The
lsblk command lists all the block storage devices attached to the system. This includes raw disks, primary partitions, logical partitions and even network attached storage. Here, we have attached a new 250 GB disk which shows up as device sdb. The sda device holds the operating system.
5.2. To create a new XFS file system based on LVM you will first need a partition to format. You can use fdisk to create a new partition, like in the example below, you first need to invoke fdisk with the name of the hard disk you wish to create the partition on and then use "n" command inside fdisk for a new parttion. After you have set the size you will need to change the partition type from "Linux" to "Linux LVM" by typing "t" and "8e" respectively. Use the "w" command to write the new partition table to disk.
5.3. Inform the operating system kernel of partition table changes, by requesting that the operating system re-read the partition table.
5.4. Verify the changes using
5.5. Create a physical volume.
Physical volume is the actual storage device that will be used in the LVM configuration. It can be an entire disk, a partition on disk or a LUN on the SAN. You can use
pvcreate to create the physical volume. The
pvcreate command initialize these disks so that they can be a part in forming volume groups.
5.6. Display Physical Volumes using
5.7. Create a Volume Group.
Physical volumes are combined into volume groups (VGs). It creates a pool of disk space out of which logical volumes can be allocated. The disk space available for allocation in Volume Group is divided into units of a fixed-size called extents. An extent is the smallest unit of storage that can be allocated. Within a physical volume, extents are referred to as physical extents. A logical volume is allocated into logical extents of the same size as the physical extents. The extent size is thus the same for all logical volumes in the volume group. The volume group maps the logical extents to physical extents.
5.8. Display Volume Groups using
5.9. Create Logical Volume.
A volume group is divided up into logical volumes. So if you have created kafka_vg earlier then you can create logical volumes from that VG. The amount of space you want to allocate depends on your requirement. You might want to create LV of 200MB, 1GB etc. In here, we create a logical volume that uses the entire volume group space.
5.10. Display Logical Volumes using
5.11. Format logical partition to XFS filesystem.
5.12. Create a directory where this XFS file system will be mounted.
5.13. Find the Universally Unique Identifier (UUID) of the XFS filesystem using
blkid command. This UUID will be used to mount newly created logical volume under
5.14. Making mounts persistent by adding it to
5.15. Mount the file system.
5.16. Verify the changes using
5.17. In order to permanently disable swap space in Linux, open
/etc/fstab file, search for the swap line and comment the entire line by adding a "#" sign in front of the line.
5.18. Execute the following command to turn off all swap devices and files.
5.19. Disable File Access Time Logging and enable Combat Fragmentation to enhance XFS file system performance. Add
noatime,nodiratime,allocsize=64m to all XFS volumes.
5.20. The servers need to be restarted before continue further.
6. Install and Configure Confluent Community Components
6.1. Setting up Confluent repository
6.1.1. Install the Confluent Platform public key. This key is used to sign packages in the YUM repository.
6.1.2. Add the Confluent repository.
6.1.3. Update the YUM caches and install Confluent platform using only Confluent Community components.
6.2. Configure ZooKeeper
6.2.1. Create the zookeeper-data directory.
6.2.2. Each of our cluster nodes needs a UNIQUE SERVER ID. ZooKeeper looks up this information from the file
6.2.3. Verify all nodes have a UNIQUE SERVER ID.
6.2.4. Setting up
zookeeper-data directory ownership.
6.2.5. Backup existing
6.2.6. Configure ZooKeeper.
6.2.7. Start and enable ZooKeeper.
6.3. Configure Kafka
6.3.1. Create the kafka-data directory.
6.3.2. Setting up
kafka-data directory ownership.
6.3.3. Backup existing Kafka
6.3.4. Configure Kafka.
- Please note that broker.id and advertised.listeners properties are unique for each cluster nodes. Change its values accordingly.
6.3.5. Start and enable Kafka.
- The confluent-zookeeper service must be started before the confluent-kafka service.
- All confluent-zookeeper must be up and running before starting the confluent-kafka service.
7.1. Server Reboot Procedure
confluent-kafka service in all nodes ONE BY ONE.
confluent-zookeeper in one node and reboot the server.
7.1.3. Once the above rebooted server back online and healthy, stop
confluent-zookeeper in the next server and reboot.
7.1.4. Follow the above steps for all nodes in the cluster.
7.1.5. Verify all confluent services are up and running using
systemctl status confluent* command.
7.2. How to clean up a Kafka Cluster
confluent-kafka service in all nodes ONE BY ONE.
confluent-zookeeper in all nodes ONE BY ONE.
7.2.3. Remove Kafka
log.dir data in all nodes using the following command.
7.2.4. Remove ZooKeeper
dataDir data in all nodes excluding the
myid file using the below command.
7.2.5. Reboot all nodes.
7.2.6. Verify all confluent services are up and running using
systemctl status confluent* command. Most likely, Kafka nodes stop working after the reboot since all ZooKeeper nodes are NOT running at the same time. please make sure to start it using
systemctl start confluent-kafka in failed nodes.