L

[labuladong学习笔记] Kafka

RoLingG 其他 2024-03-27

Kafka

我们先来了解 Kafka 是什么:Kafka 是一种基于发布/订阅的分布式消息系统。

请输入图片描述

从图上可以看出 Kafka 的作用是将上游的数据收集起来再分发给下游区域。

Kafka 是一个分布式流处理平台,具有以下几个优点:

  1. 高吞吐量:Kafka 能够处理大规模数据流,并具有很高的吞吐量。它可以有效地处理成千上万的消息并保持低延迟。
  2. 可扩展性:Kafka 是分布式的,允许用户简单地水平扩展集群,以满足不断增长的数据需求。通过添加更多的节点,可以实现更大规模的数据处理。
  3. 持久性:Kafka 提供了可配置的数据持久化机制,消息被持久化在磁盘上,即使消费者处理消息失败,消息仍然安全地存储在 Kafka 中。
  4. 容错性:Kafka 集群具有容错性,能够自动处理节点故障,并保证数据不会丢失。副本机制确保数据的可靠性和容错能力。
  5. 灵活性:Kafka 可以用于多种不同的数据处理场景,包括日志聚合、事件处理、指标收集等。它支持多种数据格式和处理方式,能够适应不同的业务需求。
  6. 流处理支持:除了消息队列功能,Kafka 还提供了流处理功能,支持实时数据处理和分析。通过 Kafka Streams 或其他流处理框架,可以对数据流进行实时处理和转换。
  7. 社区活跃:作为一个开源项目,Kafka 拥有庞大的社区支持,持续有新功能和改进推出,可以从社区中获取丰富的资源和支持。
  8. 顺序IOKafka 的存储介质是 HDD ,价格便宜,且通过顺序读写的方式使得即便使用的是 HDD 相比于 SSD 在保持着大容量的同时也拥有着不错的读写速度,使其所消耗的成本降低。

    具体来说:

    Kafka 的存储介质是 HDD ,也就是机械硬盘,这玩意相比内存来说进行读写操作速度可慢的不是一点两点(主要因为一个是磁头物理读写,一个是电信号读写,由此造成了差距)。

    另外磁盘还有两种读写模式,一种是随机读写,另一种是顺序读写。随机读写也就意味着磁盘的磁头要多次移动位置进行操作,这样损耗的非读写时间就会比较长,所以不用怎么移动磁头的顺序读写相比于随机读写来说就要快很多。

    Kafka 自身内部的数据结构也有一个叫 append-only log 的一个概念,这个数据结构是基于顺序读写而成的,它可以在写数据时将 data source 写到顺序读写相应数据的后面

    image-20240327100343903

    也就是说,因为顺序读写的原因,使得其即便使用的是速度较慢的 HDD,也能有着可观的读写速度,要知道 SSD 快但是比 HDD 贵的不是一点半点,同价位来说作为搭载 Kafka 的系统,存储量大速度尚可而且便宜肯定要比存储量小但是速度快而且昂贵所消耗的物理成本要小很多(毕竟实际系统集群插的 SSD 肯定不是按个位数算的) 。而且 HDDSSD 数据留存时间长很多。

  9. 零拷贝Kafka 需要将网络的数据写入机械硬盘中,这个过程中就拿传统方式和 Kafka 常用用的方式进行比对:

    使用传统的复制方式:数据从磁盘加载到操作系统的缓存 → 操作系统缓存复制到 Kafka → Kafka 复制到套接字缓冲区 → 套接字缓冲区复制到网卡缓冲区 → 通过网络发给消费者。

    Kafka 优化过之后的复制方式:数据从磁盘加载到操作系统 → Kafka 通过 sendfile() 函数网卡缓冲区 → 通过网络发送给消费者。

    根据现代网卡来说,这种零拷贝方法是通过 DMA(直接内存访问) 来实现的。通过直接内存访问去拷贝,就不需要通过 CPU 来进行操作(这句话有歧义),整体效率就高了一些。

    其他说法:

    零拷贝就是一个 DMA 搬运的缓冲区被另一个缓冲区引用,引用的过程虽然也是要CPU来执行程序,但是数据量比直接搬运数据小很多。

    零拷贝只是省去了从内核态拷贝用户态,再拷贝到网卡缓冲区的这些中间步骤,跟 mmap 类似,映射引用以后,既解决了内核态和用户态之间屏蔽问题,又省去了中间这些数据拷贝步骤。

    DMA 并非不和 CPU 沟通,他只是不进入 CPU 核心(不会用 CPU 核心去进行 IO 和计算),但是他会借用 CPU 的 bus 进行 IO ,也正是因为借用 Bus ,所以他的速度才快,你可以理解为 CPU 和 PCIE 通道直连,DMA 可以走 PCIE借用 CPU bus 进行 IO,这纯纯是吃了硬件红利。

    不占用 CPU 时间也是错误说法,他用了 bus 就已经挤占了 CPU 分片时间,别忘了 bus本身也是 CPU 资源,并不是核心才是 CPU 资源,还有 DMA 不是什么独立存在,一个需要和 CPU bus 交互才能起效的东西能称之为独立?独立的意思是数据流经过他一个就能把活儿干完,DMA 明显做不到,在集成系统里谈独立根本上就是虚妄。

