Ques/Help/Req Обработка данных в реальном времени из API через Apache Kafka + Hazelcast

XakeR

Member
Регистрация
13.05.2006
Сообщения
1 912
Реакции
0
Баллы
16
Местоположение
Ukraine
Для надежной поточной обработки данных в реальном времени и принятия решений на основе анализа данных из внешнего источника нужно обеспечить организацию конвейера обработки и хранения данных, который может быть кластеризирован и распараллелен для достижения необходимой производительности и отказоустойчивости. Кроме того, нужно обеспечить механизм своевременной доставки обновленных данных (на основе периодического опроса или использования Web Sockets/SSE) в систему анализа, которая также должна иметь доступ к истории изменений (например, для анализа тренда или получения усредненных значений по временному окну). В этой статье мы поговорим про использование Apache Kafka совместно с Hazelcast для анализа данных в реальном времени, а также разработаем коннектор для Kafka Connect для извлечения данных из внешнего источника (на примере WeatherStack API)

Apache Kafka — распределенная система хранения истории событий, оптимизированная для быстрого последовательного чтения. Единицей хранения в Kafka является сообщение (message), которое является частью темы (topic). Topic может быть представлен несколькими разделами (partition) для отказоустойчивости и производительности. Apache Kafka может быть запущен в режиме кластера, при этом для хранения топологии используется либо внешний Apache Zookeeper, либо внутренние механизмы хранения и протокол KRaft (подробнее этот вопрос был разобран в этой статье). Для каждого топика может быть заданы свои настройки по репликации, времени хранения истории сообщений, правил сохранения снимка в долговременную память и др. Отправку данных в topic выполняют producer по сетевому протоколу Kafka (через совместимую библиотеку), либо может использоваться коннектор в отдельном процессе Kafka Connect, который периодически опрашивается и получает данные из внешнего источника. Также Kafka Connect может использоваться для обработки поточных данных (между топиками) и для выгрузки сообщений во внешнюю систему (Sink Connector). Сообщения извлекаются процессами consumer по сетевому протоколу, при этом используется pull-модель (consumer сам периодически запрашивает Kafka о новых сообщениях).

Мы будем использовать развертывание Apache Kafka и остальных компонентов в Docker, и тут важным является корректное указание Advertised host (должны совпадать с сетевым именем контейнера внутри сети, поскольку именно оно будет возвращаться как адрес для установки сетевого подключения для отправки/извлечения сообщений). Наше решение будет состоять из нескольких компонентов:


  1. Коннектор для извлечения данных из внешнего API


  2. Apache Kafka для хранения истории ответов из внешнего API


  3. Hazelcast для обработки сообщений в реальном времени

Общая архитектура решения представлена на рисунке:

Обработка данных в реальном времени из API через Apache Kafka + Hazelcast0
Общая архитектура решения

