Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程

Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程

  • 1. Kafka入门
    • 1.1 概述
      • 1.1.1 初识Kafka
      • 1.1.2 消息队列
      • 1.1.3 生产者-消费者模式
      • 1.1.4 消息中间件对比
      • 1.1.5 ZooKeeper
    • 1.2 快速上手
      • 1.2.1 环境安装
        • 1.2.1.1 安装Java8(略)
        • 1.2.1.2 安装Kafka
        • 1.2.1.3 启动ZooKeeper
        • 1.2.1.4 启动Kafka
      • 1.2.2 消息主题
        • 1.2.2.1 创建主题
        • 1.2.2.2 查询主题
        • 1.2.2.3 修改主题
        • 1.2.2.4 删除主题
      • 1.2.3 生产数据
        • 1.2.3.1 命令行操作
        • 1.2.3.2 工具操作
        • 1.2.3.3 Java API
      • 1.2.4 消费数据
        • 1.2.4.1 命令行操作
        • 1.2.4.2 Java API
      • 1.2.5 源码关联(可选)
        • 1.2.5.1 安装Java17
        • 1.2.5.2 安装Scala
        • 1.2.5.3 安装Gradle
      • 1.2.6 总结

在这里插入图片描述

在这里插入图片描述

本文档参看的视频是:

  • 尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战
  • 黑马程序员Kafka视频教程,大数据企业级消息队列kafka入门到精通
  • 小朋友也可以懂的Kafka入门教程,还不快来学

本文档参看的文档是:

  • 尚硅谷官方文档,并在基础上修改 完善!非常感谢尚硅谷团队!!!!

在这里插入图片描述

1. Kafka入门

1.1 概述

Kafka是一种消息队列,主要用来处理大量数据状态下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka也就拥有消息队列的相应的特性了。

作为消息传输的中间件
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1.1.1 初识Kafka

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

进程之间的交互问题,耦合性高!!!!

那么解耦合!!!

在这里插入图片描述
也就是消息中间件!!!

Kafka是一个由Scala和Java语言开发的,经典高吞吐量的分布式消息发布和订阅系统,也是大数据技术领域中用作数据交换的核心组件之一。以高吞吐,低延迟,高伸缩,高可靠性,高并发,且社区活跃度高等特性,从而备受广大技术组织的喜爱。

2010年,Linkedin公司为了解决消息传输过程中由各种缺陷导致的阻塞、服务无法访问等问题,主导开发了一款分布式消息日志传输系统。主导开发的首席架构师Jay Kreps因为喜欢写出《变形记》的西方表现主义文学先驱小说家Jay Kafka,所以给这个消息系统起了一个很酷,却和软件系统特性无关的名称Kafka。

因为备受技术组织的喜爱,2011年,Kafka软件被捐献给Apache基金会,并于7月被纳入Apache软件基金会孵化器项目进行孵化。2012年10月,Kafka从孵化器项目中毕业,转成Apache的顶级项目。由独立的消息日志传输系统转型为开源分布式事件流处理平台系统,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。
官网地址:https://kafka.apache.org/
在这里插入图片描述

在这里插入图片描述

1.1.2 消息队列

Kafka软件最初的设计就是专门用于数据传输的消息系统,类似功能的软件有RabbitMQ、ActiveMQ、RocketMQ等。这些软件名称中的MQ是英文单词Message Queue的简称,也就是所谓的消息队列的意思。这些软件的核心功能是传输数据,而Java中如果想要实现数据传输功能,那么这个软件一般需要遵循Java消息服务技术规范JMS(Java Message Service)。前面提到的ActiveMQ软件就完全遵循了JMS技术规范,而RabbitMQ是遵循了类似JMS规范并兼容JMS规范的跨平台的AMQP(Advanced Message Queuing Protocol)规范。除了上面描述的JMS,AMQP外,还有一种用于物联网小型设备之间传输消息的MQTT通讯协议。

Kafka拥有作为一个消息系统应该具备的功能,但是却有着独特的设计。可以这样说,Kafka借鉴了JMS规范的思想,但是却并没有完全遵循JMS规范。这也恰恰是软件名称为Kafka,而不是KafkaMQ的原因。

在这里插入图片描述

这个消息中间件还是挺多的 JavaEE为了规范这样的接口——Java Message Service