但上面的持久性、容错性、灵活性其实都还有待提高。下面就通过 Apache Pulsar 这个号称是下一代云原生分布式消息流平台来分析 Kafka 的不足之处。

消息队列

Kafka 中一项重要的技术就是消息队列。首先,我们要搞清楚消息队列是什么,以及我们为什么需要消息队列。

GPT

消息队列是一种在软件系统中用于通信、解耦和异步处理的机制。它允许应用程序之间通过发送、接收和处理消息来进行通信,而无需立即相互交互。消息队列通常被用于以下几个方面:

  1. 解耦应用程序:消息队列可以帮助将应用程序解耦,使得生产者和消费者之间不需要直接相互通信。生产者将消息发送到队列中,而消费者则从队列中获取消息进行处理,这样生产者和消费者可以独立地进行操作。
  2. 异步通信:消息队列允许发送方发送消息并继续进行其他工作,而不需要等待接收方立即处理消息。这种异步通信方式可以提高系统的响应性能和吞吐量。
  3. 削峰填谷:在系统负载剧增时,消息队列可以帮助平滑处理流量峰值,将突发的大量请求暂存起来,再逐渐处理,以避免系统过载。
  4. 消息持久化:消息队列通常会保证消息的持久性,即使接收方暂时不可用,消息也不会丢失,确保消息可靠地传递到消费者。
  5. 事件驱动架构:消息队列常用于构建事件驱动的架构,通过消息发布和订阅模式来实现事件的传播和处理。

消息队列它的作用就是让两个服务之间的通信变得易操作、易管理

我们知道,想让两个主机相互通信最简单的就是让这两个主机相互建立通信就行了。但是如果我们把对象扩大,变成两个服务,这样的话主机可就不只有两台了。想象一下,所有的生产数据的服务和消费数据的服务都相互建立连接,那么系统内越来越多的主机,就会变成像蜘蛛网一样错综复杂,理都理不清。而且维护的时候,发现整个系统的所有主机可能都有相连接,想去修改一些东西,会牵扯很多别的根本不想干涉到的东西,这就很脑阔痛。

这时候,开发者们就想了个在系统设计常用的方法:加中间层去顶替直接相连的关系。于是消息队列就成为了这个中间层,消息的产生者不关心消费消息的是谁,它只需要把消息一股脑丢到消息队列里面,消费者会从消息队列里面消费数据。

其实中间层这个想法就是类似将一个大问题通过它解构成数个小一些的问题,原本复杂的部分被解构之后见会变得更易懂一点,而且是分开的小问题解决的速度也要比大问题快很多,这里称这种操作方法为 解耦

img

labuladong 原话里提到:细分一下,消费模型又分两种:

1、点对点模式,也叫队列模式。即每条消息只会被一个消费者消费。

