首頁 運維雜談用 Docker 快速搭建 Kafka 集群

用 Docker 快速搭建 Kafka 集群

運維派隸屬馬哥教育旗下專業運維社區,是國內成立最早的IT運維技術社區,歡迎關注公眾號:yunweipai
領取學習更多免費Linux云計算、Python、Docker、K8s教程關注公眾號:馬哥linux運維

用 Docker 快速搭建 Kafka 集群插圖

版本

  • JDK 14
  • Zookeeper
  • Kafka

安裝 Zookeeper 和 Kafka

Kafka 依賴 Zookeeper,所以我們需要在安裝 Kafka 之前先擁有 Zookeeper。準備如下的 docker-compose.yaml 文件,將文件中的主機地址 192.168.1.100 替換成你自己的環境中的主機地址即可。

version: "3"

services:
  zookeeper:
    image: zookeeper
    build:
      context: ./
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
      - ./data/zookeeper/logs:/logs
    restart: always

  kafka_node_0:
    depends_on:
      - zookeeper
    build:
      context: ./
    container_name: kafka-node-0
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - 9092:9092
    volumes:
      - ./data/kafka/node_0:/kafka
    restart: unless-stopped

  kafka_node_1:
    depends_on:
      - kafka_node_0
    build:
      context: ./
    container_name: kafka-node-1
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - 9093:9093
    volumes:
      - ./data/kafka/node_1:/kafka
    restart: unless-stopped

  kafka_node_2:
    depends_on:
      - kafka_node_1
    build:
      context: ./
    container_name: kafka-node-2
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9094
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - 9094:9094
    volumes:
      - ./data/kafka/node_2:/kafka
    restart: unless-stopped

輸入 docker-compose up -d 運行腳本文件進行集群構建。等待一會兒,得到如下結果即為成功。

用 Docker 快速搭建 Kafka 集群插圖1

SpringBoot 集成 Kafka 集群

創建一個全新的 SpringBoot 工程,在 build.gradle 文件中添加下列依賴。

dependencies {
    ...
    ...
    implementation 'org.springframework.kafka:spring-kafka:2.5.2.RELEASE'
    implementation 'com.alibaba:fastjson:1.2.71'
}
  1. 在 application.properties 進行 Kafka 相關參數配置
spring.kafka.bootstrap-servers=192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
  1. 創建消息體類。
public class Message {
    private Long id;
    private String message;
    private Date sendAt;
}
  1. 創建消息發送者
public class Sender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        Message message = new Message();

        message.setId(System.currentTimeMillis());
        message.setMessage(UUID.randomUUID().toString());
        message.setSendAt(new Date());

        log.info("message = {}", JSON.toJSONString(message));
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}
  1. 創建消息接收者
public class Receiver {
    @KafkaListener(topics = {"test"}, groupId = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            log.info("receiver record = " + record);
            log.info("receiver message = " + message.get());
        }
    }
}
  1. 測試消息隊列
public class QueueController {
    @Autowired
    private Sender sender;

    @PostMapping("/test")
    public void testQueue() {
        sender.send();
        sender.send();
        sender.send();
    }
}

得到如下日志即為集成成功。

用 Docker 快速搭建 Kafka 集群插圖2
到這里就我們就成功搭建了一個 Kafka 偽集群,并成功與 SpringBoot 進行整合。

鏈接:https://segmentfault.com/a/1190000022988499
作者:曾是然

本文鏈接:http://www.thecarconnectin.com/36806.html

網友評論comments

發表回復

您的電子郵箱地址不會被公開。

暫無評論

Copyright ? 2012-2022 YUNWEIPAI.COM - 運維派 京ICP備16064699號-6
掃二維碼
掃二維碼
返回頂部
国产曰批视频免费观看完|久久久一本精品99久久精品66直播|色天使色偷偷AV一区二区三区|国产色秀视频在线播放|亚洲欧洲免费三级网站