消息队列

消息队列 MQ是构建分布式互联网应用的基础设施,通过 MQ 实现的松耦合架构设计可以提高系统可用性以及可扩展性,是适用于现代应用的优秀设计方案。

消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。

MQ使用场合

削峰填谷

诸如电商业务中的秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列可提供削峰填谷的服务来解决该问题。

异步解耦

交易系统作为淘宝等电商的最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列可实现异步通信和应用解耦,确保主站业务的连续性。

顺序收发

细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In FirstOut)原理类似,消息队列提供的顺序消息即保证消息FIFO。

分布式事务一致性

交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

大数据分析

数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用消息队列与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。

分布式缓存同步

电商的大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列构建分布式缓存,实时通知商品数据的变化

蓄流压测

线上有些链路不方便做压力测试,可以通过堆积一定量消息再放开来压测

Kafka

Kafka 被称为下一代分布式消息系统,由 Scala 和 Java编写,是非营利性组织ASF(Apache Software Foundation)基金会中的一个开源项目,比如:HTTP Server、Tomcat、Hadoop、ActiveMQ等开源软件都属于 Apache基金会的开源软件,类似的消息系统还有RabbitMQ、ActiveMQ、ZeroMQ。
Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家组织中同时投入生产协同工作。

官网: http://kafka.apache.org

Kafka的特点和优缺点

特点

  • 分布式: 多机实现,不允许单机
  • 分区: 一个消息.可以拆分出多个,分别存储在多个位置
  • 多副本: 防止信息丢失,可以多来几个备份
  • 多订阅者: 可以有很多应用连接kafka
  • Zookeeper: 早期版本的Kafka依赖于zookeeper, 2021年4月19日Kafka 2.8.0正式发布,此版本包括了很多重要改动,最主要的是kafka通过自我管理的仲裁替代ZooKeeper,即Kafka将不再需要ZooKeeper!

优势

  • Kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 级别以上的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka 服务器分区消息。
  • 分布式: Kafka 基于分布式集群实现高可用的容错机制,可以实现自动的故障转移
  • 顺序保证:在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。 Kafka保证一个Partiton内的消息的有序性(分区间数据是无序的,如果对数据的顺序有要求,应将在创建主题时将分区数partitions设置为1)
  • 支持 Hadoop 并行数据加载
  • 通常用于大数据场合,传递单条消息比较大,而Rabbitmq 消息主要是传输业务的指令数据,单条数据较小
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

缺点

  • Kafka 这种异步批量的设计带来的问题是,它的同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,在它的 Broker 中,很多地方都会使用这种“先攒一波再一起处理”的设计。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。

Kafka角色和流程

  • Producer:Producer即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka broker。
  • Consumer:消费者,用于消费消息,即处理消息
  • Consumer group: 每个consumer 属于一个特定的consumer group(可为每个consumer 指定 groupname,若不指定 group name 则属于默认的group),使用 consumer high level API 时,同一topic的一条消息只能被同一个consumer group 内的一个consumer 消费,但多个consumer group 可同时消费这一消息。
  • Broker:Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例,假设每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如: broker-0、broker-1等……
  • Topic :消息的主题,可以理解为消息的分类,相当于Redis的Key和ES中的索引,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic的消息虽然保存于一个或多个broker 上, 但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处,topic 在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的topic 才能消费topic中的消息。
  • Replication: 同样数据的副本,包括leader和follower的副本数,基本于数据安全,建议至少2个,是Kafka的高可靠性的保障,和ES不同的,ES中的副本数不包括主分片数
  • Partition :是物理上的概念,每个topic 分割为一个或多个partition,即一个topic切分为多份。创建topic时可指定 parition 数量,partition的表现形式就是一个一个的文件夹,该文件夹下存储该partition的数据和索引文件,分区的作用还可以实现负载均衡,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,一般Partition数不要超过节点数据,注意同一个partition数据是有顺序的,但不同的partition则是无序的。为了实现数据的高可用,比如将分区 0 的数据分散到不同的kafka 节点,每一个分区都有一个 broker 作为 Leader 和一个 broker 作为Follower,类似于ES中的主分片和副本分片,
  • AR: Assigned Replicas,分区中的所有副本的统称,AR= lSR+ OSR
  • lSR:ln Sync Replicas,所有与leader副本保持同步的副本 follower和leader本身组成的集合,是AR的子集
  • OSR:out-of-Sync Replied,所有与leader副本同步不能同步的 follower的集合,是AR的子集

分区的优势:

  • ​ 实现存储空间的横向扩容,即将多个kafka服务器的空间结合利用
  • ​ 提升性能,多服务器读写
  • ​ 实现高可用,分区leader 分布在不同的kafka 服务器,假设分区因子为 3, 分区 0 的leader为服务器A,则服务器 B 和服务器 C 为 A 的follower,而分区 1 的leader为服务器B,则服务器 A 和C 为服务器B 的follower,而分区 2 的leader 为C,则服务器A 和 B 为C 的follower。

Kafka部署

官方文档: https://kafka.apache.org/quickstart

官网各版本下载:https://archive.apache.org/dist/kafka/

kafka版本格式

kafka_<scala 版本>_<kafka 版本>
#示例:kafka_2.13-2.7.0.tgz

环境准备

#三台CentOS或者Ubuntu服务器,复用三个zookeeper节点,部署zookeeper步骤参见zookeeper文章,示例版本依然需要安装zookeeperzookeeper-node1.haoge.org 192.168.0.101
zookeeper-node2.haoge.org 192.168.0.102
zookeeper-node3.haoge.org 192.168.0.103
#在三个节点都安装JDK,要求8以上CentOS使用如下命令:
yum -y install  java-1.8.0-openjdk
#在三个节点都安装JDKUbuntu使用如下命令:
apt update
apt -y install openjdk-8-jdk