Hazelcast может рассматриваться как среда для выполнения конвейерной обработки данных (может работать как с Kafka, так и с другими источниками данных, например Amazon Kinesis, Apache Pulsar или поток изменений, полученных через подключение CDC (Change Data Capture, например Debezium) над реляционными базами данных MySQL/PostgreSQL или NoSQL MongoDB, а также при наблюдении за файлами. Данные для обработки загружаются в оперативную память и могут быть проанализированы через встроенный SQL-подобный запрос (ориентирован на выполнение агрегаций внутри скользящего окна) или с использованием кода (может быть написан на Java/Kotlin, C++, .Net, Python, Node.JS, Go). Во втором случае обработка конвейера определяется через преобразования потока и операции группировки:


  • преобразования: distinct, sort, map, filter, flatMap, join, merge, mergeUsingService/mergeUsingServiceAsync (через внешний сервис), mapUsingReplicatedMap (через сохраненный key-value внутри Hazelcast);


  • агрегации: aggregate (например среднее значение, наименьшее-наибольшее, тренд, применяются к скользящему окну или группе), window/slidingWindow (определение окна, состоящего из N последних замеров).


  • сохранения в sink (например в лог, базу данных, файл, ElasticSearch и др.): writeTo

Начнем с создания конфигурации для запуска Apache Kafka в Docker Compose:

version: ‘3’ services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: — 22181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: — zookeeper ports: — 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:pLAINTEXT,PLAINTEXT_HOST:pLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

Здесь мы регистрируем имя kafka и порт 9092 для взаимодействия с Kafka из внешних контейнеров внутри сети. Также нам нужно будет создать несколько очередей, для хранения служебной информации для Kafka Connect (актуальные смещения, конфигурация и состояние передачи данных). Для этого добавим еще один контейнер, который будет выполнять подготовку Kafka при первом запуске:

init-kafka: image: confluentinc/cp-kafka:latest depends_on: — kafka entrypoint: [ ‘/bin/sh’, ‘-c’ ] command: | » kafka-topics —bootstrap-server kafka:9092 —list echo -e ‘Creating kafka topics’ kafka-topics —bootstrap-server kafka:9092 —create —if-not-exists —topic connect_config —replication-factor 1 —partitions 1 —config cleanup.policy=compact kafka-topics —bootstrap-server kafka:9092 —create —if-not-exists —topic connect_offset —replication-factor 1 —partitions 1 —config cleanup.policy=compact kafka-topics —bootstrap-server kafka:9092 —create —if-not-exists —topic connect_status —replication-factor 1 —partitions 1 —config cleanup.policy=compact # в эту очередь будут добавляться новые извлеченные из API данные kafka-topics —bootstrap-server kafka:9092 —create —if-not-exists —topic weather —replication-factor 1 —partitions 1 echo -e ‘Successfully created the following topics:’ kafka-topics —bootstrap-server kafka:9092 —list

Теперь добавим еще один контейнер для управления коннекторами, которые будут выполнять извлечение данных из внешнего источника. Мы будем использовать вариант контейнера от confluentinc, который также поддерживает управление коннекторами через Confluent Hub. Коннекторы могут решать следующие задачи:


  • source — извлечение данных из внешнего источника (расширение класса SourceConnector)


  • sink — отправка данных во внешнюю систему (SinkConnector)


  • transform — изменение данных (Transformer)


  • converter — сериализации-десериализация сообщений (например, в Avro или JSON, с использованием схемы из Kafka Schema Registry).

В этой статье мы рассмотрим только первый тип коннектора. Для разработки будем использовать Kotlin и начнем с добавления необходимых зависимостей — поддержку API Kafka Connect, сериализацию KotlinX Serialization, Ktor Client для взаимодействия с API, а также библиотеку для логирования Kotlin Logging и ShadowJar (последняя необходима для интеграции Kotlin Runtime в единый JAR с коннектором для корректного запуска в JVM):

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar plugins { kotlin(«jvm») version «1.8.21» id(«com.github.johnrengelman.shadow») version «8.1.1» kotlin(«plugin.serialization») version «1.8.21» } group = «tech.dzolotov» version = «1.0-SNAPSHOT» repositories { mavenCentral() } dependencies { implementation(«org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1») implementation(«org.apache.kafka:connect-api:3.4.1») implementation(«io.ktor:ktor-client-core:2.3.1») implementation(«io.ktor:ktor-client-cio:2.3.1») implementation(«io.github.oshai:kotlin-logging-jvm:4.0.0-beta-29») implementation(«org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.1») }

Поскольку внешний API требует авторизации, получим токен после регистрации на (может использоваться бесплатно с ограничениями). Для запроса актуальных данных будем использовать REST API: , схема данных описана здесь.

Создадим класс для модели данных (возьмем только некоторые из полей):

@Serializable class WeatherData(val current: CurrentWeather) @Serializable class CurrentWeather( val observation_time: String, val temperature: Double, val wind_speed: Double, val wind_dir: String, val pressure: Double, )

Для определения коннектора сначала договоримся об используемой конфигурации, она будет использоваться при запуске экземпляра для настройки извлечения данных. В нашем случае для коннектора необходимо задать три значения:


  • адрес для подключения к внешнему API (вместе с токеном)


  • интервал периодического опроса API


  • название Kafka Topic, куда будут отправлять данные

Конфигурация задается через типизированные параметры, которые в коде определяются через Builder-класс ConfigDef (org.apache.kafka.common.config.ConfigDef). Для удобства объединим все константы с названиями параметров конфигурации и определение объекта конфигурации в общем singleton-объекте ApiConnectConfig:

object ApiConnectConfig { const val VERSION = «1.0.0» const val TOPIC_CONFIG = «topic» const val API_URL_CONFIG = «apiUrl» const val PERIODIC_POLL = «periodicPoll» var topic: String? = null var apiUrl: String? = null var periodicPoll: Int? = null val config: ConfigDef = ConfigDef() .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, «Topic name») .define(API_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, «API Endpoint URL») .define(PERIODIC_POLL, ConfigDef.Type.INT, 60, ConfigDef.Importance.HIGH, «Polling interval in seconds») }

