Apache Kafka. Введение для начинающих

25 Апр 2021 , 945

В этой статье я расскажу об основных концепциях Apache Kafka , установим локально с помощью docker compose брокер Apache Kafka , создадим тестовый топик , напишем на python первый продьюсер(producer), чтобы публиковать события в этот топик и первый консьюмер(consumer) , чтобы читать из этого топика .

Apache Kafka — это распределенная платформа потоковой передачи событий с открытым исходным кодом, используемая тысячами компаний для высокопроизводительных конвейеров данных, потоковой аналитики, интеграции данных и критически важных приложений.

Apache Kafka сочетает в себе 3 ключевые возможности , такие как:

  • Публикацию(запись) потоков событий и подписку(чтение) на них, включая непрерывный импорт/экспорт ваших данных из других систем.

  • Хранит потоки событий в отказоустойчивом надежном виде.

  • Обрабатывает потоки событий по мере их возникновения.

Событие(Event) - это запись того , что произошло в мире или в вашем бизнесе.Например , Андрей зарегистривался в системе. Это событие. Или какой-то датчик отправил текущие значения. Это тоже событие. Маша сделала покупку на сумму 35 тысяч рублей , купив следующие продукты. Это тоже собыьтие. И вы хотите , чтобы эти события публиковались , хранились и обрабатывались разными микросервисами и другими источниками и потребителями данных в реальном времени. И для таких вещей предназначена Apache Kafka.

Источники данных , где происходят эти события публикуют события Apache Kafka , где эти записи хранятся в топиках и могут быть доступны для чтения (потребления) из топиков другим сервисам.

Kafka - это распределенная система , состоящая из различных типов серверов и клиентов , которые передают события через высокопроизводительный сетевой протокол TCP.

Брокеры(Brokers)

Брокер — это сервер на уровне хранилища Kafka, на котором хранятся потоки событий из одного или нескольких источников. Кластер Kafka обычно состоит из нескольких брокеров. Каждый брокер в кластере также является загрузочным сервером, то есть, если вы можете подключиться к одному брокеру в кластере, вы можете подключиться к каждому брокеру.

Давайте через docker-compose установим наш брокер.

Создадим новый проект и настроим там виртуальное окружение и в этом проекте напишем такой декларативный код в файле docker-compose.yml


version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.3
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.3.3
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

Применим команду:


docker-compose up -d

Топики(Topics)

Кластер Kafka организует и надежно хранит потоки событий в категориях, называемых топиками(topics), которые являются наиболее фундаментальной единицей организации Kafka.

Тема — это журнал событий, аналогичный папке в файловой системе, где события — это файлы в этой папке.

Создаем топик следующим образом через docker-compose.


docker compose exec broker \
  kafka-topics --create \
    --topic user-actions \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1

Теперь установим confluent-kafka


pip install confluent-kafka==2.1.0

Файл producer.py


from confluent_kafka import Producer
from random import choice


if __name__ == "__main__":
    print("Запускаем наш producer")
    config = {'bootstrap.servers': 'localhost:9092'}

    producer = Producer(config)

    topic = "user-actions"
    user_ids = ['Иван', 'Андрей', 'Игорь', 'Сергей', 'Мухаммад', 'Рамиль']
    actions = ['регистрация', 'аутентификаця', 'выход', 'подписка', 'лайк']

    def delivery_callback(err, msg):
        if err:
            print(f'Ошибка: Сообщение не доставлено : {err}')
        else:
            print(f"Публикуем сообщение в топик: {msg.topic()}:"
                  f"с ключом = {msg.key().decode('utf-8'):12} и значением = {msg.value().decode('utf-8'):12}")

    count = 0
    for _ in range(10):
        user_id = choice(user_ids)
        product = choice(actions)
        producer.produce(topic, product, user_id, callback=delivery_callback)
        count += 1

    producer.poll(10000)
    producer.flush()


Файл consumer.py


from confluent_kafka import Consumer, OFFSET_BEGINNING


if __name__ == "__main__":
    config = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'python_example_group_1',

    }
    consumer = Consumer(config)

    def reset_offset(consumer, partitions):
        for p in partitions:
            p.offset = OFFSET_BEGINNING
            consumer.assign(partitions)

    topic = "user-actions"
    consumer.subscribe([topic], on_assign=reset_offset)

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                print("Ждем...")
            elif msg.error():
                print(f"Ошибка: {msg.error()}")
            else:
                print(f"Получено событие из топика {msg.topic()}:"
                      f" key = {msg.key().decode('utf-8'):12} value = {msg.value().decode('utf-8'):12}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()


Исходный код лежит на Github

Заключение

Это была ознакомительная статья , где я рассказал об основных концепциях Apache Kafka , также локально запустили , создали топик и поместили в него события с помощью продусьера и прочитали их с помощью консьюмера.

comments powered by Disqus

Подписка

Подпишитесь на наш список рассылки, чтобы получать обновления из блога

Рубрики

Теги