开始部署

# 部署示例节点为
1、左右节点安装jdk,要求8以上
apt install openjdk-8-jdk -y (Ubuntu系统) 或者  yum install java-1.8.0-openjdk (CentOS系统)
2、所有节点下载软件包
官网下载
wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
3、所有节点执行解压到/usr/local目录并创建软连接
tar xf kafka_2.13-2.7.0.tgz -C /usr/local/
ln -s /usr/local/kafka_2.13-2.7.0/ /usr/local/kafka
4、所有节点配置环境变量
echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
source /etc/profile.d/kafka.sh
5、第一个节点修改配置文件
vim /usr/local/kafka/config/server.properties
broker.id=1 #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的
meta.properties文件
listeners=PLAINTEXT://192.168.0.101:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
log.dirs=/usr/local/kafka/data #kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
num.partitions=1 #设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
default.replication.factor=3 #指定默认的副本数为3,可以实现故障的自动转移
log.retention.hours=168 #设置kafka中消息保留时间,默认为168小时即7天
zookeeper.connect=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 #指定连接的zk的地址,zk中存储了broker的元数据信息
zookeeper.connection.timeout.ms=6000 #设置连接zookeeper的超时时间,单位为ms,默认6秒钟
6、三个节点都创建数据目录
mkdir /usr/local/kafka/data
7、将第一个节点的配置文件分别发送给另外两个节点
scp /usr/local/kafka/config/server.properties 192.168.0.102:/usr/local/kafka/config
scp /usr/local/kafka/config/server.properties 192.168.0.103:/usr/local/kafka/config
8、修改第二个节点的配置
vim /usr/local/kafka/config/server.properties
broker.id=2 #每个broker 在集群中的唯一标识,正整数。
listeners=PLAINTEXT://192.168.0.102:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
9、修改第三个节点的配置
vim /usr/local/kafka/config/server.properties
broker.id=3 #每个broker 在集群中的唯一标识,正整数。
listeners=PLAINTEXT://192.168.0.103:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
10、如果需要调整服务内存,可以修改改文件内的内存配置
vim /usr/local/kafka/bin/kafka-server-start.sh
if[ " x$KAFKA_HEAP_OPTS"="x"] ; then
  export KAFKA_HEAP_OPTS=" -Xmx1G-Xms1G"  #可以调整内存
fi
11、三个节点都执行命令启动服务
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
12、查看端口或服务日志 确认服务启动正常
ss -tnl|grep 9092
tail -f /usr/local/kafka/logs/server.log

Kafka命令使用示例

#创建 名为yuhao的topic
kafka-topics.sh --create --topic yuhao --bootstrap-server 192.168.0.101:9092 --partitions 3 --replication-factor 2
#可在各节点上观察生成的相关数据
ls /usr/local/kafka/data/
# 旧版本的kafka创建topic命令
kafka-topics.sh --create --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --partitions 3 --replication-factor 2 --topic yuhao
# 新版获取所有topic命令
kafka-topics.sh --list --bootstrap-server 192.168.0.101:9092
# 旧版获取所有topic命令
kafka-topics.sh --list --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181
# 验证 Topic 详情
状态说明:yuhao 有三个分区分别为0、1、2,分区0的leader是3 (broker.id),分区 0 有2 个副本,并且状态都为 Isr(ln-sync,表示可以参加选举成为leader)
kafka-topics.sh --describe --bootstrap-server 192.168.0.101:9092 --topic yuhao

Topic: yuhao    PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: yuhao    Partition: 0    Leader: 3       Replicas: 3,1   Isr: 3,1
        Topic: yuhao    Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: yuhao    Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3

# 旧版命令
kafka-topics.sh --describe --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --topic yuhao

# 发送消息命令,交互式输入消息,按Ctrl+C退出
kafka-console-producer.sh --broker-list 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic yuhao
>test1
>test2
>test3

# 按下面的方式也可以
kafka-console-producer.sh --topic yuhao --bootstrap-server 192.168.0.101:9092

# 消费topic命令,交互式持续接收消息,按Ctrl+C退出
kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092 --topic yuhao --from-beginning
test1
test2
test3

# 删除Topic
kafka-topics.sh --delete --bootstrap-server 192.168.0.101:9092 --topic yuhao
# 旧版删除Topic
kafka-topics.sh --delete --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --topic yuhao

Kafka图形工具 Offset Explorer(Kafka Tool)

Offset Explorer ,旧称Kafka Tool,工具是一个 GUI 应用程序,用于管理和使用 Apache Kafka 群集。它提供了一个直观的 UI,允许人们快速查看 Kafka 群集中的对象以及存储在群集主题中的消息。它包含面向开发人员和管理员的功能。一些关键功能包括

  • 快速查看您的所有 Kafka 集群,包括主题和消费者
  • 查看分区中邮件的内容并添加新邮件
  • 以漂亮的打印格式显示 JSON和 XML 消息
  • 添加和删除主题以及其他管理功能
  • 将单个邮件从分区保存到本地硬盘驱动器
  • 编写自己的插件,允许您查看自定义数据格式
  • Kafka 工具在Windows、Linux 和 Mac 操作系统上运行

官网: https://www.kafkatool.com/

下载地址:https://www.kafkatool.com/download.html

作者:于浩  创建时间:2023-02-07 14:59
最后编辑:于浩  更新时间:2024-02-06 11:18