由上可知,无论学习哪一种消息传输系统,JMS规范都是大家应该首先了解的。所以咱们这里就对JMS规范做一个简单的介绍:

  • JMS是Java平台的消息中间件通用规范,定义了主要用于消息中间件的标准接口。如果不是很理解这个概念,可以简单地将JMS类比为Java和数据库之间的JDBC规范。Java应用程序根据JDBC规范种的接口访问关系型数据库,而每个关系型数据库厂商可以根据JDBC接口来实现具体的访问规则。JMS定义的就是系统和系统之间传输消息的接口。
  • 为了实现系统和系统之间的数据传输,JMS规范中定义很多用于通信的组件:

在这里插入图片描述

在这里插入图片描述

  • JMS Provider:JMS消息提供者。其实就是实现JMS接口和规范的消息中间件,也就是我们提供消息服务的软件系统,比如RabbitMQ、ActiveMQ、Kafka。
  • JMS Message:JMS消息。这里的消息指的就是数据。一般采用Java数据模型进行封装,其中包含消息头,消息属性和消息主体内容。
  • JMS Producer:JMS消息生产者。所谓的生产者,就是生产数据的客户端应用程序,这些应用通过JMS接口发送JMS消息。
  • JMS Consumer:JMS消息消费者。所谓的消费者,就是从消息提供者(JMS Provider)中获取数据的客户端应用程序,这些应用通过JMS接口接收JMS消息。

JMS支持两种消息发送和接收模型:一种是P2P(Peer-to-Peer)点对点模型,另外一种是发布/订阅(Publish/Subscribe)模型。

  • P2P模型:P2P模型是基于队列的,消息生产者将数据发送到消息队列中,消息消费者从消息队列中接收消息。因为队列的存在,消息的异步传输成为可能。P2P模型的规定就是每一个消息数据,只有一个消费者,当发送者发送消息以后,不管接收者有没有运行都不影响消息发布到队列中。接收者在成功接收消息后会向发送者发送接收成功的消息

  • 在这里插入图片描述

  • 在这里插入图片描述

  • 发布 / 订阅模型:所谓得发布订阅模型就是事先将传输的数据进行分类,我们管这个数据的分类称之为主题(Topic)。也就是说,生产者发送消息时,会根据主题进行发送。比如咱们的消息中有一个分类是NBA,那么生产者在生产消息时,就可以将NBA篮球消息数据发送到NBA主题中,这样,对NBA消息主题感兴趣的消费者就可以申请订阅NBA主题,然后从该主题中获取消息。这样,也就是说一个消息,是允许被多个消费者同时消费的。这里生产者发送消息,我们称之为发布消息,而消费者从主题中获取消息,我们就称之为订阅消息。Kafka采用就是这种模型。

  • 在这里插入图片描述

在这里插入图片描述

1.1.3 生产者-消费者模式

生产者-消费者模式是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通信,而通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个消息缓冲区,平衡了生产者和消费者的处理能力。在数据传输过程中,起到了一个削弱峰值的作用,也就是我们经常说到的削峰。
在这里插入图片描述

图形中的缓冲区就是用来给生产者和消费者解耦的。在单点环境中,我们一般会采用阻塞式队列实现这个缓冲区。而在分布式环境中,一般会采用第三方软件实现缓冲区,这个第三方软件我们一般称之为中间件。纵观大多数应用场景,解耦合最常用的方式就是增加中间件。

遵循JMS规范的消息传输软件(RabbitMQ、ActiveMQ、Kafka、RocketMQ),我们一般就称之为消息中间件。使用软件的目的本质上也就是为了降低消息生产者和消费者之间的耦合性。提升消息的传输效率。

在这里插入图片描述

1.1.4 消息中间件对比

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,比RocketMQ,Kafka低一个数量级万级,比RocketMQ,Kafka低一个数量级10万级,支持高吞吐10万级,支持高吞吐
Topic数量对吞吐量的影响Topic可以达到几百/几千量级Topic可以达到几百量级,如果更多的话,吞吐量会大幅度下降
时效性ms级微秒级别,延迟最低ms级ms级
可用性高,基于主从架构实现高可用高,基于主从架构实现高可用非常高,分布式架构非常高,分布式架构
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到0丢失经过参数优化配置,可以做到0丢失
功能支持MQ领域的功能极其完备并发能力强,性能极好,延时很低MQ功能较为完善,分布式,扩展性好功能较为简单,支持简单的MQ功能,在大数据领域被广泛使用
其他很早的软件,社区不是很活跃开源,稳定,社区活跃度高阿里开发,社区活跃度不高开源,高吞吐量,社区活跃度极高

通过上面各种消息中间件的对比,大概可以了解,在大数据场景中我们主要采用kafka作为消息中间件,而在JaveEE开发中我们主要采用ActiveMQ、RabbitMQ、RocketMQ作为消息中间件。如果将JavaEE和大数据在项目中进行融合的话,那么Kafka其实是一个不错的选择。

