kafka实战
# 生产者
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.200.168.10:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.ACKS_CONFIG, "1")
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String](KafkaPublishManager.TIMER_TOPIC, cate, JsonHelper.toJsonString(buildMessage(cate, key, value, System.currentTimeMillis() + execMs))))
// 不进行 close 不会进行发送会失败
producer.close(100, TimeUnit.MILLISECONDS)
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 消费者
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, (kafkaServers + "_" + topics).hashCode + "")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
val consumer = new KafkaConsumer[String, String](props)
import scala.collection.JavaConversions._
val topicList: util.Collection[String] = topics.split(",").toList
consumer.subscribe(topicList)
new Thread(new Runnable {
override def run(): Unit = {
while (true) {
val records = consumer.poll(pollTimeoutMs)
if (records.isEmpty) {
Thread.sleep(triggerIntervalMs)
} else {
if (!records.map(record => {
try {
processFun(record.key(), record.value())
} catch {
case e: Throwable =>
log.error(s"Kafka subscribe [$topics] error , at ${record.value()}", e)
false
}
}).contains(false)) {
// TODO 部分成功处理
consumer.commitAsync()
}
}
}
}
}).start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 删除Topic数据
kafka-delete-records
方式- topic 标记删除
- 直接删除文件
# kafka-delete-records
方式
可以不用kafka重启,同时也不需要开启标记删除的配置项。其是将指定的offset值之前的所有数据删除,同时将topic from-beginning 的 offset 进行重置。
1、筛选出需要删除的topic数据
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group ods-ml | grep dmp.gateway.formatted-fk.fd.fengdai_riskcontrol.loan_apply
1
2、创建一个json文件,文件名自定义,文件格式如下:
{
"partitions": [
{
"topic": "dmp.gateway.source-dop.fdn.v2.t_party_person",
"partition": 0,
"offset": 10000000
}
],
"version": 1
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
3、执行命令
./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json
1
等待5min之后,查看磁盘空间,应该是会进行回收。
当你执行,从头消费,会发现不是从0开始了,而是从你删除的 offset之后开始了
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning
1
# Topic 标记删除
这是将原先的topic删除之后,进行重建方式,实现原来的数据删除操作。
运行命令
./bin/kafka-topics.sh -delete -zookeeper [zookeeper server] -topic [topic name]
1
如果kafka启动时加载的配置文件server.properties没有配置 delete.topic.enable = true
,那么此时的删除并不是真正的删除。而只是把topic标记为:marked for deletion
,此时就需要执行删除ZK中的Topic记录操作。
./zkCli.sh -server AAA:2181,BBB:2181,CCC:2181
进入/admin/delete_topics目录下,找到删除的topic,删除对应的信息
1
2
3
2
3
# 修改副本数
https://kafka.apache.org/documentation/#basic_ops_increase_replication_factor
# Kafka性能测试
压测写入消息
./kafka-producer-perf-test.sh --topic test_perf --num-records 1000000 --record-size 1000 --throughput 20000 --producer-props bootstrap.servers=localhost:9092
1
压测消费消息
./kafka-consumer-perf-test.sh --zookeeper localhost:2181 --topic test_perf --fetch-size 1048576 --messages 1000000 --threads 1
1
编辑 (opens new window)
上次更新: 2021/07/21, 18:22:30