Apache Kafka. Введение для начинающих
В этой статье я расскажу об основных концепциях Apache Kafka , установим локально с помощью docker compose брокер Apache Kafka , создадим тестовый топик , напишем на python первый продьюсер(producer), чтобы публиковать события в этот топик и первый консьюмер(consumer) , чтобы читать из этого топика .
Apache Kafka — это распределенная платформа потоковой передачи событий с открытым исходным кодом, используемая тысячами компаний для высокопроизводительных конвейеров данных, потоковой аналитики, интеграции данных и критически важных приложений.
Apache Kafka сочетает в себе 3 ключевые возможности , такие как:
-  
Публикацию(запись) потоков событий и подписку(чтение) на них, включая непрерывный импорт/экспорт ваших данных из других систем.
 Хранит потоки событий в отказоустойчивом надежном виде.
Обрабатывает потоки событий по мере их возникновения.
Событие(Event) - это запись того , что произошло в мире или в вашем бизнесе.Например , Андрей зарегистривался в системе. Это событие. Или какой-то датчик отправил текущие значения. Это тоже событие. Маша сделала покупку на сумму 35 тысяч рублей , купив следующие продукты. Это тоже собыьтие. И вы хотите , чтобы эти события публиковались , хранились и обрабатывались разными микросервисами и другими источниками и потребителями данных в реальном времени. И для таких вещей предназначена Apache Kafka.
Источники данных , где происходят эти события публикуют события Apache Kafka , где эти записи хранятся в топиках и могут быть доступны для чтения (потребления) из топиков другим сервисам.
Kafka - это распределенная система , состоящая из различных типов серверов и клиентов , которые передают события через высокопроизводительный сетевой протокол TCP.
Брокеры(Brokers)
Брокер — это сервер на уровне хранилища Kafka, на котором хранятся потоки событий из одного или нескольких источников. Кластер Kafka обычно состоит из нескольких брокеров. Каждый брокер в кластере также является загрузочным сервером, то есть, если вы можете подключиться к одному брокеру в кластере, вы можете подключиться к каждому брокеру.
Давайте через docker-compose установим наш брокер.
Создадим новый проект и настроим там виртуальное окружение и в этом проекте напишем такой декларативный код в файле docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.3
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.3.3
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
Применим команду:
docker-compose up -d
Топики(Topics)
Кластер Kafka организует и надежно хранит потоки событий в категориях, называемых топиками(topics), которые являются наиболее фундаментальной единицей организации Kafka.
Тема — это журнал событий, аналогичный папке в файловой системе, где события — это файлы в этой папке.
Создаем топик следующим образом через docker-compose.
docker compose exec broker \
  kafka-topics --create \
    --topic user-actions \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1
Теперь установим confluent-kafka
pip install confluent-kafka==2.1.0
Файл producer.py
from confluent_kafka import Producer
from random import choice
if __name__ == "__main__":
    print("Запускаем наш producer")
    config = {'bootstrap.servers': 'localhost:9092'}
    producer = Producer(config)
    topic = "user-actions"
    user_ids = ['Иван', 'Андрей', 'Игорь', 'Сергей', 'Мухаммад', 'Рамиль']
    actions = ['регистрация', 'аутентификаця', 'выход', 'подписка', 'лайк']
    def delivery_callback(err, msg):
        if err:
            print(f'Ошибка: Сообщение не доставлено : {err}')
        else:
            print(f"Публикуем сообщение в топик: {msg.topic()}:"
                  f"с ключом = {msg.key().decode('utf-8'):12} и значением = {msg.value().decode('utf-8'):12}")
    count = 0
    for _ in range(10):
        user_id = choice(user_ids)
        product = choice(actions)
        producer.produce(topic, product, user_id, callback=delivery_callback)
        count += 1
    producer.poll(10000)
    producer.flush()
Файл consumer.py
from confluent_kafka import Consumer, OFFSET_BEGINNING
if __name__ == "__main__":
    config = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'python_example_group_1',
    }
    consumer = Consumer(config)
    def reset_offset(consumer, partitions):
        for p in partitions:
            p.offset = OFFSET_BEGINNING
            consumer.assign(partitions)
    topic = "user-actions"
    consumer.subscribe([topic], on_assign=reset_offset)
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                print("Ждем...")
            elif msg.error():
                print(f"Ошибка: {msg.error()}")
            else:
                print(f"Получено событие из топика {msg.topic()}:"
                      f" key = {msg.key().decode('utf-8'):12} value = {msg.value().decode('utf-8'):12}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
Исходный код лежит на Github
Заключение
Это была ознакомительная статья , где я рассказал об основных концепциях Apache Kafka , также локально запустили , создали топик и поместили в него события с помощью продусьера и прочитали их с помощью консьюмера.