在这里插入图片描述

1.1.5 ZooKeeper

ZooKeeper是一个开放源码的分布式应用程序协调服务软件。在当前的Web软件开发中,多节点分布式的架构设计已经成为必然,那么如何保证架构中不同的节点所运行的环境,系统配置是相同的,就是一个非常重要的话题。

一般情况下,我们会采用独立的第三方软件保存分布式系统中的全局环境信息以及系统配置信息,这样系统中的每一个节点在运行时就可以从第三方软件中获取一致的数据。也就是说通过这个第三方软件来协调分布式各个节点之间的环境以及配置信息。

Kafka软件是一个分布式事件流处理平台系统,底层采用分布式的架构设计,就是说,也存在多个服务节点,多个节点之间Kafka就是采用ZooKeeper来实现协调调度的。

ZooKeeper的核心作用:

  • ZooKeeper的数据存储结构可以简单地理解为一个Tree结构,而Tree结构上的每一个节点可以用于存储数据,所以一般情况下,我们可以将分布式系统的元数据(环境信息以及系统配置信息)保存在ZooKeeper节点中
  • ZooKeeper创建数据节点时,会根据业务场景创建临时节点或永久(持久)节点。永久节点就是无论客户端是否连接上ZooKeeper都一直存在的节点,而临时节点指的是客户端连接时创建,断开连接后删除的节点。同时,ZooKeeper也提供了Watch(监控)机制用于监控节点的变化,然后通知对应的客户端进行相应的变化。Kafka软件中就内置了ZooKeeper的客户端,用于进行ZooKeeper的连接和通信。

其实,Kafka作为一个独立的分布式消息传输系统,还需要第三方软件进行节点间的协调调度,不能实现自我管理,无形中就导致Kafka和其他软件之间形成了耦合性,制约了Kafka软件的发展,所以从Kafka 2.8.X版本开始,Kafka就尝试增加了Raft算法实现节点间的协调管理,来代替ZooKeeper。不过Kafka官方不推荐此方式应用在生产环境中,计划在Kafka 4.X版本中完全移除ZooKeeper,让我们拭目以待。

在这里插入图片描述

1.2 快速上手

1.2.1 环境安装

作为开源分布式事件流处理平台,Kafka分布式软件环境的安装相对比较复杂,不利于Kafka软件的入门学习和练习。所以我们这里先搭建相对比较简单的windows单机环境,让初学者快速掌握软件的基本原理和用法,后面的课程中,我们再深入学习Kafka软件在生产环境中的安装和使用。

1.2.1.1 安装Java8(略)

当前Java软件开发中,主流的版本就是Java 8,而Kafka 3.X官方建议Java版本更新至Java11,但是Java8依然可用。未来Kafka 4.X版本会完全弃用Java8,不过,咱们当前学习的Kafka版本为3.6.1版本,所以使用Java8即可,无需升级。
Kafka的绝大数代码都是Scala语言编写的,而Scala语言本身就是基于Java语言开发的,并且由于Kafka内置了Scala语言包,所以Kafka是可以直接运行在JVM上的,无需安装其他软件。你能看到这个课件,相信你肯定已经安装Java8了,基本的环境变量也应该配置好了,所以此处安装过程省略。

1.2.1.2 安装Kafka
  • 下载软件安装包:kafka_2.12-3.6.1.tgz,下载地址:https://kafka.apache.org/downloads

  • 在这里插入图片描述

  • 在这里插入图片描述

  • 这里的3.6.1,是Kafka软件的版本。截至到2023年12月24日,Kafka最新版本为3.6.1。

  • 2.12是对应的Scala开发语言版本。Scala2.12和Java8是兼容的,所以可以直接使用。

  • tgz是一种linux系统中常见的压缩文件格式,类似与windows系统的ziprar格式。所以Windows环境中可以直接使用压缩工具进行解压缩
    在这里插入图片描述

  • 解压文件:kafka_2.12-3.6.1.tgz,解压目录为非系统盘的根目录,比如e:/

为了访问方便,可以将解压后的文件目录改为kafka, 更改后的文件目录结构如下:

binlinux系统下可执行脚本文件
bin/windowswindows系统下可执行脚本文件
config配置文件
libs依赖类库
licenses许可信息
site-docs文档
logs服务日志
1.2.1.3 启动ZooKeeper

