K

[kafka] 实操记录

RoLingG 其他 2025-08-31

Kafka实操

简单的一对一生产消费订阅模型

生产者Producer

package main

import (
    "fmt"
    "github.com/IBM/sarama"
    "log"
)

func main() {
    // 配置生产者
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
    config.Producer.Retry.Max = 3                    // 重试次数
    config.Producer.Return.Successes = true          // 返回成功的消息

    // 创建同步生产者(也可创建异步生产者)
    producer, err := sarama.NewSyncProducer([]string{"192.168.31.108:9092"}, config)
    if err != nil {
       log.Fatalf("Failed to create producer: %s", err)
    }
    defer producer.Close()

    // 发送消息
    topic := "test-topic"
    for i := 0; i < 5; i++ {
       msg := &sarama.ProducerMessage{
          Topic: topic,
          Value: sarama.StringEncoder(fmt.Sprintf("Hello Sarama! Message %d", i)),
       }

       // 同步发送,返回分区和偏移量
       partition, offset, err := producer.SendMessage(msg)
       if err != nil {
          log.Printf("Failed to send message: %s", err)
       } else {
          fmt.Printf("Message sent to partition %d, offset %d\n", partition, offset)
       }
    }
}

消费者Consumer:

package main

import (
    "fmt"
    "github.com/IBM/sarama"
    "log"
)

func main() {
    // 配置消费者
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    // 默认配置消费者偏移量 OffsetOldest: 从最早的消息开始消费 OffsetNewest:从当前最新消息开始消费
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    // 2. 创建基础消费者(非消费组模式)
    consumer, err := sarama.NewConsumer([]string{"192.168.31.108:9092"}, config)
    if err != nil {
       log.Fatalf("Failed to create consumer: %s", err)
    }
    defer consumer.Close()

    // 3. 指定要消费的主题和分区
    topic := "test-topic"
    partition := int32(0) // 分区编号(从0开始)

    // 4. 获取分区消费者(从指定偏移量开始消费)
    // 第三个参数为起始偏移量:可以是 sarama.OffsetOldest / sarama.OffsetNewest 或具体数值
    // 无论是哪个参数都会进行持续监听当前的partition
    pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
    if err != nil {
       log.Fatalf("Failed to consume partition: %s", err)
    }
    defer pc.Close()

    for msg := range pc.Messages() {
       // 成功接收到消息
       fmt.Printf("%s %s %d %d\n", msg.Topic, msg.Value, msg.Partition, msg.Offset)
    }
}

多生产者单消费者

大部分使用kafka的场景就是多个生产者生产消息,往kafka里面发,然后消费者在后面慢慢消费。

这种模式不需要做额外配置,生产者只管往这个topic里面发就好了。

举例实际应用中,有多个服务产生日志,这些服务就可以每个配置一个日志生产者,生产出来的日志信息传输进kafka,然后对应处理日志的服务作为消费者模型,进行各个服务日志处理,即消费kafka内的日志消息。这就是实战中常用的kafka日志处理订阅发布模型。

消费者组

我们上面第一个实战操作都是无副本的单机处理,而且消费者在因为某些情况故障或者崩溃的情况下,要么从最开始位置进行消费,要么从最近最新运行开始接收到的消息进行消费,不能做到从崩溃时消费的消息后一条开始消费。

即消费者每次中断后都不知道自己消费到了哪里。

如果需要知道,我们就得引入消费者组这个概念,使用消费者组可以自动实现 “记录消费位置” 的功能,无需手动管理偏移量,这是消费组最核心的优势之一。

消费组会通过 kafka 集群自动跟踪和存储每个分区的消费偏移量(默认存储在 kafka 内部的 __consumer_offsets 主题中),并在消费者重启或重平衡(Rebalance)时自动恢复消费位置,无需手动持久化偏移量。

package main

import (
    "context"
    "fmt"
    "github.com/IBM/sarama"
    "log"
    "os"
    "os/signal"
    "syscall"
)

