Apache Kafka. Введение для начинающих
В этой статье я расскажу об основных концепциях Apache Kafka , установим локально с помощью docker compose брокер Apache Kafka , создадим тестовый топик , напишем на python первый продьюсер(producer), чтобы публиковать события в этот топик и первый консьюмер(consumer) , чтобы читать из этого топика .
Apache Kafka — это распределенная платформа потоковой передачи событий с открытым исходным кодом, используемая тысячами компаний для высокопроизводительных конвейеров данных, потоковой аналитики, интеграции данных и критически важных приложений.
Apache Kafka сочетает в себе 3 ключевые возможности , такие как:
-
Публикацию(запись) потоков событий и подписку(чтение) на них, включая непрерывный импорт/экспорт ваших данных из других систем.
Хранит потоки событий в отказоустойчивом надежном виде.
Обрабатывает потоки событий по мере их возникновения.
Событие(Event) - это запись того , что произошло в мире или в вашем бизнесе.Например , Андрей зарегистривался в системе. Это событие. Или какой-то датчик отправил текущие значения. Это тоже событие. Маша сделала покупку на сумму 35 тысяч рублей , купив следующие продукты. Это тоже собыьтие. И вы хотите , чтобы эти события публиковались , хранились и обрабатывались разными микросервисами и другими источниками и потребителями данных в реальном времени. И для таких вещей предназначена Apache Kafka.
Источники данных , где происходят эти события публикуют события Apache Kafka , где эти записи хранятся в топиках и могут быть доступны для чтения (потребления) из топиков другим сервисам.
Kafka - это распределенная система , состоящая из различных типов серверов и клиентов , которые передают события через высокопроизводительный сетевой протокол TCP.
Брокеры(Brokers)
Брокер — это сервер на уровне хранилища Kafka, на котором хранятся потоки событий из одного или нескольких источников. Кластер Kafka обычно состоит из нескольких брокеров. Каждый брокер в кластере также является загрузочным сервером, то есть, если вы можете подключиться к одному брокеру в кластере, вы можете подключиться к каждому брокеру.
Давайте через docker-compose установим наш брокер.
Создадим новый проект и настроим там виртуальное окружение и в этом проекте напишем такой декларативный код в файле docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.3
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:7.3.3
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
Применим команду:
docker-compose up -d
Топики(Topics)
Кластер Kafka организует и надежно хранит потоки событий в категориях, называемых топиками(topics), которые являются наиболее фундаментальной единицей организации Kafka.
Тема — это журнал событий, аналогичный папке в файловой системе, где события — это файлы в этой папке.
Создаем топик следующим образом через docker-compose.
docker compose exec broker \
kafka-topics --create \
--topic user-actions \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1
Теперь установим confluent-kafka
pip install confluent-kafka==2.1.0
Файл producer.py
from confluent_kafka import Producer
from random import choice
if __name__ == "__main__":
print("Запускаем наш producer")
config = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(config)
topic = "user-actions"
user_ids = ['Иван', 'Андрей', 'Игорь', 'Сергей', 'Мухаммад', 'Рамиль']
actions = ['регистрация', 'аутентификаця', 'выход', 'подписка', 'лайк']
def delivery_callback(err, msg):
if err:
print(f'Ошибка: Сообщение не доставлено : {err}')
else:
print(f"Публикуем сообщение в топик: {msg.topic()}:"
f"с ключом = {msg.key().decode('utf-8'):12} и значением = {msg.value().decode('utf-8'):12}")
count = 0
for _ in range(10):
user_id = choice(user_ids)
product = choice(actions)
producer.produce(topic, product, user_id, callback=delivery_callback)
count += 1
producer.poll(10000)
producer.flush()
Файл consumer.py
from confluent_kafka import Consumer, OFFSET_BEGINNING
if __name__ == "__main__":
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'python_example_group_1',
}
consumer = Consumer(config)
def reset_offset(consumer, partitions):
for p in partitions:
p.offset = OFFSET_BEGINNING
consumer.assign(partitions)
topic = "user-actions"
consumer.subscribe([topic], on_assign=reset_offset)
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
print("Ждем...")
elif msg.error():
print(f"Ошибка: {msg.error()}")
else:
print(f"Получено событие из топика {msg.topic()}:"
f" key = {msg.key().decode('utf-8'):12} value = {msg.value().decode('utf-8'):12}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
Исходный код лежит на Github
Заключение
Это была ознакомительная статья , где я рассказал об основных концепциях Apache Kafka , также локально запустили , создали топик и поместили в него события с помощью продусьера и прочитали их с помощью консьюмера.