当前版本Kafka软件内部依然依赖ZooKeeper进行多节点协调调度,所以启动Kafka软件之前,需要先启动ZooKeeper软件。不过因为Kafka软件本身内置了ZooKeeper软件,所以无需额外安装ZooKeeper软件,直接调用脚本命令启动即可。具体操作步骤如下:

  • 进入Kafka解压缩文件夹的config目录,修改zookeeper.properties配置文件
# the directory where the snapshot is stored.
# 修改dataDir配置,用于设置ZooKeeper数据存储位置,该路径如果不存在会自动创建。
dataDir=E:/kafka_2.12-3.6.1/data/zk

在这里插入图片描述

  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录

  • 因为本章节演示的是Windows环境下Kafka软件的安装和使用,所以启动 ZooKeeper软件的指令为Windows环境下的bat批处理文件。调用启动指令时, 需要传递配置文件的路径

# 因为当前目录为windows,所以需要通过相对路径找到zookeeper的配置文件。
zookeeper-server-start.bat ../../config/zookeeper.properties

在这里插入图片描述

  • 出现如下界面,ZooKeeper启动成功。
    在这里插入图片描述

  • 为了操作方便,也可以在kafka解压缩后的目录中,创建脚本文件zk.cmd。

# 调用启动命令,且同时指定配置文件。
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

在这里插入图片描述

1.2.1.4 启动Kafka
  • 进入Kafka解压缩文件夹的config目录,修改server.properties配置文件
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
# 客户端访问Kafka服务器时,默认连接的服务为本机的端口9092,如果想要改变,可以修改如下配置
# 此处我们不做任何改变,默认即可
#advertised.listeners=PLAINTEXT://your.host.name:9092

# A comma separated list of directories under which to store log files
# 配置Kafka数据的存放位置,如果文件目录不存在,会自动生成。
log.dirs=E:/kafka_2.12-3.6.1/data/kafka

在这里插入图片描述

  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录

在这里插入图片描述

  • 调用启动指令,传递配置文件的路径
# 因为当前目录为windows,所以需要通过相对路径找到kafka的配置文件。 
kafka-server-start.bat ../../config/server.properties

在这里插入图片描述

  • 为了操作方便,也可以在kafka解压缩后的目录中,创建脚本文件kfk.cmd。
# 调用启动命令,且同时指定配置文件。
call bin/windows/kafka-server-start.bat config/server.properties

在这里插入图片描述

  • DOS窗口中,输入jps指令,查看当前启动的软件进程

在这里插入图片描述

这里名称为QuorumPeerMain的就是ZooKeeper软件进程,名称为Kafka的就是Kafka系统进程。此时,说明Kafka已经可以正常使用了。

在这里插入图片描述

1.2.2 消息主题

在这里插入图片描述

说的就是Topic的事情!!因为 我运行的时候 没有Topic 对于Producer 和 Consumer 都需要用到Topic 所以。。。。

在消息发布/订阅(Publish/Subscribe)模型中,为了可以让消费者对感兴趣的消息进行消费,而不是对所有的数据进行消费,包括那些不感兴趣的消息,所以定义了主题(Topic)的概念,也就是说将不同的消息进行分类,分成不同的主题(Topic),然后消息生产者在生成消息时,就会向指定的主题(Topic)中发送。而消息消费者也可以订阅自己感兴趣的主题(Topic)并从中获取消息。

有很多种方式都可以操作Kafka消息中的主题(Topic):命令行、第三方工具、Java API、自动创建。而对于初学者来讲,掌握基本的命令行操作是必要的。所以接下来,我们采用命令行进行操作。

在这里插入图片描述

1.2.2.1 创建主题
  • 启动ZooKeeper,Kafka服务进程(略)
  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录

dir 一下

在这里插入图片描述

  • DOS窗口输入指令,创建主题

Kafka是通过 kafka-topics.bat 指令文件进行消息主题操作的。其中包含了对主题的查询,创建,删除等功能。

调用指令创建主题时,需要传递多个参数,而且参数的前缀为两个横线。因为参数比较多

为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解

  • --bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开
  • --create : 表示对主题的创建操作,是个操作参数,后面无需增加参数值
  • --topic : 主题的名称,后面接的参数值一般就是见名知意的字符串名称,类似于java中的字符串类型标识符名称,当然也可以使用数字,只不过最后还是当成数字字符串使用。
# 指令
kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

1.2.2.2 查询主题
  • DOS窗口输入指令,查看所有主题

Kafka是通过kafka-topics.bat文件进行消息主题操作的。其中包含了对主题的查询,创建,删除等功能。

–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

–list : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值

