Zeros Tech Zeros Tech
首页
架构
大数据
数据库
  • 面试

    • Java面试
    • 大数据面试
    • 架构面试
语言
运维
关于
  • 网站
  • 资源
  • Vue资源
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

迹_Jason

全栈工程师
首页
架构
大数据
数据库
  • 面试

    • Java面试
    • 大数据面试
    • 架构面试
语言
运维
关于
  • 网站
  • 资源
  • Vue资源
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • clickhouse

  • deep_learning

  • delta

  • doris

  • es

  • flink

  • hadoop

  • hbase

  • hive

  • kafka

    • kafka_概述
    • kafka_控制器
    • kafka_启动流程
    • kafka_生产者
    • kafka_消费者
    • kafka_延迟操作组件
    • kafka_应用
    • kafka_指令
    • Kafka_字典表
    • kafka_parition
    • kafka_qa
    • kafka_topic
    • kafka-log-architure
    • kafka实战
      • kafka问题
    • kerberos

    • kudu

    • kylin

    • Livy

    • phoneix

    • ranger

    • spark

    • tidb

    • time_series

    • zeppelin

    • 大数据
    • kafka
    迹_Jason
    2021-07-18

    kafka实战

    # 生产者

        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.200.168.10:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        props.put(ProducerConfig.ACKS_CONFIG, "1")
        val producer = new KafkaProducer[String, String](props)
        producer.send(new ProducerRecord[String, String](KafkaPublishManager.TIMER_TOPIC, cate, JsonHelper.toJsonString(buildMessage(cate, key, value, System.currentTimeMillis() + execMs))))
        // 不进行 close 不会进行发送会失败
        producer.close(100, TimeUnit.MILLISECONDS)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9

    # 消费者

    val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, (kafkaServers + "_" + topics).hashCode + "")
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
        val consumer = new KafkaConsumer[String, String](props)
          import scala.collection.JavaConversions._
        val topicList: util.Collection[String] = topics.split(",").toList
        consumer.subscribe(topicList)
        new Thread(new Runnable {
          override def run(): Unit = {
            while (true) {
              val records = consumer.poll(pollTimeoutMs)
              if (records.isEmpty) {
                Thread.sleep(triggerIntervalMs)
              } else {
                if (!records.map(record => {
                  try {
                    processFun(record.key(), record.value())
                  } catch {
                    case e: Throwable =>
                      log.error(s"Kafka subscribe [$topics] error , at ${record.value()}", e)
                      false
                  }
                }).contains(false)) {
                  // TODO 部分成功处理
                  consumer.commitAsync()
                }
              }
            }
          }
        }).start()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32

    # 删除Topic数据

    • kafka-delete-records 方式
    • topic 标记删除
    • 直接删除文件

    # kafka-delete-records 方式

    可以不用kafka重启,同时也不需要开启标记删除的配置项。其是将指定的offset值之前的所有数据删除,同时将topic from-beginning 的 offset 进行重置。

    1、筛选出需要删除的topic数据

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group ods-ml | grep dmp.gateway.formatted-fk.fd.fengdai_riskcontrol.loan_apply
    
    1

    2、创建一个json文件,文件名自定义,文件格式如下:

    {
        "partitions": [
            {
                "topic": "dmp.gateway.source-dop.fdn.v2.t_party_person",
                "partition": 0,
                "offset": 10000000
            }
        ],
        "version": 1
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    3、执行命令

    ./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json
    
    1

    等待5min之后,查看磁盘空间,应该是会进行回收。

    当你执行,从头消费,会发现不是从0开始了,而是从你删除的 offset之后开始了

    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning
    
    1

    # Topic 标记删除

    这是将原先的topic删除之后,进行重建方式,实现原来的数据删除操作。

    运行命令

    ./bin/kafka-topics.sh -delete -zookeeper [zookeeper server] -topic [topic name]
    
    1

    如果kafka启动时加载的配置文件server.properties没有配置 delete.topic.enable = true,那么此时的删除并不是真正的删除。而只是把topic标记为:marked for deletion,此时就需要执行删除ZK中的Topic记录操作。

    ./zkCli.sh -server AAA:2181,BBB:2181,CCC:2181
    
    进入/admin/delete_topics目录下,找到删除的topic,删除对应的信息
    
    1
    2
    3

    # 修改副本数

    https://kafka.apache.org/documentation/#basic_ops_increase_replication_factor

    # Kafka性能测试

    压测写入消息

    ./kafka-producer-perf-test.sh --topic test_perf --num-records 1000000 --record-size 1000 --throughput 20000 --producer-props bootstrap.servers=localhost:9092
    
    1

    压测消费消息

    ./kafka-consumer-perf-test.sh --zookeeper localhost:2181 --topic test_perf --fetch-size 1048576 --messages 1000000 --threads 1
    
    1
    编辑 (opens new window)
    上次更新: 2021/07/21, 18:22:30
    kafka-log-architure
    kafka问题

    ← kafka-log-architure kafka问题→

    最近更新
    01
    权限
    12-17
    02
    SpringGateway
    12-17
    03
    Spock
    12-17
    更多文章>
    Theme by Vdoing | Copyright © 2021-2021 迹_Jason | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式
    ×