三、python操作kafka

一、基本概念回顾

Topic:一组消息数据的标记符;
Producer:生产者,用于生产数据,可将生产后的消息送入指定的Topic;
Consumer:消费者,获取数据,可消费指定的Topic;
Group:消费者组,同一个group可以有多个消费者,一条消息在一个group中,只会被一个消费者获取;
Partition:分区,为了保证kafka的吞吐量,一个Topic可以设置多个分区。同一分区只能被一个消费者订阅。

二、本地安装与启动(基于Docker)

#1、下载zookeeper镜像与kafka镜像:
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

#2、本地启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6  

#3、本地启动kafka(注意下述代码,将kafka启动在9092端口)
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.71.113 \
--env KAFKA_ADVERTISED_PORT=9092 \
registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

上面写的localhost没有影响,查看端口如下
# netstat -tuanlp |grep 9092
tcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN 102483/docker-proxy 
tcp6 0 0 :::9092 :::* LISTEN 102487/docker-proxy 
#4、进入kafka bash
docker exec -it kafka bash
cd /opt/kafka/bin

#5、创建Topic,分区为2,Topic name为'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo

kafka-topics.sh --create --zookeeper zookeeper:2181 \ 
--replication-factor 1 --partitions 2 --topic egon

数据存在哪里
[root@web02 ~]# docker exec -it kafka bash
bash-5.1# 
bash-5.1# 
bash-5.1# 
bash-5.1# ls /kafka/
kafka-logs-f33383f9c414
bash-5.1# 
bash-5.1# 
bash-5.1# 
bash-5.1# ls /kafka/kafka-logs-f33383f9c414/
kafka_demo-0 kafka_demo-1
egon-0 egon-1
。。。。。。。。。。。。 
bash-5.1# 
bash-5.1# 
bash-5.1# 
bash-5.1# ls /kafka/kafka-logs-f33383f9c414/egon-0
00000000000000000000.index 00000000000000000000.timeindex
00000000000000000000.log leader-epoch-checkpoint

#6、查看当前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list

#7、命令行操作
$ docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-producer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic
然后一行行输入,回车即发送一条消息
>111
>222
>333


另外一个终端
$ docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-consumer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic --from-beginning
可以收到消息
111
222
333
#8、安装kafka-python
pip install kafka-python

三、生产者(Producer)与消费者(Consumer)

上一篇
下一篇
Copyright © 2022 Egon的技术星球 egonlin.com 版权所有 沪ICP备2022009235号 沪公网安备31011802005110号 青浦区尚茂路798弄 联系方式-13697081366