# 指令
kafka-topics.bat --bootstrap-server localhost:9092 --list

在这里插入图片描述

  • DOS窗口输入指令,查看指定主题信息

–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

–describe : 查看主题的详细信息

–topic : 查询的主题名称


# 指令
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test

在这里插入图片描述

在这里插入图片描述

1.2.2.3 修改主题

创建主题后,可能需要对某些参数进行修改,那么就需要使用指令进行操作。

  • DOS窗口输入指令,修改指定主题的参数

Kafka是通过kafka-topics.bat文件进行消息主题操作的。其中包含了对主题的查询,创建,删除等功能。

–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

–alter : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值

–topic : 修改的主题名称

–partitions : 修改的配置参数:分区数量

# 指令
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2

在这里插入图片描述

在这里插入图片描述

1.2.2.4 删除主题

如果主题创建后不需要了,或创建的主题有问题,那么我们可以通过相应的指令删除主题。

  • DOS窗口输入指令,删除指定名称的主题

Kafka是通过kafka-topics.bat文件进行消息主题操作的。其中包含了对主题的查询,创建,删除等功能。

–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

–delete: 表示对主题的删除操作,是个操作参数,后面无需增加参数值。默认情况下,删除操作是逻辑删除,也就是说数据存储的文件依然存在,但是通过指令查询不出来。如果想要直接删除,需要在server.properties文件中设置参数delete.topic.enable=true

–topic : 删除的主题名称

# 指令
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete

在这里插入图片描述

注意:windows系统中由于权限或进程锁定的问题,删除topic会导致kafka服务节点异常关闭。

在这里插入图片描述

在这里插入图片描述

请在后续的linux系统下演示此操作。

在这里插入图片描述

1.2.3 生产数据

消息主题创建好了,就可以通过Kafka客户端向Kafka服务器的主题中发送消息了。Kafka生产者客户端并不是一个独立的软件系统,而是一套API接口,只要通过接口能连接Kafka并发送数据的组件我们都可以称之为Kafka生产者。下面我们就演示几种不同的方式:

1.2.3.1 命令行操作
  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录

在这里插入图片描述
在这里插入图片描述

  • DOS窗口输入指令,进入生产者控制台

Kafka是通过kafka-console-producer.bat文件进行消息生产者操作的。

调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比较多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解

–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。

–topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。

# 指令
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

在这里插入图片描述

# 指令
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

在这里插入图片描述

  • 控制台生产数据

在这里插入图片描述
在这里插入图片描述

注意:这里的数据需要回车后,才能真正将数据发送到Kafka服务器。

在这里插入图片描述

1.2.3.2 工具操作

有的时候,使用命令行进行操作还是有一些麻烦,并且操作起来也不是很直观,所以我们一般会采用一些小工具进行快速访问。这里我们介绍一个kafkatool_64bit.exe工具软件。软件的安装过程比较简单,根据提示默认安装即可,这里就不进行介绍了。
在这里插入图片描述

  • 安装好以后,我们打开工具
    在这里插入图片描述

  • 点击左上角按钮File -> Add New Connection…建立连接
    在这里插入图片描述

  • 点击Test按钮测试
    在这里插入图片描述

  • 增加连接
    在这里插入图片描述
    在这里插入图片描述

  • 按照下面的步骤,生产数据
    在这里插入图片描述

  • 增加成功后,点击绿色箭头按钮进行查询,工具会显示当前数据
    在这里插入图片描述

在这里插入图片描述

1.2.3.3 Java API

一般情况下,我们也可以通过Java程序来生产数据,所以接下来,我们就演示一下IDEA中使用Kafka Java API来生产数据:

  • 创建Kafka项目
    在这里插入图片描述
    在这里插入图片描述

  • 修改pom.xml文件,增加Maven依赖
    在这里插入图片描述

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.1</version>
    </dependency>
</dependencies>
  • 创建 com.atguigu.kafka.test.KafkaProducerTest类
    在这里插入图片描述

  • 添加main方法,并增加生产者代码
    在这里插入图片描述
    在这里插入图片描述

package com.atguigu.kafka.test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducerTest {
    public static void main(String[] args) {
        // TODO 配置属性集合
        Map<String, Object> configMap = new HashMap<>();
        // TODO 配置属性:Kafka服务器集群地址
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
        configMap.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configMap.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // TODO 创建Kafka生产者对象,建立Kafka连接
        //      构造对象时,需要传递配置参数
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
        // TODO 准备数据,定义泛型
        //      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                "test", "key1", "value1"
        );
        // TODO 生产(发送)数据
        producer.send(record);
        // TODO 关闭生产者连接
        producer.close();
    }
}