// GroupConsumer 自定义消费者,实现 sarama.ConsumerGroupHandler 接口
type GroupConsumer struct{}

// Setup 在分区分配完成后调用(初始化)
func (c *GroupConsumer) Setup(_ sarama.ConsumerGroupSession) error {
    return nil
}

// Cleanup 在分区被重新分配前调用(清理)
func (c *GroupConsumer) Cleanup(_ sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim 处理分区消息(核心逻辑)
func (c *GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // 遍历分区消息
    for msg := range claim.Messages() {
        // 处理消息(实际业务逻辑)
        fmt.Printf("Received message: %s (topic: %s, partition: %d, offset: %d)\n",
            string(msg.Value), msg.Topic, msg.Partition, msg.Offset)

        // 手动提交偏移量(可选,若关闭自动提交则必须手动调用)
        // 注意:需在消息处理成功后再提交,避免消息丢失
        session.MarkMessage(msg, "**") // 标记消息为已处理,会在 session 结束时提交
    }
    return nil
}

func main() {
    // 配置消费组
    config := sarama.NewConfig()
    config.Version = sarama.V2_8_1_0 // 指定 Kafka 版本(需与集群版本匹配)

    // 偏移量配置:默认自动提交(可改为手动提交)
    config.Consumer.Offsets.AutoCommit.Enable = true      // 开启自动提交
    config.Consumer.Offsets.AutoCommit.Interval = 5000    // 自动提交间隔(毫秒)
    config.Consumer.Offsets.Initial = sarama.OffsetOldest // 初始偏移量(首次消费时)

    // 创建消费组
    groupID := "auto-offset-group" // 消费组唯一标识
    consumerGroup, err := sarama.NewConsumerGroup(
        []string{"192.168.31.108:9092"}, // Kafka 地址
        groupID,
        config,
    )
    if err != nil {
        log.Fatalf("Failed to create consumer group: %v", err)
    }
    defer consumerGroup.Close()

    // 要消费的主题
    topics := []string{"test-topic"}
    consumer := &GroupConsumer{}

    // 启动消费循环协程
    go func() {
        for {
            // 持续消费(重平衡后会重新调用)
            if err := consumerGroup.Consume(context.Background(), topics, consumer); err != nil {
                log.Printf("Consume error: %v", err)
                return
            }
        }
    }()

    // 等待中断信号
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan

    fmt.Println("Shutting down consumer group...")
}

负载均衡模式

提到负载均衡,消费者组有一个概念,即当只有一个分区被多个消费者消费时,为了保证高可用,则默认只有其中一个消费者可以消费消息,其余的消费者阻塞待命,等消费者因为一些原因或者故障导致消费暂停,那么其他的消费者就会自动推选出一个进行当前消费,这种应该被称为故障转移模式。

2025-08-31T04:44:17.png

请输入图片描述

请输入图片描述

如上图,消费者①消费到39即暂停消费,但生产者生产到了44,则因为故障转移机制,生产者②衔接上上一位消费者接着消费后续消息内容。

生产者组也可以进行负载均衡,一个生产者生产消息,消息被轮询到不同的消费者上,实现这个,我们需要在创建topic的时候,指定分区数量。

我们先创建一个有3个分区的 topic:

package main

import (
    "fmt"
    "github.com/IBM/sarama"
    "log"
    "time"
)

// 检查主题是否存在(旧版API)
func topicExists(client sarama.Client, topic string) (bool, error) {
    // 获取所有主题
    topics, err := client.Topics()
    if err != nil {
       return false, fmt.Errorf("获取主题列表失败: %v", err)
    }

    // 检查目标主题是否在列表中
    for _, t := range topics {
       if t == topic {
          return true, nil
       }
    }
    return false, nil
}

// 创建主题(旧版API)
func createTopicIfNotExists(brokers []string, topic string, partitions int32, replicationFactor int16) error {
    // 配置客户端
    config := sarama.NewConfig()
    config.Net.DialTimeout = 5 * time.Second

    // 创建基础客户端(旧版没有专门的AdminClient)
    client, err := sarama.NewClient(brokers, config)
    if err != nil {
       return fmt.Errorf("创建客户端失败: %v", err)
    }
    defer client.Close()

    // 检查主题是否存在
    exists, err := topicExists(client, topic)
    if err != nil {
       return fmt.Errorf("检查主题存在性失败: %v", err)
    }
    if exists {
       log.Printf("主题 %s 已存在,无需创建", topic)
       return nil
    }

    // 构建创建主题的请求
    request := sarama.CreateTopicsRequest{
       TopicDetails: map[string]*sarama.TopicDetail{
          topic: {
             NumPartitions:     partitions,
             ReplicationFactor: replicationFactor,
             ConfigEntries:     make(map[string]*string),
          },
       },
       Timeout: 10 * time.Second, // 10秒超时
    }

    // 通过客户端发送创建请求(旧版需手动选择broker)
    broker, err := client.Controller()
    if err != nil {
       return fmt.Errorf("无法获取控制器broker")
    }

    // 发送请求
    response, err := broker.CreateTopics(&request)
    if err != nil {
       return fmt.Errorf("发送创建请求失败: %v", err)
    }

    // 检查响应结果
    for topic, err := range response.TopicErrors {
       fmt.Println(topic, err)
    }

    log.Printf("主题 %s 创建成功,分区数: %d,副本数: %d", topic, partitions, replicationFactor)
    return nil
}

func main() {
    brokers := []string{"192.168.31.108:9092"}
    topicName := "test-topic2"
    partitionCount := int32(3)
    replicationFactor := int16(1)

    err := createTopicIfNotExists(brokers, topicName, partitionCount, replicationFactor)
    if err != nil {
       log.Fatalf("操作失败: %v", err)
    }
    log.Println("操作完成")
}

然后我们使用三消费者监听这个 topic:

请输入图片描述

请输入图片描述

请输入图片描述

请输入图片描述

我们会发现生产者生产出来了三个 partition 的消息,三个消费者就分别对应了一个 partition 进行消息消费。

但是要注意,消费者数量必须是能够被 partition 整除的,例如:3个partition/3个消费者=1。如果 partition 为3,但是消费者只有2的时候,那么所有的消息都会被一个消费者包揽,另一个消费者阻塞待命,回到了最初单机故障转移机制的模型。

安全认证

kafka安全认证加密脚本:

#!/bin/bash

# 创建证书目录(与 Docker 挂载目录一致)
mkdir -p kafka-certs && cd kafka-certs

# 生成 CA 证书
openssl genrsa -out ca.key 2048
openssl req -new -x509 -key ca.key -out ca.crt -days 3650 \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=YourCompany/CN=kafka-ca"

# 生成 Broker 证书(含 SAN 扩展)
cat > san.cnf <<EOF
[req]
distinguished_name = req_distinguished_name
req_extensions = v3_req
prompt = no

[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = YourCompany
CN = kafka-broker

[v3_req]
subjectAltName = @alt_names

## 重要配置
[alt_names]
IP.1 = 192.168.31.108    ## 运行kafka虚拟机本机ip
DNS.1 = kafka-broker
IP.2 = 10.3.0.2    ## 运行kafka容器配置的子网ip
DNS.2 = kafka-broker
EOF

openssl genrsa -out kafka.server.key 2048
openssl req -new -key kafka.server.key -out kafka.server.csr -config san.cnf
openssl x509 -req -in kafka.server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
  -out kafka.server.crt -days 3650 -extensions v3_req -extfile san.cnf

# 生成 JKS 格式证书(Bitnami 镜像专用)
openssl pkcs12 -export \
  -in kafka.server.crt -inkey kafka.server.key \
  -name kafka-ssl -out kafka.keystore.p12 \
  -password pass:yourpassword -certfile ca.crt

keytool -importkeystore \
  -srckeystore kafka.keystore.p12 -srcstoretype pkcs12 \
  -destkeystore kafka.keystore.jks -deststoretype JKS \
  -alias kafka-ssl -srcstorepass yourpassword -deststorepass yourpassword

# 生成 Truststore
keytool -keystore kafka.truststore.jks \
  -alias ca-cert -import -file ca.crt \
  -storepass yourpassword -noprompt

echo "证书已生成到 kafka-certs 目录,请挂载到容器的 /bitnami/kafka/config/certs"

docker-compose.yaml 配置文件:

services:
  kafka-server:
    image: bitnami/kafka:3.9.0
    restart: always
    ports:
      - "9095:9095"  # 对外暴露 9095 端口
    environment:
      # KRaft 基础配置
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@10.3.0.2:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

      # SSL 核心参数
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SSL
      - KAFKA_CFG_SSL_PROTOCOL=TLSv1.2
      - KAFKA_CFG_SSL_CLIENT_AUTH=none
      - KAFKA_CFG_SSL_KEYSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.keystore.jks
      - KAFKA_CFG_SSL_KEYSTORE_PASSWORD=yourpassword
      - KAFKA_CFG_SSL_KEY_PASSWORD=yourpassword
      - KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.truststore.jks
      - KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=yourpassword

      # 监听器配置
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093,SSL://:9095
      - KAFKA_CFG_ADVERTISED_LISTENERS=SSL://192.168.31.108:9095,INTERNAL://10.3.0.2:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,SSL:SASL_SSL,INTERNAL:PLAINTEXT

      # SASL 认证(与 SSL 叠加)
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CLIENT_USERS=user1
      - KAFKA_CLIENT_PASSWORDS=password1

    volumes:
      - /root/kafka/auth/kafka-certs:/bitnami/kafka/config/certs

    # 重要,网络分配
    networks:
      kafka_network:
        ipv4_address: 10.3.0.2    # 分配其中一个子网给kafka,作为服务无需认证鉴权即可通过

# 重要,容器网络配置
networks:
  kafka_network:
    driver: bridge    # 桥接
    ipam:
      driver: default
      config:
        - subnet: 10.3.0.0/24    # 设置子网

高可用集群简单实例

version: '3.8'

services:
  kraft-kafka-1:          # 容器名:kraft-kafka-1
    image: bitnami/kafka:3.9.0
    container_name: kraft-kafka-1
    restart: unless-stopped
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_KRAFT_CLUSTER_ID=abcdef1234567890
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kraft-kafka-1:9093,2@kraft-kafka-2:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL

      # 监听器:INTERNAL(容器间)、EXTERNAL(宿主机)、CONTROLLER(Raft)
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:9095,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kraft-kafka-1:9092,EXTERNAL://192.168.31.108:9095

      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

    ports:
      - "9095:9095"   # 外部生产/消费端口
      - "9093:9093"   # 控制器端口
    networks:
      - kafka-net

  kraft-kafka-2:          # 容器名:kraft-kafka-2
    image: bitnami/kafka:3.9.0
    container_name: kraft-kafka-2
    restart: unless-stopped
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_KRAFT_CLUSTER_ID=abcdef1234567890
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kraft-kafka-1:9093,2@kraft-kafka-2:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL

      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:9095,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kraft-kafka-2:9092,EXTERNAL://192.168.31.108:19095

      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

    ports:
      - "19095:9095"  # 外部生产/消费端口(避免冲突)
      - "19093:9093"  # 控制器端口(避免冲突)
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge

然后生产者和消费者用之前的一般简易实现就能够进行互动了。多机 kafka 的好处就是,当生产者生产消息的时候,kafka 即便挂了一个,另一个也能自动被选举上去进行顶替使用,有一定的可用性与稳定性保证。(代码示例是单机多容器模拟多机,如果是多机的话,对应端口的网络地址配置不能用容器名代替,需要用实际IP代替)

此次学习参照的枫枫大佬的文档:https://www.fengfengzhidao.com/article/9-OXrJgB8lppN5cbw2jc

在此感谢大佬。

PREV
[n8n] 部署与踩坑

评论(0)

发布评论