2、发布订阅(Pub/Sub)模式。发送到某个 Topic 的消息,会分发给所有订阅该 Topic 的消费者进行消费。

一个成熟的消息队列应该同时支持上述两种消费模型。

另外,你总不能让生产者的消息「阅后即焚」吧,所以消息队列应该有自己的持久化存储系统,能够把消息存储下来,方便后续的回溯、分析等操作。

综上,消息队列在整个系统中的主要作用就是:

1、解耦。使得服务之间的拓扑关系简单了很多。

2、削峰/异步化。消息队列可以把大量不需要实时处理的数据暂存下来,等待消费者慢慢消费。

所以最顶上对 Kafka 的描述就有说到过持久性,为的就是将消息存储下来方便日后操作。

回来讲前面提到的这个问题:

但上面的持久性、容错性、可拓展性其实都还有待提高。

消息队列中间件看似消除了服务之间的互相依赖,但说到底其实是【让所有服务之间相互依赖 】转换成→ 【让所有服务都依赖消息队列】,这也就意味着如果消息队列突然坏掉,那就全完蛋了。

所以我们对这个消息队列本身的要求就非常高,具体来说有几方面:

1、高性能。消息队列作为整个系统的枢纽,它的性能必须足够高,否则很可能成为整个系统的性能瓶颈。

2、高可用。说白了就是要抗揍,如果消息队列集群中的少部分节点由于种种原因去世了,也不能影响整个集群的服务。

3、数据可靠性。在各种极端情况下(比如突然断网、突然断电),要保证已经收到的消息成功储存(一般是指落到磁盘中)。

4、可扩展。业务是发展的,如果消息队列集群快扛不住计算压力了,就需要更多的计算资源(扩容);如果消息队列集群压力很小,导致很多节点搁那打酱油,那么需要回收计算资源(缩容)。这就需要消息队列在设计时就考虑如何进行灵活的扩缩容。

让我们细数 Kafka 的罪恶吧!让我先引用一下 labuladong 的说法:

首先说说 Kafka 的架构设计。

producer 和 customer 可以选定 Kafka 的某些 topic 中投递和消费消息,但 topic 其实只是个逻辑概念,topic 下面分为多个 partition,消息是真正存在 partition 中的:

img

每个 partition 会分配给一个 broker 节点管理:

img

所谓 broker 节点,就是一个服务进程。简单来说,你把一个 broker 节点理解为一台服务器,把 partition 理解为这台服务器上的一个文件就行了

发到 topic 的消息实际上是发给了某个 broker 服务器,然后被持久化存储到一个文件里,我们一般称这个文件是 log file。

那么为什么要给一个 topic 分配多个 partition 呢?

很显然,如果一个 topic 只有一个 partition,那么也就只能有一台 broker 服务器处理这个 topic 上的消息,如果划分成很多 partition,就可以把这个 topic 上的消息分配到多台 broker 的 partition 上,每个 broker 处理消息并将消息持久化存储到 log file 中,从而提高单 topic 的数据处理能力。

但问题是怎么保证高可用?如果某个 broker 节点挂了,对应的 partition 上的数据不就就无法访问了吗?

一般都是通过「数据冗余」和「故障自动恢复」来保证高可用,Kafka 会对每个 partition 维护若干冗余副本:

img

若干个 partition 副本中,有一个 leader 副本(图中红色的),其余都是 follower 副本(也可称为 replica,图中橙色的)。

leader 副本负责向生产者和消费者提供订阅发布服务,负责持久化存储消息,同时还要把最新的消息同步给所有 follower,让 follower 小弟们和自己存储的数据尽可能相同。

这样的话,如果 leader 副本挂了,就能从 follower 中选取一个副本作为新的 leader,继续对外提供服务。

这就是 Kafka 的设计,一切看起来很完美,是吧?实际上并不完美。

我们从上面也可以看出 Kafka 其实也有一些设计缺陷:

broker 和 partition 绑定

  1. Kafka 把 broker 和 partition 绑定在了一起,这就会有一些问题产生。

    要知道 Kafka 的很多操作都涉及 partition 数据的全量复制。如果我们的 broker-1 容量不够了,我们要去添加一个 broker-3 去分担一些 broker-1 的压力。

    这时候,你会发现,并不是简单的就将 broker-1 的部分移动过去,而是复制 broker-1 中 partition 的全量数据

    这就是个很大的问题,如果 broker-1 一边被写入,那你 broker-3 就要跟着不断地复制新内容到它自己上面,什么时候复制完,broker3 才能上线提供服务。这样既在不断地消耗 IO 以及网络资源,如果复制数据的速度小于 partition 的写入速度,那 broker3 永远都别想复制完了。

  2. 再有个问题就是,如果有个 partition 新增一个 follower 副本,那么这个新增的 follower 副本必须要跟 leader 副本同步全量数据。毕竟 follower 存在的目的就是随时替代 leader,所以复制 leader 的全量数据是必须的。
  3. 除此之外,因为 broker 要负责存储,所以整个集群的容量可能局限于存储能力最差的那个 broker 节点。而且如果某些 partition 中的数据特别多(数据倾斜),那么对应 broker 的磁盘可能很快被写满,这又要涉及到 partition 的迁移,数据复制在所难免。

    意思大概就是能力差的broker复制数据的时候会很慢,占用资源但是效率低下。某些 partition 中的数据特别多的时候,因为数据迁移不方便的问题会导致对应 broker 的磁盘可能很快被写满,这样又要全量复制大量数据,效率低。因为 Kafka 数据迁移就要牵扯到全量复制,需要将分区中的所有数据都复制到新的位置上。(相当于把东西全部搬到另一个地方,空出一片新的区域,而不是搬掉部分东西,留够能放的空间刚好放进去就行的那种)

所以 labuladong 后面有一句总结的话:虽然 Kafka 提供了现成的脚本来做这些事情,但实际需要考虑的问题比较多,操作也比较复杂,数据迁移也很耗时,远远做不到集群的「平滑扩容」。

Kafka 底层依赖操作系统的 Page Cache

Linux 文件系统会利用 Page Cache 机制优化性能。Page Cache 说白了就是读写缓存,Linux 告诉你写入成功,但实际上数据并没有真的写进磁盘里,而是写到了 Page Cache 缓存里,可能要过一会儿才会被真正写入磁盘。这里面就有一个时间差,当数据还在 Page Cache 缓存没有落盘的时候机器突然断电,缓存中的数据就会永远丢失。所以 Kafka 消息持久化利用了 Page Cache 提高了性能,但是并不可靠,可能丢消息。

接下来我们分析另外一个有关消息消费的问题:

消费者消费数据的情况,主要有两种可能:一种叫追尾读(Tailing Reads),一种叫追赶读(Catch-up Reads)

追尾读,顾名思义,就是消费者的消费速度比较快,生产者刚生产一条消息,消费者立刻就把它消费了。我们可以想象一下这种情况 broker 底层是如何处理的:

生产者写入消息,broker 把消息写入 Page Cache 写缓存,然后消费者马上就来读消息,那么 broker 就可以快速地从 Page Cache 里面读取这条消息发给消费者,这挺好,没毛病。

所谓追赶读的场景,就是消费者的消费速度比较慢,生产者已经生产了很多新消息了,但消费者还在读取比较旧的数据。

这种情况下,Page Cache 缓存里没有消费者想读的老数据,那么 broker 就不得不从磁盘中读取数据并存储在 Page Cache 读缓存。

注意此时读写都依赖 Page Cache,所以读写操作可能会互相影响,对一个 partition 的大量读可能影响到写入性能,大量写也会影响读取性能,而且读写缓存会互相争用内存资源,可能造成 IO 性能抖动。

再进一步分析,因为每个 partition 都可以理解为 broker 节点上的一个文件,那么如果 partition 的数量特别多,一个 broker 就需要同时对很多文件进行大量读写操作,这样性能就框框的往下降。