在这里插入图片描述

1.2.4 消费数据

消息已经通过Kafka生产者客户端发送到Kafka服务器中了。那么此时,这个消息就会暂存在Kafka中,我们也就可以通过Kafka消费者客户端对服务器指定主题的消息进行消费了。

1.2.4.1 命令行操作
  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录

  • DOS窗口输入指令,进入消费者控制台

Kafka是通过kafka-console-consumer.bat文件进行消息消费者操作的。

调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比较多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解

-bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。

–topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。其实这个参数并不是必须传递的参数,因为如果不传递这个参数的话,那么消费者会消费所有主题的消息。如果传递这个参数,那么消费者只能消费到指定主题的消息数据。

–from-beginning : 从第一条数据开始消费,无参数值,是一个标记参数。默认情况下,消费者客户端连接上服务器后,是不会消费到连接之前所生产的数据的。也就意味着如果生产者客户端在消费者客户端连接前已经生产了数据,那么这部分数据消费者是无法正常消费到的。所以在实际环境中,应该是先启动消费者客户端,再启动生产者客户端,保证消费数据的完整性。增加参数后,Kafka就会从第一条数据开始消费,保证消息数据的完整性。

# 指令
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

在这里插入图片描述

在这里插入图片描述

1.2.4.2 Java API

一般情况下,我们可以通过Java程序来消费(获取)数据,所以接下来,我们就演示一下IDEA中Kafka Java API如何消费数据:

  • 创建Maven项目并增加Kafka依赖

  • 创建com.atguigu.kafka.test.KafkaConsumerTest类
    在这里插入图片描述

  • 添加main方法,并增加消费者代码

在这里插入图片描述
在这里插入图片描述

package com.atguigu.kafka.test;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class KafkaConsumerTest {
    public static void main(String[] args) {
        // TODO 配置属性集合
        Map<String, Object> configMap = new HashMap<String, Object>();
        // TODO 配置属性:Kafka集群地址
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // TODO 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化
        configMap.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // TODO 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        // TODO 配置属性: 消费者组
        configMap.put("group.id", "atguigu");
        // TODO 配置属性: 自动提交偏移量
        configMap.put("enable.auto.commit", "true");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configMap);
        // TODO 消费者订阅指定主题的数据
        consumer.subscribe(Collections.singletonList("test"));
        while ( true ) {
            // TODO 每隔100毫秒,抓取一次数据
            ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(100));
            // TODO 打印抓取的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("K = " + record.key() + ", V = " + record.value());
            }
        }
    }
}

在这里插入图片描述

我使用教学的方式有点问题,我使用了Properties

// TODO 创建配置对象
        // TODO 对生产的数据 K,V 进行序列化的操作
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
 // TODO 创建配置对象
        // TODO 对数据 K,V 进行序列化的操作

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");

在这里插入图片描述

1.2.5 源码关联(可选)

将源码压缩包kafka-3.6.1-src.tgz解压缩到指定位置

在这里插入图片描述

Kafka3.6.1的源码需要使用JDK17和Scala2.13进行编译才能查看,所以需要进行安装

1.2.5.1 安装Java17

(1) 再资料文件夹中双击安装包jdk-17_windows-x64_bin.exe

在这里插入图片描述

(2) 根据安装提示安装即可。

1.2.5.2 安装Scala

(1) 进入Scala官方网站https://www.scala-lang.org/下载Scala压缩包scala-2.13.12.zip。
(2) 在IDEA中安装Scala插件

在这里插入图片描述

(3) 项目配置中关联Scala就可以了
在这里插入图片描述

在这里插入图片描述

1.2.5.3 安装Gradle

(1) 进入Gradle官方网站https://gradle.org/releases/下载Gradle安装包,根据自己需要选择不同版本进行下载。下载后将Gradle文件解压到相应目录
(2) 新增系统环境GRADLE_HOME,指定gradle安装路径,并将%GRADLE_HOME%\bin添加到path中
(3) Gradle安装及环境变量配置完成之后,打开Windows的cmd命令窗口,输入gradle –version
在这里插入图片描述
(4) 在解压缩目录中打开命令行,依次执行gradle idea命令
在这里插入图片描述

(5) 在命令行中执行gradle build --exclude-task test命令
在这里插入图片描述

(6) 使用IDE工具IDEA打开该项目目录
在这里插入图片描述
在这里插入图片描述

1.2.6 总结

