{"id":10274,"date":"2024-04-14T12:12:24","date_gmt":"2024-04-14T04:12:24","guid":{"rendered":"https:\/\/egonlin.com\/?p=10274"},"modified":"2024-08-19T21:05:01","modified_gmt":"2024-08-19T13:05:01","slug":"%e4%b8%89%e3%80%81python%e6%93%8d%e4%bd%9ckafka","status":"publish","type":"post","link":"https:\/\/egonlin.com\/?p=10274","title":{"rendered":"\u4e09\u3001python\u64cd\u4f5ckafka"},"content":{"rendered":"<p>\u4e00\u3001\u57fa\u672c\u6982\u5ff5\u56de\u987e<\/p>\n<pre>Topic\uff1a\u4e00\u7ec4\u6d88\u606f\u6570\u636e\u7684\u6807\u8bb0\u7b26\uff1b\r\nProducer\uff1a\u751f\u4ea7\u8005\uff0c\u7528\u4e8e\u751f\u4ea7\u6570\u636e\uff0c\u53ef\u5c06\u751f\u4ea7\u540e\u7684\u6d88\u606f\u9001\u5165\u6307\u5b9a\u7684Topic\uff1b\r\nConsumer\uff1a\u6d88\u8d39\u8005\uff0c\u83b7\u53d6\u6570\u636e\uff0c\u53ef\u6d88\u8d39\u6307\u5b9a\u7684Topic\uff1b\r\nGroup\uff1a\u6d88\u8d39\u8005\u7ec4\uff0c\u540c\u4e00\u4e2agroup\u53ef\u4ee5\u6709\u591a\u4e2a\u6d88\u8d39\u8005\uff0c\u4e00\u6761\u6d88\u606f\u5728\u4e00\u4e2agroup\u4e2d\uff0c\u53ea\u4f1a\u88ab\u4e00\u4e2a\u6d88\u8d39\u8005\u83b7\u53d6\uff1b\r\nPartition\uff1a\u5206\u533a\uff0c\u4e3a\u4e86\u4fdd\u8bc1kafka\u7684\u541e\u5410\u91cf\uff0c\u4e00\u4e2aTopic\u53ef\u4ee5\u8bbe\u7f6e\u591a\u4e2a\u5206\u533a\u3002\u540c\u4e00\u5206\u533a\u53ea\u80fd\u88ab\u4e00\u4e2a\u6d88\u8d39\u8005\u8ba2\u9605\u3002<\/pre>\n<p>\u4e8c\u3001\u672c\u5730\u5b89\u88c5\u4e0e\u542f\u52a8\uff08\u57fa\u4e8eDocker\uff09<\/p>\n<pre><strong>#1\u3001\u4e0b\u8f7dzookeeper\u955c\u50cf\u4e0ekafka\u955c\u50cf\uff1a<\/strong>\r\ndocker pull registry.cn-shanghai.aliyuncs.com\/egon-k8s-test\/kafka-zookeeper:3.4.6\r\ndocker pull registry.cn-shanghai.aliyuncs.com\/egon-k8s-test\/wurstmeister-kafka:2.13-2.8.1\r\n\r\n<strong>#2\u3001\u672c\u5730\u542f\u52a8zookeeper<\/strong>\r\ndocker run -d --name zookeeper -p 2181:2181 -t registry.cn-shanghai.aliyuncs.com\/egon-k8s-test\/kafka-zookeeper:3.4.6  
\r\n\r\n<strong>#3\u3001\u672c\u5730\u542f\u52a8kafka\uff08\u6ce8\u610f\u4e0b\u8ff0\u4ee3\u7801\uff0c\u5c06kafka\u542f\u52a8\u57289092\u7aef\u53e3\uff09<\/strong>\r\ndocker run -d --name kafka --publish 9092:9092 --link zookeeper \\\r\n--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \\\r\n--env KAFKA_ADVERTISED_HOST_NAME=192.168.71.113 \\\r\n--env KAFKA_ADVERTISED_PORT=9092 \\\r\nregistry.cn-shanghai.aliyuncs.com\/egon-k8s-test\/wurstmeister-kafka:2.13-2.8.1\r\n\r\n\u4e0a\u9762\u5199\u7684localhost\u6ca1\u6709\u5f71\u54cd\uff0c\u67e5\u770b\u7aef\u53e3\u5982\u4e0b\r\n# netstat -tuanlp |grep 9092\r\ntcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN 102483\/docker-proxy \r\ntcp6 0 0 :::9092 :::* LISTEN 102487\/docker-proxy \r\n<strong>#4\u3001\u8fdb\u5165kafka bash<\/strong>\r\ndocker exec -it kafka bash\r\ncd \/opt\/kafka\/bin\r\n\r\n<strong>#5\u3001\u521b\u5efa<a class=\"hl hl-1\" href=\"https:\/\/so.csdn.net\/so\/search?q=Topic&amp;spm=1001.2101.3001.7020\" target=\"_blank\" rel=\"noopener\" data-report-click=\"{&quot;spm&quot;:&quot;1001.2101.3001.7020&quot;,&quot;dest&quot;:&quot;https:\/\/so.csdn.net\/so\/search?q=Topic&amp;spm=1001.2101.3001.7020&quot;,&quot;extra&quot;:&quot;{\\&quot;searchword\\&quot;:\\&quot;Topic\\&quot;}&quot;}\" data-tit=\"Topic\" data-pretit=\"topic\">Topic<\/a>\uff0c\u5206\u533a\u4e3a2\uff0cTopic name\u4e3a'kafka_demo'<\/strong>\r\nkafka-topics.sh --create --zookeeper zookeeper:2181 \\\r\n--replication-factor 1 --partitions 2 --topic kafka_demo\r\n\r\nkafka-topics.sh --create --zookeeper zookeeper:2181 \\ \r\n--replication-factor 1 --partitions 2 --topic egon\r\n\r\n<span style=\"background-color: #0000ff;\">\u6570\u636e\u5b58\u5728\u54ea\u91cc\r\n[root@web02 ~]# docker exec -it kafka bash\r\nbash-5.1# \r\nbash-5.1# \r\nbash-5.1# \r\nbash-5.1# ls \/kafka\/\r\nkafka-logs-f33383f9c414\r\nbash-5.1# \r\nbash-5.1# \r\nbash-5.1# \r\nbash-5.1# ls \/kafka\/kafka-logs-f33383f9c414\/\r\nkafka_demo-0 kafka_demo-1\r\negon-0 
egon-1\r\n\u3002\u3002\u3002\u3002\u3002\u3002\u3002\u3002\u3002\u3002\u3002\u3002 \r\nbash-5.1# \r\nbash-5.1# \r\nbash-5.1# \r\nbash-5.1# ls \/kafka\/kafka-logs-f33383f9c414\/egon-0\r\n00000000000000000000.index 00000000000000000000.timeindex\r\n00000000000000000000.log leader-epoch-checkpoint<\/span>\r\n\r\n<strong>#6\u3001\u67e5\u770b\u5f53\u524d\u6240\u6709topic<\/strong>\r\nkafka-topics.sh --zookeeper zookeeper:2181 --list\r\n\r\n#7\u3001\u547d\u4ee4\u884c\u64cd\u4f5c\r\n$ docker exec -ti kafka sh\r\n\/ # cd \/opt\/kafka\/bin\r\n\/ # kafka-console-producer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic\r\n\u7136\u540e\u4e00\u884c\u884c\u8f93\u5165\uff0c\u56de\u8f66\u5373\u53d1\u9001\u4e00\u6761\u6d88\u606f\r\n&gt;111\r\n&gt;222\r\n&gt;333\r\n\r\n\r\n\u53e6\u5916\u4e00\u4e2a\u7ec8\u7aef\r\n$ docker exec -ti kafka sh\r\n\/ # cd \/opt\/kafka\/bin\r\n\/ # kafka-console-consumer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic --from-beginning\r\n\u53ef\u4ee5\u6536\u5230\u6d88\u606f\r\n111\r\n222\r\n333\r\n<strong>#8\u3001\u5b89\u88c5kafka-python<\/strong>\r\npip install kafka-python<\/pre>\n<p>\u4e09\u3001\u751f\u4ea7\u8005\uff08Producer\uff09\u4e0e\u6d88\u8d39\u8005\uff08Consumer\uff09<\/p>\n<div  class='collapse-block shadow-sm collapse-block-transparent collapsed hide-border-left'><div class='collapse-block-title'><span class='collapse-block-title-inner'>\u793a\u4f8b\u4ee3\u78011<\/span><i class='collapse-icon fa fa-angle-down'><\/i><\/div><div class='collapse-block-body' style='display:none;'><\/p>\n<pre class=\"code\"># pip3 install kafka-python  # \u7248\u672c\u662f2.0.2\r\nfrom kafka import KafkaProducer, KafkaConsumer\r\nimport json\r\nimport threading\r\nimport time\r\n\r\n# Kafka broker address\r\nbootstrap_servers = '192.168.71.113:9092'\r\n\r\n# Topic name\r\ntopic = 'test_topic'\r\n\r\n\r\n# Producer function\r\ndef kafka_producer():\r\n    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,\r\n                      
       value_serializer=lambda v: json.dumps(v).encode('utf-8'))\r\n\r\n    try:\r\n        for i in range(10):\r\n            message = {'message': f'Hello Kafka! Message {i}'}\r\n            producer.send(topic, value=message)\r\n            print(f\"Sent: {message}\")\r\n            time.sleep(1)\r\n        else:\r\n            print(\"\u53d1\u9001\u5b8c\u6210\")\r\n    except Exception as ex:\r\n        print(f\"Exception occurred: {ex}\")\r\n    finally:\r\n        producer.close()\r\n\r\n\r\n# Consumer function\r\ndef kafka_consumer():\r\n    consumer = KafkaConsumer(topic,\r\n                             bootstrap_servers=bootstrap_servers,\r\n                             auto_offset_reset='earliest',\r\n                             consumer_timeout_ms=5000)  # \u8bbe\u7f6e\u8d85\u65f6\u65f6\u95f4\u4e3a5\u79d2\r\n\r\n    try:\r\n        for message in consumer:\r\n            print(f\"Received: {message.value}\")\r\n        else:\r\n            print(\"\u6d88\u8d39\u5b8c\u6bd5\uff0c\u7b495000\u6beb\u79d2\u8d85\u65f6\u5373\u53ef\u7ed3\u675f\uff0c\u6267\u884cfinally\u5185\u7684\u4ee3\u7801\")\r\n    except Exception as ex:\r\n        print(f\"Exception occurred: {ex}\")\r\n    finally:\r\n        print(\"\u6d88\u8d39\u8005\u7ed3\u675f\")\r\n        consumer.close()\r\n\r\n\r\n# Create threads for producer and consumer\r\nproducer_thread = threading.Thread(target=kafka_producer)\r\nconsumer_thread = threading.Thread(target=kafka_consumer)\r\n\r\n# Start both threads\r\nproducer_thread.start()\r\nconsumer_thread.start()\r\n\r\n# Wait for threads to complete\r\nproducer_thread.join()\r\nconsumer_thread.join()\r\n\r\nprint(\"Kafka producer and consumer threads have finished.\")\r\n\r\n<\/pre>\n<p>\u6267\u884c\u6548\u679c\u622a\u56fe<\/p>\n<p><div class='fancybox-wrapper lazyload-container-unload' data-fancybox='post-images' href='https:\/\/egonlin.com\/wp-content\/uploads\/2024\/04\/\u5fae\u4fe1\u56fe\u7247_20240704234106.png'><img class=\"lazyload 
lazyload-style-2\" src=\"data:image\/svg+xml;base64,PCEtLUFyZ29uTG9hZGluZy0tPgo8c3ZnIHdpZHRoPSIxIiBoZWlnaHQ9IjEiIHhtbG5zPSJodHRwOi8vd3d3LnczLm9yZy8yMDAwL3N2ZyIgc3Ryb2tlPSIjZmZmZmZmMDAiPjxnPjwvZz4KPC9zdmc+\"  loading=\"lazy\" class=\"alignnone size-full wp-image-11191\" data-original=\"https:\/\/egonlin.com\/wp-content\/uploads\/2024\/04\/\u5fae\u4fe1\u56fe\u7247_20240704234106.png\" src=\"data:image\/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAAAXNSR0IArs4c6QAAAARnQU1BAACxjwv8YQUAAAAJcEhZcwAADsQAAA7EAZUrDhsAAAANSURBVBhXYzh8+PB\/AAffA0nNPuCLAAAAAElFTkSuQmCC\" alt=\"\" width=\"701\" height=\"767\" \/><\/div><\/p>\n<p><\/div><\/div>\n<div  class='collapse-block shadow-sm collapse-block-transparent collapsed hide-border-left'><div class='collapse-block-title'><span class='collapse-block-title-inner'>\u793a\u4f8b\u4ee3\u78012(\u5e9f\u5f03)<\/span><i class='collapse-icon fa fa-angle-down'><\/i><\/div><div class='collapse-block-body' style='display:none;'><\/p>\n<pre>#!\/usr\/bin\/env python\r\n# -*- coding: utf-8 -*-\r\n\r\n\r\nimport time\r\nimport json\r\nimport logging\r\nimport traceback\r\n\r\nfrom kafka import KafkaConsumer, KafkaProducer, TopicPartition\r\n\r\nlog = logging.getLogger(__name__)\r\n\r\n\"\"\"\r\nkafka \u751f\u4ea7\u8005\r\nKafkaProducer\u662f\u53d1\u5e03\u6d88\u606f\u5230Kafka\u96c6\u7fa4\u7684\u5ba2\u6237\u7aef\uff0c\u5b83\u662f\u7ebf\u7a0b\u5b89\u5168\u7684\u5e76\u4e14\u5171\u4eab\u5355\u4e00\u751f\u4ea7\u8005\u5b9e\u4f8b\u3002\u751f\u4ea7\u8005\u5305\u542b\u4e00\u4e2a\u5e26\u6709\u7f13\u51b2\u533a\u7684\u6c60\uff0c\r\n\u7528\u4e8e\u4fdd\u5b58\u8fd8\u6ca1\u6709\u4f20\u9001\u5230Kafka\u96c6\u7fa4\u7684\u6d88\u606f\u8bb0\u5f55\u4ee5\u53ca\u4e00\u4e2a\u540e\u53f0IO\u7ebf\u7a0b\uff0c\u8be5\u7ebf\u7a0b\u5c06\u8fd9\u4e9b\u7559\u5728\u7f13\u51b2\u533a\u7684\u6d88\u606f\u8bb0\u5f55\u53d1\u9001\u5230Kafka\u96c6\u7fa4\u4e2d\u3002\r\nKafkaProducer\u6784\u9020\u51fd\u6570\u53c2\u6570\u89e3\u91ca\r\n    - acks 
0\u8868\u793a\u53d1\u9001\u4e0d\u7406\u776c\u53d1\u9001\u662f\u5426\u6210\u529f\uff1b1\u8868\u793a\u9700\u8981\u7b49\u5f85leader\u6210\u529f\u5199\u5165\u65e5\u5fd7\u624d\u8fd4\u56de\uff1ball\u8868\u793a\u6240\u6709\u526f\u672c\u90fd\u5199\u5165\u65e5\u5fd7\u624d\u8fd4\u56de\r\n    - buffer_memory \u9ed8\u8ba433554432\u4e5f\u5c31\u662f32M\uff0c\u8be5\u53c2\u6570\u7528\u4e8e\u8bbe\u7f6eproducer\u7528\u4e8e\u7f13\u5b58\u6d88\u606f\u7684\u7f13\u51b2\u533a\u5927\u5c0f\uff0c\u5982\u679c\u91c7\u7528\u5f02\u6b65\u53d1\u9001\u6d88\u606f\uff0c\u90a3\u4e48\r\n                    \u751f\u4ea7\u8005\u542f\u52a8\u540e\u4f1a\u521b\u5efa\u4e00\u4e2a\u5185\u5b58\u7f13\u51b2\u533a\u7528\u4e8e\u5b58\u653e\u5f85\u53d1\u9001\u7684\u6d88\u606f\uff0c\u7136\u540e\u7531\u4e13\u5c5e\u7ebf\u7a0b\u6765\u628a\u653e\u5728\u7f13\u51b2\u533a\u7684\u6d88\u606f\u8fdb\u884c\u771f\u6b63\u53d1\u9001\uff0c\r\n                    \u5982\u679c\u8981\u7ed9\u751f\u4ea7\u8005\u8981\u7ed9\u5f88\u591a\u5206\u533a\u53d1\u6d88\u606f\u90a3\u4e48\u5c31\u9700\u8981\u8003\u8651\u8fd9\u4e2a\u53c2\u6570\u7684\u5927\u5c0f\u9632\u6b62\u8fc7\u5c0f\u964d\u4f4e\u541e\u5410\u91cf\r\n    - compression_type \u662f\u5426\u542f\u7528\u538b\u7f29\uff0c\u9ed8\u8ba4\u662fnone\uff0c\u53ef\u9009\u7c7b\u578b\u4e3agzip\u3001lz4\u3001snappy\u4e09\u79cd\u3002\u538b\u7f29\u4f1a\u964d\u4f4e\u7f51\u7edcIO\u4f46\u662f\u4f1a\u589e\u52a0\u751f\u4ea7\u8005\u7aef\u7684CPU\r\n                       \u6d88\u8017\u3002\u53e6\u5916\u5982\u679cbroker\u7aef\u7684\u538b\u7f29\u8bbe\u7f6e\u548c\u751f\u4ea7\u8005\u4e0d\u540c\u90a3\u4e48\u4e5f\u4f1a\u7ed9broker\u5e26\u6765\u91cd\u65b0\u89e3\u538b\u7f29\u548c\u91cd\u65b0\u538b\u7f29\u7684CPU\u8d1f\u62c5\u3002\r\n    - retries 
\u91cd\u8bd5\u6b21\u6570\uff0c\u5f53\u6d88\u606f\u53d1\u9001\u5931\u8d25\u540e\u4f1a\u5c1d\u8bd5\u51e0\u6b21\u91cd\u53d1\u3002\u9ed8\u8ba4\u4e3a0\uff0c\u4e00\u822c\u8003\u8651\u5230\u7f51\u7edc\u6296\u52a8\u6216\u8005\u5206\u533a\u7684leader\u5207\u6362\uff0c\u800c\u4e0d\u662f\u670d\u52a1\u7aef\r\n              \u771f\u7684\u6545\u969c\u6240\u4ee5\u53ef\u4ee5\u8bbe\u7f6e\u91cd\u8bd53\u6b21\u3002\r\n    - retry_backoff_ms \u6bcf\u6b21\u91cd\u8bd5\u95f4\u9694\u591a\u5c11\u6beb\u79d2\uff0c\u9ed8\u8ba4100\u6beb\u79d2\u3002\r\n    - max_in_flight_requests_per_connection \u751f\u4ea7\u8005\u4f1a\u5c06\u591a\u4e2a\u53d1\u9001\u8bf7\u6c42\u7f13\u5b58\u5728\u5185\u5b58\u4e2d\uff0c\u9ed8\u8ba4\u662f5\u4e2a\uff0c\u5982\u679c\u4f60\u5f00\u542f\u4e86\u91cd\u8bd5\uff0c\u4e5f\u5c31\u662f\u8bbe\u7f6e\u4e86\r\n                                            retries\u53c2\u6570\uff0c\u90a3\u4e48\u5c06\u53ef\u80fd\u5bfc\u81f4\u9488\u5bf9\u4e8e\u540c\u4e00\u5206\u533a\u7684\u6d88\u606f\u51fa\u73b0\u987a\u5e8f\u9519\u4e71\u3002\u4e3a\u4e86\u9632\u6b62\u8fd9\u79cd\u60c5\u51b5\r\n                                            \u9700\u8981\u628a\u8be5\u53c2\u6570\u8bbe\u7f6e\u4e3a1\uff0c\u6765\u4fdd\u969c\u540c\u5206\u533a\u7684\u6d88\u606f\u987a\u5e8f\u3002\r\n    - batch_size \u5bf9\u4e8e\u8c03\u4f18\u751f\u4ea7\u8005\u541e\u5410\u91cf\u548c\u5ef6\u8fdf\u6027\u80fd\u6307\u6807\u6709\u91cd\u8981\u7684\u4f5c\u7528\u3002buffer_memory\u53ef\u4ee5\u770b\u505a\u6c60\u5b50\uff0c\u800c\u8fd9\u4e2abatch_size\u53ef\u4ee5\u770b\u505a\u6c60\u5b50\u91cc\r\n                 \u88c5\u6709\u6d88\u606f\u7684\u5c0f\u76d2\u5b50\u3002\u8fd9\u4e2a\u503c\u9ed8\u8ba416384\u4e5f\u5c31\u662f16K\uff0c\u5176\u5b9e\u4e0d\u5927\u3002\u751f\u4ea7\u8005\u4f1a\u628a\u53d1\u5f80\u540c\u4e00\u4e2a\u5206\u533a\u7684\u6d88\u606f\u653e\u5728\u4e00\u4e2abatch\u4e2d\uff0c\u5f53batch\r\n                 
\u6ee1\u4e86\u5c31\u4f1a\u53d1\u9001\u91cc\u9762\u7684\u6d88\u606f\uff0c\u4f46\u662f\u4e5f\u4e0d\u4e00\u5b9a\u975e\u8981\u7b49\u5230\u6ee1\u4e86\u624d\u4f1a\u53d1\u3002\u8fd9\u4e2a\u6570\u503c\u5927\u90a3\u4e48\u751f\u4ea7\u8005\u541e\u5410\u91cf\u9ad8\u4f46\u662f\u6027\u80fd\u4f4e\u56e0\u4e3a\u76d2\u5b50\u592a\u5927\u5360\u7528\u5185\u5b58\r\n                 \u53d1\u9001\u7684\u65f6\u5019\u8fd9\u4e2a\u6570\u636e\u91cf\u4e5f\u5c31\u5927\u3002\u5982\u679c\u4f60\u8bbe\u7f6e\u62101M\uff0c\u90a3\u4e48\u663e\u7136\u751f\u4ea7\u8005\u7684\u541e\u5410\u91cf\u8981\u6bd416K\u9ad8\u7684\u591a\u3002\r\n    - linger_ms \u4e0a\u9762\u8bf4batch\u6ca1\u6709\u586b\u6ee1\u4e5f\u53ef\u4ee5\u53d1\u9001\uff0c\u90a3\u663e\u7136\u6709\u4e00\u4e2a\u65f6\u95f4\u63a7\u5236\uff0c\u5c31\u662f\u8fd9\u4e2a\u53c2\u6570\uff0c\u9ed8\u8ba4\u662f0\u6beb\u79d2\uff0c\u8fd9\u4e2a\u53c2\u6570\u5c31\u662f\u7528\u4e8e\u63a7\u5236\u6d88\u606f\u53d1\u9001\u5ef6\u8fdf\r\n                \u591a\u4e45\u7684\u3002\u9ed8\u8ba4\u662f\u7acb\u5373\u53d1\u9001\uff0c\u65e0\u9700\u5173\u5fc3batch\u662f\u5426\u586b\u6ee1\u3002\u5927\u591a\u6570\u573a\u666f\u6211\u4eec\u5e0c\u671b\u7acb\u5373\u53d1\u9001\uff0c\u4f46\u662f\u8fd9\u4e5f\u964d\u4f4e\u4e86\u541e\u5410\u91cf\u3002\r\n    - max_request_size \u6700\u5927\u8bf7\u6c42\u5927\u5c0f\uff0c\u53ef\u4ee5\u7406\u89e3\u4e3a\u4e00\u6761\u6d88\u606f\u8bb0\u5f55\u7684\u6700\u5927\u5927\u5c0f\uff0c\u9ed8\u8ba4\u662f1048576\u5b57\u8282\u3002\r\n    - request_timeout_ms  \u751f\u4ea7\u8005\u53d1\u9001\u6d88\u606f\u540e\uff0cbroker\u9700\u8981\u5728\u89c4\u5b9a\u65f6\u95f4\u5185\u5c06\u5904\u7406\u7ed3\u679c\u8fd4\u56de\u7ed9\u751f\u4ea7\u8005\uff0c\u90a3\u4e48\u8fd9\u4e2a\u65f6\u95f4\u957f\u5ea6\u5c31\u662f\u8fd9\u4e2a\u53c2\u6570\r\n                          
\u63a7\u5236\u7684\uff0c\u9ed8\u8ba430000\uff0c\u4e5f\u5c31\u662f30\u79d2\u3002\u5982\u679cbroker\u572830\u79d2\u5185\u6ca1\u6709\u7ed9\u751f\u4ea7\u8005\u54cd\u5e94\uff0c\u90a3\u4e48\u751f\u4ea7\u8005\u5c31\u4f1a\u8ba4\u4e3a\u8bf7\u6c42\u8d85\u65f6\uff0c\u5e76\u5728\u56de\u8c03\u51fd\u6570\r\n                          \u4e2d\u8fdb\u884c\u7279\u6b8a\u5904\u7406\uff0c\u6216\u8005\u8fdb\u884c\u91cd\u8bd5\u3002\r\n\r\n\"\"\"\r\n\r\n\r\nclass KProducer(object):\r\n    def __init__(self, host, port, topic):\r\n        self.host = host\r\n        self.port = port\r\n        self.bootstrap_servers = \"{0}:{1}\".format(self.host, self.port)\r\n        self.topic = topic\r\n        self.producer = None\r\n        self.connect()\r\n\r\n        log.debug(\r\n            \"Kafka producer info {0} {1}\".format(\r\n                self.bootstrap_servers,\r\n                self.topic\r\n            )\r\n        )\r\n\r\n    def connect(self):\r\n        \"\"\"\r\n        :param bootstrap_servers: \u5730\u5740\r\n        \"\"\"\r\n        while True:\r\n            try:\r\n                # json \u683c\u5f0f\u5316\u53d1\u9001\u7684\u5185\u5bb9\r\n                self.producer = KafkaProducer(\r\n                    bootstrap_servers=self.bootstrap_servers,\r\n                    buffer_memory=33554432,\r\n                    batch_size=1048576,\r\n                    max_request_size=1048576,\r\n                    value_serializer=lambda m: json.dumps(m).encode(\"ascii\"),\r\n                    compression_type=\"gzip\"  # \u538b\u7f29\u6d88\u606f\u53d1\u9001\r\n                )\r\n                break\r\n            except Exception as e:\r\n                log.error(\"Connect kafka fail, {}.\".format(e))\r\n                time.sleep(5)\r\n                continue\r\n\r\n    def sync_producer(self, data):\r\n        \"\"\"\r\n        \u540c\u6b65\u53d1\u9001 \u6570\u636e\r\n        :param data_li:  \u53d1\u9001\u6570\u636e\r\n        :return:\r\n        \"\"\"\r\n        
while True:\r\n            try:\r\n                future = self.producer.send(self.topic, data)\r\n                record_metadata = future.get(timeout=10)  # \u540c\u6b65\u786e\u8ba4\u6d88\u8d39\r\n                partition = record_metadata.partition  # \u6570\u636e\u6240\u5728\u7684\u5206\u533a\r\n                offset = record_metadata.offset  # \u6570\u636e\u6240\u5728\u5206\u533a\u7684\u4f4d\u7f6e\r\n                log.debug(\"save success, partition: {}, offset: {}\".format(partition, offset))\r\n                break\r\n            except Exception as e:\r\n                log.error(\"Kafka sync send fail, {}.\".format(e))\r\n                self.connect()\r\n\r\n    def asyn_producer(self, data):\r\n        \"\"\"\r\n        \u5f02\u6b65\u53d1\u9001\u6570\u636e\r\n        :param data_li:\u53d1\u9001\u6570\u636e\r\n        :return:\r\n        \"\"\"\r\n        while True:\r\n            try:\r\n                self.producer.send(self.topic, data)\r\n                # self.producer.flush()  # \u6279\u91cf\u63d0\u4ea4\r\n                break\r\n            except Exception as e:\r\n                log.error(\"Kafka asyn send fail, {}.\".format(e))\r\n                self.connect()\r\n\r\n    def asyn_producer_callback(self, data):\r\n        \"\"\"\r\n        \u5f02\u6b65\u53d1\u9001\u6570\u636e + \u53d1\u9001\u72b6\u6001\u5904\u7406\r\n        :param data_li:\u53d1\u9001\u6570\u636e\r\n        :return:\r\n        \"\"\"\r\n        while True:\r\n            try:\r\n                for item in data:\r\n                    self.producer.send(self.topic, item).add_callback(self.send_success).add_errback(self.send_error)\r\n                # self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error)\r\n                self.producer.flush()  # \u6279\u91cf\u63d0\u4ea4\r\n                break\r\n            except Exception as e:\r\n                log.error(\"Kafka asyn send fail, {}.\".format(e))\r\n                
self.connect()\r\n\r\n    def send_success(self, *args, **kwargs):\r\n        \"\"\"\u5f02\u6b65\u53d1\u9001\u6210\u529f\u56de\u8c03\u51fd\u6570\"\"\"\r\n        log.debug(\"save success\")\r\n        return\r\n\r\n    def send_error(self, *args, **kwargs):\r\n        \"\"\"\u5f02\u6b65\u53d1\u9001\u9519\u8bef\u56de\u8c03\u51fd\u6570\"\"\"\r\n        log.debug(\"save error\")\r\n        return\r\n\r\n    def close_producer(self):\r\n        try:\r\n            self.producer.close()\r\n        except:\r\n            pass\r\n\r\n\r\n\"\"\"\r\nkafka \u6d88\u8d39\u8005\r\n\"\"\"\r\n\r\n\r\nclass PConsumers(object):\r\n    def __init__(self, host, port, topic, group_id):\r\n        \"\"\"\r\n        :param bootstrap_servers: \u5730\u5740\r\n        \"\"\"\r\n        self.host = host\r\n        self.port = port\r\n        self.bootstrap_servers = \"{0}:{1}\".format(self.host, self.port)\r\n        self.topic = topic\r\n        self.group_id = group_id\r\n        self.consumer = None\r\n        self.set_conn()\r\n\r\n        log.info(\r\n            \"Kafka consumers info {0} {1} {2}\".format(\r\n                self.bootstrap_servers,\r\n                self.topic,\r\n                self.group_id\r\n            )\r\n        )\r\n\r\n    def set_conn(self):\r\n        while True:\r\n            try:\r\n                self.consumer = KafkaConsumer(\r\n                    self.topic,\r\n                    bootstrap_servers=self.bootstrap_servers,\r\n                    group_id=self.group_id,\r\n                    enable_auto_commit=False,\r\n                    # auto_commit_interval_ms = 1000,\r\n                    session_timeout_ms=30000,\r\n                    # max_poll_records=50,\r\n                    max_poll_records=1,\r\n                    # max_poll_interval_ms=30000,\r\n                    max_poll_interval_ms=86400000,\r\n                    # metadata_max_age_ms = 3000,\r\n                    auto_offset_reset=\"latest\",\r\n                    # 
auto_offset_reset = \"earliest\"\r\n                )\r\n                break\r\n            except Exception as e:\r\n                log.error(\"Kafka pconsumers set connect fail, {0}, {1}\".format(e, traceback.print_exc()))\r\n                time.sleep(5)\r\n                continue\r\n\r\n    def get_message(self, count=1):\r\n        \"\"\"\r\n        :param count: \u53d6\u7684\u6761\u6570\r\n        :return: msg\r\n        \"\"\"\r\n        counter = 0\r\n        msg = []\r\n\r\n        while True:\r\n            try:\r\n                consumer = KafkaConsumer(\r\n                    self.topic,\r\n                    bootstrap_servers=self.bootstrap_servers,\r\n                    group_id=self.group_id,\r\n                    # auto_offset_reset = \"latest\"\r\n                    auto_offset_reset=\"earliest\"\r\n                )\r\n\r\n                for message in consumer:\r\n                    log.debug(\r\n                        \"%s:%d:%d: key=%s value=%s header=%s\" % (\r\n                            message.topic, message.partition,\r\n                            message.offset, message.key, message.value, message.headers\r\n                        )\r\n                    )\r\n\r\n                    msg.append(json.loads(message.value))\r\n\r\n                    counter += 1\r\n                    if count == counter:\r\n                        break\r\n                    else:\r\n                        continue\r\n\r\n                consumer.close()\r\n                return msg\r\n            except Exception as e:\r\n                log.error(\"Kafka get message fail, {0}, {1}\".format(e, traceback.print_exc()))\r\n                time.sleep(5)\r\n                continue\r\n\r\n    def get_count(self):\r\n        \"\"\"\r\n        :return: count\r\n        \"\"\"\r\n        left_sum = 0\r\n        test_num = 5\r\n\r\n        while test_num:\r\n            try:\r\n                consumer = KafkaConsumer(\r\n                    
self.topic,\r\n                    bootstrap_servers=self.bootstrap_servers,\r\n                    group_id=self.group_id\r\n                )\r\n                partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]\r\n\r\n                # print(\"start to cal offset:\")\r\n\r\n                # total\r\n                toff = consumer.end_offsets(partitions)\r\n                toff = [(key.partition, toff[key]) for key in toff.keys()]\r\n                toff.sort()\r\n                # print(\"total offset: {}\".format(str(toff)))\r\n\r\n                # current\r\n                coff = [(x.partition, consumer.committed(x)) for x in partitions]\r\n                coff.sort()\r\n                # print(\"current offset: {}\".format(str(coff)))\r\n\r\n                # cal sum and left\r\n                toff_sum = sum([x[1] for x in toff])\r\n                cur_sum = sum([x[1] for x in coff if x[1] is not None])\r\n                left_sum = toff_sum - cur_sum\r\n                # print(\"kafka left: {}\".format(left_sum))\r\n\r\n                consumer.close()\r\n                return left_sum\r\n            except Exception as e:\r\n                log.error(\"Kafka get count fail, {0}, {1}\".format(e, traceback.print_exc()))\r\n                time.sleep(5)\r\n                test_num -= 1\r\n                continue\r\n        return left_sum\r\n\r\n\r\nif __name__ == \"__main__\":\r\n    send_data_li = {\"test\": 1}\r\n    # kp = KProducer(topic=\"test\", bootstrap_servers='127.0.0.1:9001,127.0.0.1:9002')\r\n    kp = KProducer(bootstrap_servers=\"1.1.1.1:9092\")\r\n\r\n    # \u540c\u6b65\u53d1\u9001\r\n    # kp.sync_producer(send_data_li)\r\n\r\n    # \u5f02\u6b65\u53d1\u9001\r\n    # kp.asyn_producer(send_data_li)\r\n\r\n    # \u5f02\u6b65+\u56de\u8c03\r\n    kp.asyn_producer_callback(topic=\"test\", data=send_data_li)\r\n\r\n    # kp.close_producer()\r\n\r\n    # cp = PConsumers(bootstrap_servers=\"1.1.1.1:9092\", 
topic=\"detect-file\")\r\n    cp = PConsumers(bootstrap_servers=\"1.1.1.1:9092\", group_id=\"boxer\")\r\n    # cp = PConsumers(bootstrap_servers=\"1.1.1.1:9092\", topic=\"custom-event\")\r\n\r\n    # print(cp.get_count(topic=\"test\"))\r\n    print(cp.get_message(topic=\"test\"))<\/pre>\n<p><\/div><\/div>\n","protected":false},"excerpt":{"rendered":"<p>\u4e00\u3001\u57fa\u672c\u6982\u5ff5\u56de\u987e Topic\uff1a\u4e00\u7ec4\u6d88\u606f\u6570\u636e\u7684\u6807\u8bb0\u7b26\uff1b Producer\uff1a\u751f\u4ea7\u8005\uff0c\u7528\u4e8e\u751f\u4ea7\u6570\u636e\uff0c\u53ef\u5c06\u751f\u4ea7\u540e\u7684\u6d88\u606f [&hellip;]<\/p>\n","protected":false},"author":3,"featured_media":0,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":[],"categories":[454],"tags":[],"_links":{"self":[{"href":"https:\/\/egonlin.com\/index.php?rest_route=\/wp\/v2\/posts\/10274"}],"collection":[{"href":"https:\/\/egonlin.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/egonlin.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/egonlin.com\/index.php?rest_route=\/wp\/v2\/users\/3"}],"replies":[{"embeddable":true,"href":"https:\/\/egonlin.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=10274"}],"version-history":[{"count":16,"href":"https:\/\/egonlin.com\/index.php?rest_route=\/wp\/v2\/posts\/10274\/revisions"}],"predecessor-version":[{"id":11311,"href":"https:\/\/egonlin.com\/index.php?rest_route=\/wp\/v2\/posts\/10274\/revisions\/11311"}],"wp:attachment":[{"href":"https:\/\/egonlin.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=10274"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/egonlin.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=10274"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/egonlin.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=10274"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":
true}]}}