中译中:Page Cache 缓存里没有消费者想读的老数据,broker 只能从磁盘中读取数据并存储在 Page Cache 读缓存。一直没有想要的数据就一直从磁盘中读取到缓存中。要知道读写磁盘的IO速度可比缓存慢多了,如果 partition 的数量特别多,就得大量读写大量IO操作,性能也就在猛猛下降。

让我们看看Pulsar 是如何解决 Kafka 的这些问题的:

首先,Kafka broker 的扩容都会涉及 partition 数据的迁移,这是因为 Kafka 使用的是传统的单层架构,broker 需要同时进行计算(向生产者和消费者提供服务)和存储(消息的持久化)。

Pulsar 的优化方法就是多一层 BookKeeper 层去进行存储,将原来 broker 的工作变成只计算不存储。

img

有了存算分离架构,Pulsar 的 partition 在 broker 之间的迁移完全不会涉及数据复制,所以可以迅速完成。

不会设计数据复制的原因是 partition 在 broker 里只是一个操作符,也就是一个指针,没有实质上的数据内容,也就不存在全量复制数据。

在 Kafka 中,我们可以把每个 partition 理解成一个存储消息的大文件,所以在 broker 间转移 partition 需要复制数据,异常麻烦。

而在 Pulsar 中,我们可以把每个 partition 理解成一个文件描述符,broker 只需持有这个文件描述符即可,把对数据的处理全部甩给存储引擎 Bookkeeper 去做。

上面这也是揭示了为什么 Pulsar 数据迁移比 Kafka 快的主要原因。

如果 Pulsar 的某个 broker 节点的压力特别大,那你增加 broker 节点去分担一些 partition 就行;类似的,如果某个 broker 节点突然坏了,那你直接把这个 broker 节点管理的 partition 转移到别的 broker 就行了,这些操作完全不涉及数据复制

进一步,由于 Pulsar 中的 broker 是无状态的,所以很容易借助 k8s 这样的基础设施实现弹性扩缩容。

经过这一波操作,broker 把数据存储的关键任务甩给了存储层(也就是 Bookkeeper ),那么 Bookkeeper 是如何提供高可用、数据可靠性、高性能、可扩展的特性呢?

Pulsar采用了节点对等架构,优化了高可用性问题

Bookkeeper 本身就是一个分布式日志存储系统,Kafka 使用主从复制的方式实现高可用;而 Bookkeeper 采用 Quorum 机制实现高可用

Quorum 机制的核心思想是通过多数派的原则来做出决策,确保在系统中的节点出现故障或网络分区的情况下,仍然能够保持一致性。

Bookkeeper 集群是由若干 bookie 节点(运行着 bookie 进程的服务器)组成的,不过和 Kafka 的主从复制机制不同,这些 bookie 节点都是对等的,没有主从的关系。

img

当 broker 要求 Bookkeeper 集群存储一条消息(entry)时,这条消息会被并发地同时写入多个 bookie 节点进行存储:

img

之后的消息会以滚动的方式选取不同的 bookie 节点进行写入:

img

这种写入方式称为「条带化写入」,既实现了数据的冗余存储,又使得数据能够均匀分布在多个 bookie 存储节点上,从而避免数据倾斜某个存储节点压力过大。

这也就意味着如果一个 bookie 快满了装不下新进来的 entry ,那么条带化写入能将 entry 存进别的够空间的 bookie 中,而不是 Kafka 那样整个 broker 数据迁移来放这个大 entry。

因为节点对等,所以 bookie 节点可以进行快速故障恢复和扩容。(就不会像 Kafka 为了避免坏一个 broker 就得全复制数据进而来当做副本;Pulsar 的每个 bookie 都可能成为副本,想要的数据去对应的 bookie 拿就行了。而且扩容也不用全复制,只要存入 bookie 即可)

比方说 entry0 ~ entry99 都成功写入到了 bookie1, bookie2, bookie3 中,写 entry100bookie2 突然坏掉了,那么直接加入一个新的 bookie4 节点接替 bookie2 的工作就行了。