本章作为Kafka软件的入门章节,介绍了一些消息传输系统中的基本概念以及单机版Windows系统中Kafka软件的基本操作。如果仅从操作上,感觉Kafka和数据库的功能还是有点像的。比如:

  • 数据库可以创建表保存数据,kafka可以创建主题保存消息。
  • Java客户端程序可以通过JDBC访问数据库:保存数据、修改数据、查询数据,kafka可以通过生产者客户端生产数据,通过消费者客户端消费数据。
    从这几点来看,确实有相像的地方,但其实两者的本质并不一样:
  • 数据库的本质是为了更好的组织和管理数据,所以关注点是如何设计更好的数据模型用于保存数据,保证核心的业务数据不丢失,这样才能准确地对数据进行操作。
  • Kafka的本质是为了高效地传输数据。所以软件的侧重点是如何优化传输的过程,让数据更快,更安全地进行系统间的传输。

通过以上的介绍,你会发现,两者的区别还是很大的,不能混为一谈。接下来的章节我们会给大家详细讲解Kafka在分布式环境中是如何高效地传输数据的。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/569630.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【南京工程学院×朗汀留学】部分录取案例合集

朗汀留学 X 南京工程学院 作为深耕留学的专业资深团队&#xff0c;朗汀留学成功帮助上千名学生出国留学。 在此我们将南京工程学院的部分留学案例作以总结&#xff0c;以供新生参考。再次恭喜所有 获得理想大学offer的学生们&#xff0c;你们的努力让梦想照进现实。 学校介绍…

2024年外贸独立站建设首选:WordPress引领市场,助力企业出海

随着全球经济的不断融合与发展&#xff0c;越来越多的企业开始关注海外市场&#xff0c;希望通过建设外贸独立站来扩大品牌影响力和销售额。在众多的内容管理系统&#xff08;CMS&#xff09;中&#xff0c;WordPress以其强大的功能、丰富的插件资源和用户友好的操作界面&#…

日志框架整合SpringBoot保姆级教程+日志文件拆分(附源码)

目录 介绍 日志概述 日志文件 调试日志 系统日志 日志框架 日志框架的作用 日志框架的价值 流行的日志框架 SLF4J日志门面 介绍 环境搭建简单测试 集成log4j logback Logback简介 Logback中的组件 Logback配置文件 日志输出格式 控制台输出日志 输出日志到…

演示在一台Windows主机上运行两个Mysql服务器(端口号3306 和 3307),安装步骤详解

目录 在一台Windows主机上运行两个Mysql服务器&#xff0c;安装步骤详解因为演示需要两个 MySQL 服务器终端&#xff0c;我只有一个 3306 端口号的 MySQL 服务器&#xff0c;所以需要再创建一个 3307 的。创建一个3307端口号的MySQL服务器1、复制 mysql 的安装目录2、修改my.in…

通过Bedrock Access Gateway解决方案快速访问Amazon Bedrock的多种大语言模型

Bedrock Access Gateway&#xff08;BAG&#xff09;解决方案提供了开箱即用、兼容 OpenAI 的代理功能&#xff0c;帮助用户轻松无缝地从 OpenAI 迁移到 Amazon Bedrock。 1. 概述 亚马逊云科技的 Amazon Bedrock 服务支持一系列领先的基础模型&#xff0c;为客户提供多种选择…

SpringCloud Alibaba--nacos简介和注册中心和登录

目录 一.理论基础 二.nacos 2.1 简介 2.2 安装 三.父项目 三.生产者 3.1 配置依赖 3.2 配置文件 3.3 启动类 3.4 控制类 四.消费者 4.1 配置依赖 4.2 配置文件 4.3 启动类 4.4 feign的接口 五.效果 六.负载均衡--权重算法 6.1重启nacos 6.2 设置权重 6.3 设…

【1431】java学习网站系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java 学习网站系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5.0&…

【Django】django.core.exceptions.AppRegistryNotReady: Apps aren‘t loaded yet.

其中django后台manage.py入口程序报错&#xff0c;检索很多问题解决方案&#xff0c;这里记录下个人问题原因 1.django启动异常问题详情 django.core.exceptions.AppRegistryNotReady: Apps aren’t loaded yet. 2.问题原因 Python第三方包安装版本不一致或缺少依赖包&…

利用AI知识库,优化医保系统售后信息管理流程

在医疗行业中&#xff0c;传统知识库管理虽能整合医疗行业知识&#xff0c;但搜索和管理效率有限&#xff0c;导致医护人员难以高效利用。特别是面对医保系统等复杂系统时&#xff0c;他们常需依赖人工客服或繁琐的电子产品手册解决问题。而HelpLook AI知识库利用AI技术&#x…

