Apache Kafka – это популярный брокер распределенных сообщений, предназначенный для эффективной обработки больших объемов данных в режиме реального времени. Кластер Kafka не только обладает высокой масштабируемостью и отказоустойчивостью, но также имеет гораздо более высокую пропускную способность по сравнению с другими брокерами сообщений, такими как ActiveMQ и RabbitMQ . Хотя он обычно используется в качестве системы обмена сообщениями для публикации / подписки, многие организации также используют его для агрегации журналов, поскольку он обеспечивает постоянное хранение опубликованных сообщений.
Система обмена сообщениями publish/subscribe позволяет одному или нескольким производителям публиковать сообщения без учета количества потребителей или способа их обработки. Подписанные клиенты автоматически получают уведомления об обновлениях и создании новых сообщений. Эта система является более эффективной и масштабируемой, чем системы, в которых клиенты периодически опрашивают, чтобы определить, доступны ли новые сообщения.
В этой статье мы будем устанавливать Apache Kafka 2.1.1 на Ubuntu 18.04.
Необходимые компоненты
Для того чтобы следовать вперед, вам будет нужно:
Один сервер Ubuntu 18.04 и пользователь без полномочий root с привилегиями sudo. Если у вас нет настроенного пользователя без полномочий root, выполните действия, указанные в этом руководстве.
По крайней мере, 4 ГБ оперативной памяти на сервере. Установка без этого объема оперативной памяти может привести к сбою службы Kafka, при этом виртуальная машина Java (JVM) выбрасывает исключение “из памяти” во время запуска.
OpenJDK 8, установленный на вашем сервере. Чтобы установить эту версию, выполните следующие инструкции по установке определенных версий OpenJDK. Kafka написан на Java, поэтому он требует JVM; однако его сценарий запуска оболочки имеет ошибку обнаружения версии, которая заставляет его не запускаться с версиями JVM выше 8.
Шаг 1. Создание пользователя для Kafka
Поскольку Kafka может обрабатывать запросы по сети, вы должны создать для него выделенного пользователя. Это минимизирует ущерб вашей машине Ubuntu, если сервер Kafka будет скомпрометирован. На этом шаге мы создадим выделенного пользователя kafka, но вы должны создать другого пользователя без полномочий root для выполнения других задач на этом сервере после завершения настройки Kafka.
Войдя в систему как ваш некорневой пользователь sudo, создайте пользователя с именем kafka с помощью useradd команды:
sudo useradd kafka -m
-m флаг гарантирует, что для пользователя будет создан домашний каталог. Этот домашний каталог /home/kafka будет выступать в качестве нашего каталога рабочей области для выполнения команд в разделах ниже.
Установите пароль с помощью passwd:
sudo passwd kafka
Добавьте пользователя kafka в sudo группу с помощью этой adduser команды, чтобы он имел права доступа, необходимые для установки зависимостей Kafka:
sudo adduser kafka sudo
Теперь ваш пользователь kafka готов. Войдите в эту учетную запись с помощью su:
su -l kafka
Теперь, когда мы создали конкретного пользователя Kafka, мы можем перейти к загрузке и извлечению двоичных файлов Kafka.
Шаг 2. Загрузка и извлечение двоичных файлов Кафки
Давайте загрузим и распакуем двоичные файлы Kafka в выделенные папки в домашнем каталоге нашего пользователя kafka.
Для начала создайте каталог Downloads в /home/kafka для хранения ваших загрузок:
mkdir ~/Downloads
Используйте curlдля загрузки двоичных файлов Kafka:
curl "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
Создайте вызываемый каталог kafkaи перейдите в этот каталог. Это будет базовый каталог установки Kafka:
mkdir ~/kafka && cd ~/kafka
Извлеките архив, который вы загрузили с помощью tar:
tar -xvzf ~/Downloads/kafka.tgz --strip 1
Мы задаем –strip 1 флаг, чтобы гарантировать, что содержимое архива извлекается в ~/kafka/, а не в другой каталог (например ~/kafka/kafka_2.11-2.1.1/) внутри него.
Теперь, когда мы успешно загрузили и извлекли двоичные файлы,мы можем перейти к настройке в Kafka, чтобы разрешить удаление темы.
Шаг 3. Настройка сервера Kafka
Поведение Кафки по умолчанию не позволяет нам удалить тему, категорию, группу или имя канала, в котором могут быть опубликованы сообщения. Чтобы изменить это, давайте отредактируем файл конфигурации.
Параметры конфигурации Kafka указаны в server.properties. Откройте этот файл с помощью вашего любимого редактора:
vim ~/kafka/config/server.properties
Давайте добавим настройку, которая позволит нам удалить темы Кафки. Добавьте в нижнюю часть файла ~/kafka/config/server.properties
delete.topic.enable = true
Сохраните файл и закройте . Теперь, когда мы настроили Kafka, мы можем перейти к созданию файлов блока systemd для запуска и включения его при запуске.
Шаг 4. Создание файлов модуля Systemd и запуск сервера Kafka
В этом разделе мы создадим файлы блока systemd для сервиса Kafka. Это поможет нам выполнять общие действия службы, такие как запуск, остановка и перезапуск Kafka в соответствии с другими службами Linux.
Zookeeper – это сервис, который использует Kafka для управления своим состоянием кластера и конфигурациями. Он обычно используется в мульти-распределенных системах как неотъемлемый компонент. Если вы хотите узнать больше об этом, почитайте официальную документацию Zookeeper .
Создайте файл объекта для zookeeper /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
В [Unit] разделе указывается, что Zookeeper требует, чтобы сеть и файловая система были готовы до ее запуска.
В [Service] разделе указывается, что systemd должен использовать файлы zookeeper-server-start.sh и zookeeper-server-stop.sh для запуска и остановки службы. Он также указывает, что Zookeeper должен быть перезапущен автоматически, если он выходит ненормально.
Затем создайте файл службы systemd для kafka:
sudo nano /etc/systemd/system/kafka.service
Введите в файл следующее определение единицы измерения /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
В [Unit] разделе указывается, что этот единичный файл зависит от zookeeper.service. Это гарантирует, что zookeeper запускается автоматически при запуске kafka службы.
В [Service] разделе указывается, что systemd должен использовать файлы kafka-server-start.shand kafka-server-stop.shshell для запуска и остановки службы. Он также указывает, что Kafka должен быть перезапущен автоматически, если он выходит ненормально.
Теперь, когда единицы были определены, начните Kafka со следующей команды:
sudo systemctl start kafka
Чтобы убедиться, что сервер успешно запущен, проверьте журналы журнала для службы kafka:
sudo journalctl -u kafka
Вы должны увидеть вывод, подобный следующему:
Output
Jul 17 18:38:59 kafka-ubuntu systemd[1]: Started kafka.service.
Теперь у вас есть сервер Kafka, слушающий на порту 9092.
Несмотря на то, что мы запустили службу kafka, если бы мы перезагрузили наш сервер, он не был бы запущен автоматически. Чтобы включить kafka загрузку с сервера, выполните:
sudo systemctl enable kafka
Теперь, когда мы запустили и включили службы, давайте проверим установку.
Шаг 5. Тестирование установки
Давайте опубликуем и используем сообщение “Hello World”, чтобы убедиться, что сервер Kafka ведет себя правильно. Публикация сообщений в Kafka требует:
Производитель, который позволяет публиковать записи и данные по темам.
Потребитель, который считывает сообщения и данные из разделов.
Во-первых, создайте тему с именем TutorialTopic, введя:
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
Вы можете создать источник из командной строки с помощью скрипта kafka-console-producer.sh (ему необходимы имя хоста сервера Kafka, порт и имя раздела в качестве аргументов)
Опубликуйте строку “Hello, World”в TutorialTopic теме, введя:
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
Далее, вы можете создать потребитель Kafka с помощью kafka-console-consumer.shсценария. Он ожидает имя хоста и порт сервера ZooKeeper, а также имя темы в качестве аргументов.
Следующая команда использует сообщения от TutorialTopic. Обратите внимание на использование –from-beginning флага, который позволяет использовать сообщения, опубликованные до запуска приемника:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
Если все ок с конфигурацией, вы должны увидеть Hello, World в своем терминале:
Output
Hello, World
Сценарий продолжит выполнение, ожидая появления дополнительных сообщений, которые будут опубликованы в теме. Не стесняйтесь открывать новый терминал и запускать производителя, чтобы опубликовать еще несколько сообщений. Вы должны быть в состоянии увидеть их все в продукции потребителя.
После завершения тестирования нажмите CTRL+C, чтобы остановить сценарий приемника. Теперь, когда мы протестировали установку, давайте перейдем к установке KafkaT.
Шаг 6. Установите KafkaT (Необязательно)
KafkaT-это инструмент от Airbnb, который упрощает просмотр сведений о вашем кластере Kafka и выполнение определенных административных задач из командной строки. Поскольку это рубиновый камень, вам понадобится Рубин, чтобы использовать его. Вам также понадобится build-essential чтобы иметь возможность построить другие драгоценные камни, от которых он зависит. Установите их с помощью apt:
sudo apt install ruby ruby-dev build-essential
Теперь вы можете установить KafkaT с помощью команды gem:
sudo gem install kafkat
KafkaT использует .kafkatcfgв качестве файла конфигурации для определения каталогов установки и журналов вашего сервера Kafka. Он также должен иметь запись, указывающую KafkaT на ваш экземпляр ZooKeeper.
Создайте новый файл с именем .kafkatcfg:
nano ~/.kafkatcfg
Добавьте следующие строки, чтобы указать необходимую информацию о вашем сервере Kafka и экземпляре Zookeeper
sudo vim ~/.kafkatcfg
{
"kafka_path": "~/kafka",
"log_path": "/tmp/kafka-logs",
"zk_path": "localhost:2181"
}
Теперь вы готовы использовать KafkaT. Для начала, вот как вы можете использовать его для просмотра сведений обо всех разделах Kafka:
kafkat partitions
Вы увидите следующие выходные данные:
Output
Topic Partition Leader Replicas ISRs
TutorialTopic 0 0 [0] [0]
__consumer_offsets 0 0 [0] [0]
...
...
Вы увидите Tutorial Topic также __consumer_offsets внутреннюю тему, используемую Кафкой для хранения информации, связанной с клиентом. Вы можете безопасно игнорировать строки__consumer_offsets, начиная с.
Чтобы узнать больше о KafkaT, обратитесь к его репозиторию GitHub.
Шаг 7. Настройка кластера с несколькими узлами (не обязательно)
Если вы хотите создать мультиброкерный кластер с использованием большего количества компьютеров Ubuntu 18.04, необходимо повторить Шаг 1, Шаг 4 и Шаг 5 на каждом из новых компьютеров. Кроме того, необходимо внести следующие изменения в server.properties для каждого:
Значение broker.id должно быть изменено таким образом, чтобы оно было уникальным во всем кластере. Это свойство уникально идентифицирует каждый сервер в кластере и может иметь любую строку в качестве его значения. Например,”server1″,”server2″, и т.д.
Значение zookeeper.connect должно быть изменено таким образом, чтобы все узлы указывали на один и тот же экземпляр ZooKeeper. Это свойство указывает адрес экземпляра Zookeeper и следует <HOSTNAME/IP_ADDRESS>:PORT. Например,”203.0.113.0:2181″, “203.0.113.1:2181” и т.п.
Если вы хотите иметь несколько экземпляров ZooKeeper для вашего кластера, значение zookeeper.connect на каждом узле должно быть идентичной, разделенной запятыми строкой со списком IP-адресов и номеров портов всех экземпляров ZooKeeper.
Шаг 8. Ограничение пользователя Kafka
Теперь, когда все установки выполнены, вы можете удалить права администратора пользователя kafka. Прежде чем вы сделаете это, выйдите из системы и войдите обратно, как любой другой пользователь sudo, не являющийся root. Если вы все еще работаете в том же сеансе оболочки, с которым вы начали этот учебник, просто введите exit.
Удалите пользователя kafka из группы sudo:
sudo deluser kafka sudo
Чтобы еще больше повысить безопасность вашего сервера Kafka, заблокируйте пароль пользователя kafka с помощью этой passwdкоманды. Это гарантирует, что никто не сможет напрямую войти на сервер, используя эту учетную запись:
sudo passwd kafka -l
На этом этапе только пользователь root или sudo может войти в систему как kafka, введя следующую команду:
sudo su - kafka
В дальнейшем, если вы хотите разблокировать его, используйте passwd -u:
sudo passwd kafka -u
Теперь вы успешно ограничили права администратора пользователя kafka.
Вывод
Теперь у вас есть Apache Kafka, работающий безопасно на вашем сервере Ubuntu. Вы можете использовать его в своих проектах, создавая производителей и потребителей Kafka с помощью клиентов Kafka, которые доступны для большинства языков программирования. Чтобы узнать больше о Кафке, вы также можете ознакомиться с его документацией .