那肯定有读者疑惑,新增进来的 bookie4 难道不需要先复制 bookie2 的数据吗(像 Kafka broker 节点那样)?

主从复制的架构才需要数据复制,因为从节点必须保证和主节点完全相同,以便随时接替主节点。而节点对等的架构是不需要数据复制的

Bookkeeper 中维护了类似这样的一组元数据:

[bookie1, bookie2, bookie3], 0
[bookie1, bookie3, bookie4], 100

这组元数据的含义是:entry0 ~ entry99 都写到了 bookie1, bookie2, bookie3 中,entry100 及之后的消息都写到了 bookie1, bookie3, bookie4 中。

有了这组元数据,我们知道每条 entry 具体存在那些 bookie 节点里面,即便 bookie2 节点坏了,这不是还有 bookie1, bookie3 节点可以读取嘛。

扩容也是类似的,可以直接添加新的 bookie 节点增加存储能力,完全不需要数据复制。

对比来看,Kafka 是以 partition 为单位存储在 broker 中的:

img

Bookkeeper 这边压根没有 partition 的概念,而是以 entry(消息)为单位进行存储,某个 partition 中的数据会被打散在多个 bookie 节点中:

img

之所以能够打散在多个 bookie 中是因为 entry 里有对应 partition 的数据信息,所以要找某个 partition 的消息时只需要筛选数据信息就能找到了

Pulsar通过读写隔离,优化了可靠性和高性能问题

bookie 节点实现读写隔离,自己维护缓存,不再依赖操作系统的 Page Cache,保证了数据可靠性和高性能

之前说到 Kafka 完全依赖 Page Cache 产生的一些问题,而 Bookkeeper 集群中的 bookie 存储节点采用了读写隔离的架构:

img

每个 bookie 节点都拥有两块磁盘,其中 Journal 磁盘专门用于写入数据,Entry Log 磁盘专门用于读取数据,而 memtable 是 bookie 节点自行维护的读写缓存。

其中 Journal 盘的写入不依赖 Page Cache,直接强制刷盘(可配置),写入完成后 bookie 节点就会返回 ACK 写入成功。

写 Journal 盘的同时,数据还会在 memotable 缓存中写一份,memtable 会对数据进行排序一段时间后刷入 Entry Log 盘

这样不仅多了一层缓存,而且 Entry Log 盘中的数据有一定的有序性,在读取数据时可以一定程度上提高性能。

这样设计的缺点是一份数据要存两次,消耗磁盘空间,但优势也很明显:

1、保证可靠性,不会丢数据。因为 Journal 落盘后才判定为写入成功,那么即使机器断电,数据也不会丢失。

2、数据读写不依赖操作系统的 Page Cache,即便读写压力较大,也可以保证稳定的性能。

3、可以灵活配置。因为 Journal 盘的数据可以定时迁出,所以可以采用存储空间较小但写入速度快的存储设备来提高写入性能。(这算是物理上的拓展优势)

Kafka 的设计中并没有类似的数据定时迁出功能。Kafka 使用的是顺序写入的方式将消息追加到日志文件中,这在一定程度上限制了对存储设备类型的选择。因为 Kafka 需要将数据直接写入持久化存储中,所以对于写入速度较快的需求,通常需要选择成本较高的高性能存储设备,如 SSD。(总的来说就是用Kafka的 SSD 满了就得定时清数据或者是拔下来换块新的)

Pulsar 的 Journal 盘会将消息先缓存在较快速的存储设备(如 SSD),然后定期将数据迁移到持久化存储中(如 HDD)。这样一来,Pulsar 能够兼顾写入性能和存储成本,通过采用较小但写入速度快的存储设备来提高写入性能,同时通过定时迁移数据来保证数据的持久化和可靠性。(这个过程是自动进行的,系统会根据配置的参数和条件来触发数据迁移,并确保数据的一致性和可靠性。不需要手操换硬件)

PREV
[labuladong学习笔记] Linux中的进程与线程
NEXT
[Redis学习] Redis基础一条龙

评论(0)

发布评论