拼多多面试题——力扣版测试用例纠错

最近我看到力扣上这个题目&#xff0c;用了三种方法&#xff0c;结果没有一种正确&#xff0c;我就纳闷儿了&#xff0c;为何总有一个测试用例过不了&#xff0c;结果我发现这个测试用例确实有问题啊。。。。。 题目&#xff1a; 表&#xff1a;Logs ----------------------…

Anon Network:基于 Ator Protocol 的 DePIN 匿名互联网

Anon Network正在以Ator Protocol为基础构建世界上最大的Web3隐私互联网生态&#xff0c;其旨在基于DePIN网络&#xff08;Ator protocol&#xff09;&#xff0c;通过激励体系构建一个自下而上、自我维持且可持续、不依赖于任何三方实体且完全匿名的完备互联体系。在该体系中&…

在html页面中使用Vue3和Element-Plus实现基金成本计算器

背景 周边朋友都说基金亏麻了&#xff0c;有些 “鸡” 到底部了&#xff0c;想要补个仓&#xff0c;但是又不知道要投入多少才能拉低到心里预期的成本&#xff0c;并且每只 “鸡” 都得自己输入计算&#xff0c;很麻烦&#xff0c;所以本着偷懒的原则&#xff0c;做了下面的这个…

机器学习中常见的数据分析,处理方式(以泰坦尼克号为例)

数据分析 读取数据查看数据各个参数信息查看有无空值如何填充空值一些特殊字段如何处理读取数据查看数据中的参数信息实操具体问题具体分析年龄问题 重新划分数据集如何删除含有空白值的行根据条件删除一些行查看特征和标签的相关性 读取数据 查看数据各个参数信息 查看有无空…

【Linux】实现一个进度条

我们之前也学了gcc/vim/make和makefile&#xff0c;那么我们就用它们实现一个进度条。 在实现这个进度条之前&#xff0c;我们要先简单了解一下缓冲区和回车和换行的区别 缓冲区其实就是一块内存空间&#xff0c;我们先看这样一段代码 它的现象是先立马打印&#xff0c;三秒后程…

Java Instrumentation插桩技术

Instrumentation基础 openrasp中用到了Instrumentation技术&#xff0c;它的最大作用&#xff0c;就是类的动态改变和操作。 使用Instrumentation实际上也可以可以开发一个代理来监视jvm的上运行的程序&#xff0c;可以动态的替换类的定义&#xff0c;就可以达到虚拟机级别的…

QA测试开发工程师面试题满分问答20: 软件的安全性应从哪几个方面去测试?

软件的安全性测试应从多个方面进行&#xff0c;并确保覆盖以下关键方面&#xff1a; 当回答问题时&#xff0c;可以根据自己的经验和知识&#xff0c;从上述要点中选择适合的方面进行详细说明。强调测试的综合性、全面性和持续性&#xff0c;并强调测试的重要性以及如何与开发团…

亚信安全深度参与撰写《金融行业云原生安全体系研究报告》引领云原生安全 !

近日&#xff0c;北京金融科技产业联盟正式发布《金融行业云原生安全体系研究报告》&#xff08;以下简称《报告》&#xff09;。报告由中国银联等单位牵头&#xff0c;亚信安全受邀作为联合牵头单位全程参与《报告》的讨论和撰写工作。亚信安全依托突出的云原生安全实践经验以…

windows安装nssm并将jar打包为服务

一、nssm 下载地址 二、安装nssm服务 将下载的压缩包复制到安装目录进行解压&#xff0c;解压后有两个版本 win32 和 win64&#xff0c;根据系统选择。打开系统 powershell 命令窗口&#xff0c;进入安装目录指定版本目录&#xff0c;就可以使用nssm服务了。 # 安装服务&…

揭开ChatGPT面纱(2):OpenAI主类源码概览

文章目录 〇、使用OpenAI的两个步骤一、初始化方法__init__()1.源码2.参数解析 二、提供的接口1.源码2.接口说明主要接口说明 OpenAI版本1.6.1 〇、使用OpenAI的两个步骤 在上一篇博客中&#xff0c;我实现并运行了一个OpenAI的demo&#xff0c;我们可以发现&#xff0c;想要使…

自定义mybatis插件实现读写分离

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 自定义mybatis插件实现读写分离 前言场景分析前置配置讲解数据源切换实现代码实现(插件)说明 注意实现效果 有时候我更想看到的是bug&#xff0c;比如做这个插件的时候 前言 在数据库的世界里&#x…
最新文章