В последнем определении 60 — это значение по умолчанию для параметра periodicPoll. Представленная схема данных автоматически валидируется в момент регистрации коннектора, но при необходимости может быть создан специальный валидатор как реализация интерфейса ConfigDef.Validator для проверки корректности переданных значений (автоматически проверяются только типы значений и их наличие, если не указано значение по умолчанию)

Создадим реализацию абстрактного класса SourceConnector:

class ApiSourceConnector : SourceConnector() { val logger = KotlinLogging.logger(«ApiSourceConnector») //версия коннектора override fun version() = ApiConnectConfig.VERSION // конфигурация коннектора override fun config() = ApiConnectConfig.config //класс обработчик override fun taskClass() = ApiSourceTask::class.java override fun start(props: MutableMap<String, String>?) { //разбираем конфиг } override fun stop() { // при удалении регистрации коннектора logger.info { «Stopping connector» } } override fun taskConfigs(maxTasks: Int): Map<String,String> { //создание конфигурации для задачи } }

Здесь в методе start нужно будет разобрать Key-Value Map с параметрами запуска коннектора. Коннектор при запуске создает объект класса SourceTask и будет использовать эту конфигурацию при инициализации, которая будет создаваться в методе taskConfigs. Для удобства объединим все методы для преобразования Map в конфигурацию, создание Map для SourceTask и извлечение отдельных значений конфигурации в объекте ApiConnectConfig, для этого добавим в существующий объект:

object ApiConnectConfig const val VERSION = «1.0.0» const val TOPIC_CONFIG = «topic» const val API_URL_CONFIG = «apiUrl» const val PERIODIC_POLL = «periodicPoll» var topic: String? = null var apiUrl: String? = null var periodicPoll: Int? = null val config: ConfigDef = ConfigDef() .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, «Topic name») .define(API_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, «URL Name») .define(PERIODIC_POLL, ConfigDef.Type.INT, 60, ConfigDef.Importance.HIGH, «Polling interval in seconds») fun buildFromMap(map: Map<String, String>) { val parsed = AbstractConfig(config, map) topic = parsed.getString(TOPIC_CONFIG) apiUrl = parsed.getString(API_URL_CONFIG) periodicPoll = parsed.getInt(PERIODIC_POLL) } fun toMap(): Map<String, String> { return mapOf( TOPIC_CONFIG to topic.orEmpty(), API_URL_CONFIG to apiUrl.orEmpty(), PERIODIC_POLL to periodicPoll?.toString().orEmpty() ) } fun getTopic(config: Map<String, String>) = config[TOPIC_CONFIG] fun getApiUrl(config: Map<String, String>) = config[API_URL_CONFIG] fun getPeriodicPoll(config: Map<String, String>) = config[PERIODIC_POLL]?.toIntOrNull() ?: 60 }

Теперь мы может сделать реализацию методов start и taskConfigs в ApiSourceConnector:

override fun start(props: MutableMap<String, String>?) { //разбираем конфиг ApiConnectConfig.buildFromMap(props ?: mapOf()) logger.info { «Starting connector for ${ApiConnectConfig.topic}, URL: ${ApiConnectConfig.apiUrl}, Polling interval: ${ApiConnectConfig.periodicPoll}» } } override fun taskConfigs(maxTasks: Int) = listOf(ApiConnectConfig.toMap())

Дальше мы можем создать реализацию абстрактного интерфейса SourceTask, которая должна реализовать следующие методы:


  • start — конфигурация коннектора (принимает Map, который создается в taskConfigs)


  • stop — очистка ресурсов после отключения коннектора


  • poll — извлечение списка новых сообщений (может вернуть null, если сообщений нет)

Добавим использование Channel для передачи извлеченных данных в метод poll. Сам периодический опрос мы будем делать через flow (но можно использовать и любой механизм запуска задач через интервал времени).

class ApiSourceTask : SourceTask() { //Клиент для запросов к REST val httpClient = HttpClient(CIO) //Логирование val logger = KotlinLogging.logger(«ApiSourceTask») //Канал для передачи извлеченных данных val channel = Channel<SourceRecord>(capacity = 1) //Версия task’а override fun version() = ApiConnectConfig.VERSION fun tickerFlow(period: Duration, initialDelay: Duration = Duration.ZERO) = flow { delay(initialDelay) while (true) { emit(Unit) delay(period) } } private val json = Json { ignoreUnknownKeys = true } // Извлекаем данные из REST API suspend fun getWeather(url: String): WeatherData { val response = httpClient.get(url) return Json.decodeFromString(response.bodyAsText()) } // Запуск извлечения данных override fun start(props: MutableMap<String, String>?) { val config = props ?: mapOf() // Получаем конфигурацию val topic = ApiConnectConfig.getTopic(config) val apiUrl = ApiConnectConfig.getApiUrl(config) val periodicPoll = ApiConnectConfig.getPeriodicPoll(config) logger.info { «Start api connect task» } //Запускаем периодический опрос CoroutineScope(Dispatchers.IO).launch { tickerFlow(periodicPoll.seconds.toJavaDuration()).flowOn(Dispatchers.IO).collect { logger.info { «Polling element » } val weather = getWeather(apiUrl.orEmpty()) //Сохраняем извлеченные данные //Здесь первые два аргумента — метаданные //Затем название topic для отправки сообщений //Следующий аргумент — идентификатор раздела (может быть не указан) //Следующие два аргумента — схема и содержание ключа //И последние два — схема и содержание значения channel.send( SourceRecord( mapOf(«domain» to «weather»), //extracted data mapOf(«dt» to LocalTime.now().toString()), //timestamp topic, //topic name null, //partition Schema.OPTIONAL_STRING_SCHEMA, weather.current.observation_time, //key Schema.OPTIONAL_STRING_SCHEMA, Json.encodeToString(weather.current), //value ) ) } } } // При отключении коннектора отключаем канал override fun stop() { channel.close() logger.info { «Stop» } } // При опросе возвращаем значение, если оно есть в канале или null для пропуска опроса override fun poll(): MutableList<SourceRecord>? = runBlocking { return@runBlocking try { mutableListOf<SourceRecord>(channel.receiveCatching().getOrThrow()) } catch (e: Exception) { null } } }

Теперь, когда код коннектора подготовлен, создадим jar-файл. При компиляции нужно использовать Java 11 для совместимости с JRE внутри контейнера Confluent Kafka Connect.

./gradlew shadowJar

И добавим к стеку Docker Compose запуск Kafka Connect:

kafka-connect: image: confluentinc/cp-kafka-connect:latest depends_on: — init-kafka environment: CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars CONNECT_BOOTSTRAP_SERVERS: kafka:9092 CONNECT_GROUP_ID: weather CONNECT_CONFIG_STORAGE_TOPIC: connect_config CONNECT_OFFSET_STORAGE_TOPIC: connect_offset CONNECT_STATUS_STORAGE_TOPIC: connect_status CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect CONNECT_TOPIC_CREATION_ENABLE: false ports: — 8083:8083 volumes: — /tmp/KafkaConnector.jar:/etc/kafka-connect/jars/KafkaConnector.jar command: — bash — -c — confluent-hub install —no-prompt confluentinc/kafka-connect-datagen:0.6.0 && /etc/confluent/docker/run && sleep infinity

Здесь в /tmp/KafkaConnector.jar скопирован результат сборки jar-файла из build/libs. Также здесь приведен пример установки коннектора из Confluent Hub. Добавление jar-файла недостаточно для регистрации и необходимо после завершения инициализации Kafka Connect обратиться к REST API и зарегистрировать коннектор, при этом в POST-запросе кроме названия класса коннектора также необходимо передать значения для параметров конфигурации и используемые сериализаторы. Нам понадобится использовать json-конфигурацию для регистрации коннектора:

{ «name»: «poll», «config»: { «connector.class»: «tech.dzolotov.kafka.connector.poll.ApiSourceConnector», «topic»: «weather», «apiUrl»: «https://api.weatherstack.com/current?access_key=TOKEN&query=Moscow», «periodicPoll»: «30», «key.converter»: «org.apache.kafka.connect.storage.StringConverter», «value.converter»: «org.apache.kafka.connect.json.JsonConverter» } }

Выполним запрос к Kafka Connect:

curl -X POST -d @register.json —header «Content-Type: application/json»

В логах контейнера Kafka Connect мы можем увидеть, что опрос API начался (с интервалом в 30 секунд). Увидеть полученные сообщения мы можем через запрос внутри Kafka-контейнера в консольной consumer-утилите Kafka:

kafka-console-consumer —bootstrap-server kafka:9092 —topic weather

Теперь перейдем к запуску Hazelcast и созданию конвейера для обработки данных. Hazelcast не требует запуска отдельного конфигурирования и запускается из образа контейнера hazelcast/hazelcast. Приложение, определяющее конвейер обработки может инициировать запрос создания нового сервера (на следующем по увеличению номера порта). Для отправки задания можно будет просто запустить процесс приложения, который будет взаимодействовать через Hazelcast Jet, либо использовать утилиту командной строки hz-cli и команду submit.

Для управления конвейером используются классы из com.hazelcast.jet.pipeline (нам понадобится создать и наполнить Pipeline) и использовать встроенный Sink для отправки сообщений в лог. Подключение к Hazelcast будем выполнять через com.hazelcast.core.Hazelcast. Для указания источника будем использовать com.hazelcast.jet.kafka.KafkaSources .

package tech.dzolotov.hazelcast import com.hazelcast.com.fasterxml.jackson.core.util.RequestPayload import com.hazelcast.core.Hazelcast import com.hazelcast.jet.config.JobConfig import com.hazelcast.jet.kafka.KafkaSources import com.hazelcast.jet.pipeline.Pipeline import com.hazelcast.jet.pipeline.Sinks import com.hazelcast.jet.pipeline.WindowDefinition.sliding import com.hazelcast.nonapi.io.github.classgraph.json.JSONDeserializer import kotlinx.serialization.Serializable import kotlinx.serialization.decodeFromString import kotlinx.serialization.json.Json import org.apache.kafka.common.serialization.StringDeserializer import java.util.* fun main(args: Array<String>) { JetJob.apply() } @Serializable class CurrentWeatherData(val payload: String) @Serializable data class CurrentWeather( val observation_time: String, val temperature: Double, val wind_speed: Double, val wind_dir: String, val pressure: Double, ) class JetJob { companion object { private val json = Json { ignoreUnknownKeys = true } fun apply() { val p = Pipeline.create() val kafkaSource = KafkaSources.kafka<String, String>(kafkaProps(), «weather») val window = p.readFrom(kafkaSource).withNativeTimestamps(0).window(sliding(1,1)) window.distinct() .apply { it.map { val payload = json.decodeFromString<CurrentWeatherData>(it.result().value).payload //здесь в payload будет json-строка json.decodeFromString<CurrentWeather>(payload) } } .writeTo(Sinks.logger { «Get json » + it }) val cfg = JobConfig().setName(«hazelcast-weather») Hazelcast.bootstrappedInstance().jet.newJob(p, cfg) } private fun kafkaProps(): Properties { val props = Properties() props.setProperty(«bootstrap.servers», «kafka:9092») props.setProperty(«key.deserializer», StringDeserializer::class.java.name) props.setProperty(«value.deserializer», StringDeserializer::class.java.name) props.setProperty(«auto.offset.reset», «earliest») return props } } }

При подключении к Kafka здесь указана политика «чтения всех сообщений, начиная с самого старого», но это может быть изменено и, например, можно получать только новые сообщения, которые этот consumer ранее еще не получал. Также здесь мы не используем возможности окна (размер =1) и извлекаем последнее полученное сообщение. Для декодирования JSON-сообщения также можно использовать конвейнерный метод map для извлечения значения температуры:

val window = p.readFrom(kafkaSource) .withNativeTimestamps(0) .window(sliding(1, 1)) .streamStage() .map { val payload = json.decodeFromString<CurrentWeatherData>(it.value).payload //здесь в payload будет json-строка json.decodeFromString<CurrentWeather>(payload).temperature }.writeTo(Sinks.logger { «Temperature $it» })

Альтернативно можно было бы получить среднее значение за последние 10 измерений температуры с использованием скользящего окна. Для этого мы можем определить свою функцию-агрегатор, которая будет принимать входной набор данных из Kafka (Map.Entry<String,String>) и возвращать среднее значение температуры среди переданных значений. Определение агрегатора выполняется через builder-методы от AggregateOperation с фазой инициализации (whenCreate), последовательных операций накопления (с каждым значением из списка через andAccumulate) и обобщения (andExportFinish), что по сути является определением Reducer в подходе Map-Reduce. В нашем случае функция агрегации может выглядеть следующим образом:

//принимаем Map.Entry<String,String>, аккумулятор будет Double, результат Double (средняя) val avgTemperatureOp = com.hazelcast.jet.aggregate.AggregateOperation.withCreate { listOf( DoubleAccumulator(0.0), DoubleAccumulator(0.0) ) }.andAccumulate<Map.Entry<String,String>> { acc, data -> val payload = Json.decodeFromString<CurrentWeatherData>(data.value).payload //здесь в payload будет json-строка val temperature = Json.decodeFromString<CurrentWeather>(payload).temperature acc[0].accumulate(temperature) acc[1].accumulate(1.0) }.andExportFinish { acc -> acc[0].export() / acc[1].export() }

Тогда конвейер, который будет получать последние 10 измерений (со сдвигом на единицу при появлении новых данных) может выглядеть так:

val p = Pipeline.create() val kafkaSource = KafkaSources.kafka<String, String>(kafkaProps(), «weather») p.readFrom(kafkaSource).withNativeTimestamps(0).window(sliding(10, 1)) .aggregate(avgTemperatureOp) .writeTo(Sinks.logger { «Temperature $it» })

При запуске используется активный экземпляр Hazelcast. Для сборки также будем использовать Java 11 и собирать через ShadowJar и будем использовать плагин application для определения класса с функцией main.

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar plugins { kotlin(«jvm») version «1.8.21» id(«com.github.johnrengelman.shadow») version «8.1.1» kotlin(«plugin.serialization») version «1.8.21» application } group = «tech.dzolotov.hazelcast» version = «1.0-SNAPSHOT» repositories { mavenCentral() } dependencies { implementation(«org.jetbrains.kotlin:kotlin-stdlib:1.8.21») implementation(«com.hazelcast:hazelcast:5.2.3») implementation(«com.hazelcast.jet:hazelcast-jet-kafka:5.2.3») implementation(«org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.1») testImplementation(kotlin(«test»)) } project.setProperty(«mainClassName», «tech.dzolotov.hazelcast.MainKt») tasks.test { useJUnitPlatform() } kotlin { jvmToolchain(11) }

Для запуска приложения в Hazelcast можно переслать jar-файл в контейнер и запустить там команду hz-cli submit:

docker cp HZTest-1.0-SNAPSHOT-all.jar kafka-hazelcast-1:/tmp docker exec -it kafka-hazelcast-1 hz-cli submit /tmp/HZTest-1.0-SNAPSHOT-all.jar

При запуске через submit код процесса будет запускаться внутри JVM Hazelcast и результаты выполнения можно увидеть через логи контейнера:

docker logs -f kafka-hazelcast-1

Таким образом, сочетание Kafka Connect (извлечение из API) + Kafka (для долговременного хранения истории изменения значений) + Hazelcast (анализ потока событий из Kafka и выполнение Filter-Map-Reduce преобразований) может быть использовано для решения задач анализа потока входных данных в реальном времени для любого источника данных (включая реляционные базы данных, текстовые файлы или любые другие источники, для которых может быть сформирован поток изменений и загружен в topic Kafka через Kafka Connect.

Во второй части статьи мы разберемся с созданием Sink Connector для передачи сообщений во внешнюю систему, а также посмотрим на возможности, доступные нам при реализации трансформеров для Kafka Connect.

А прямо сейчас хочу пригласить вас на бесплатный урок курса Apache Kafka, на котором вы узнаете про особенности Kafka и ее устройство, познакомитесь с основными утилитами, рассмотрите базовое АПИ для работы с Kafka.


  • Зарегистрироваться на бесплатный урок
 
198 162Темы
635 133Сообщения
3 618 414Пользователи
drakkon12345Новый пользователь
Верх