Kafka

创建日期:2024-06-21
更新日期:2024-12-18

官网:http://kafka.apache.org/

下载地址:https://kafka.apache.org/downloads

简介

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

使用方法

参考文档:https://kafka.apache.org/quickstart

1、修改zookeeper配置文件。

编辑文件config/zookeeper.properties,修改dataDir=D:/temp/zookeeper。

2、启动zookeeper。

.\bin\windows\zookeeper-server-start.bat config/zookeeper.properties

3、修改kafka配置文件。

编辑文件config/server.properties,修改log.dirs=D:/temp/kafka-logs。

4、启动kafka。

.\bin\windows\kafka-server-start.bat config/server.properties

5、创建话题。

bin/windows/kafka-topics.bat --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092

6、查询话题描述。

bin/windows/kafka-topics.bat --describe --topic quickstart-events --bootstrap-server localhost:9092

7、向话题中写入一些事件。

bin/windows/kafka-console-producer.bat --topic quickstart-events --bootstrap-server localhost:9092

This is my first event

This is my second event

(按Ctrl+C)

8、读取事件。

bin/windows/kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

开发指南

添加依赖

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

写入事件

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Application {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("quickstart-events", Integer.toString(i), Integer.toString(i)));
        producer.close();
    }
}

读取事件

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;

import java.util.Properties;

public class Application {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "Test");
        props.put("enable.auto.commit", true);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("quickstart-events"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            records.forEach(record -> {
                System.out.println("Topic: " + record.topic() + " Partition: " + record.partition()
                        + " Offset: " + record.offset() + " Value:" + record.value()
                );
            });
        }
    }
}

常见问题

1、kafka启动报错:java.nio.file.AccessDeniedException: D:\temp,ERROR Shutdown broker because none of the specified log dirs from D:\temp\kafka-logs can be created or validated (kafka.log.LogManager)。

在win10上使用2.13-2.8.1版本,不要用2.13-3.0.0版本。

2、IDEA报错:语言级别5不支持Diamond类型。

修改pom.xml,将语言级别设置为8。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

3、Failed to load class "org.slf4j.impl.StaticLoggerBinder"。

添加依赖项:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

4、连接远程kafka报错:Error connecting to node WIN-PU466GJNS26:9092 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient),java.net.UnknownHostException: WIN-PU466GJNS26。

在hosts文件中添加以下代码即可。

192.168.23.122    WIN-PU466GJNS26