1. Install the Python module
pip install --user kafka-python==1.4.3
If you get compression-related errors, try installing the following dependencies:

yum install snappy-devel
yum install lz4-devel
pip install python-snappy
pip install lz4
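To confirm that the compression libraries are actually usable from kafka-python, you can create a producer with compression enabled; the constructor raises immediately if the codec is missing. A minimal sketch (the broker address and topic name are placeholders):

from kafka import KafkaProducer

# compression_type may be 'gzip', 'snappy', or 'lz4'; 'snappy' and 'lz4'
# require the dependencies installed above
producer = KafkaProducer(bootstrap_servers='ip:9092', compression_type='snappy')
producer.send('test', b'compressed message')
producer.flush()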
2. Producers
#!/usr/bin/env python
# coding: utf-8
from kafka import KafkaProducer
import json


def kafkaProducer():
    producer = KafkaProducer(bootstrap_servers='ip:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    producer.send('world', {'key1': 'value1'})
    # Block until all buffered messages have actually been sent
    producer.flush()


if __name__ == '__main__':
    kafkaProducer()
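send() is asynchronous and returns a future. If you want to confirm delivery (or surface errors) per message, you can block on that future; a minimal sketch, assuming the same broker and topic as above:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

producer = KafkaProducer(bootstrap_servers='ip:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
future = producer.send('world', {'key1': 'value1'})
try:
    # Wait up to 10 seconds for the broker to acknowledge the message
    metadata = future.get(timeout=10)
    print(metadata.topic, metadata.partition, metadata.offset)
except KafkaError as e:
    print('send failed:', e)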
3. Consumers
from kafka import KafkaConsumer
import click
import configparser

config = configparser.ConfigParser()
config.read("config.ini")  # placeholder path; the original does not name the file


@click.group()
def cli():
    pass


@cli.command()
@click.option('--topic', type=str)
# note: kafka-python itself understands 'earliest' and 'latest'
@click.option('--offset', type=click.Choice(['smallest', 'earliest', 'largest']))
@click.option('--group', type=str)
def client(topic, offset, group):
    print(topic)
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=config.get("KAFKA", "Broker_Servers").split(","),
                             group_id=group,
                             auto_offset_reset=offset)
    for message in consumer:
        print(message)
        # print("%d:%d: key=%s value=%s" % (message.partition, message.offset,
        #                                   message.key, message.value))


if __name__ == '__main__':
    cli()
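The script reads the broker list from an INI-style config file. A minimal example matching the KAFKA section and Broker_Servers key used above (the file name and broker addresses are my assumptions, since the original does not show the file):

[KAFKA]
Broker_Servers = ip1:9092,ip2:9092

With that in place, the consumer can be started from the shell, e.g. python consumer.py client --topic test --offset earliest --group my-group (script name, topic, and group are assumptions).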
4. Multi-threaded consumption
# coding: utf-8
import os
import sys
import threading
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

threads = []


class MyThread(threading.Thread):
    def __init__(self, thread_name, topic, partition):
        threading.Thread.__init__(self)
        self.thread_name = thread_name
        self.partition = partition
        self.topic = topic

    def run(self):
        print("Starting " + self.thread_name)
        Consumer(self.thread_name, self.topic, self.partition)

    def stop(self):
        sys.exit()


def Consumer(thread_name, topic, partition):
    broker_list = 'ip1:9092,ip2:9092'
    '''
    fetch_min_bytes (int) - the minimum amount of data the server should return
        for a fetch request; otherwise it waits for more data to accumulate.
    fetch_max_wait_ms (int) - the maximum amount of time (in milliseconds) the
        server will block before answering a fetch request when there is not
        enough data to immediately satisfy fetch_min_bytes.
    fetch_max_bytes (int) - the maximum amount of data the server should return
        for a fetch request. This is not an absolute maximum: if the first
        message in the first non-empty partition of the fetch is larger than
        this value, the message will still be returned so the consumer can make
        progress. Note: the consumer performs fetches to multiple brokers in
        parallel, so memory usage depends on the number of brokers that host
        partitions of the topic. Supported on Kafka >= 0.10.1.0.
        Default: 52428800 (50 MB).
    enable_auto_commit (bool) - if True, the consumer's offset is committed
        periodically in the background. Default: True.
    max_poll_records (int) - the maximum number of records returned by a single
        call to poll(). Default: 500.
    max_poll_interval_ms (int) - the maximum delay between calls to poll() when
        using consumer group management. This sets an upper bound on how long
        the consumer may be idle before fetching more records. If poll() is not
        called before this timeout expires, the consumer is considered failed
        and the group rebalances to reassign its partitions to another member.
        Default: 300000.
    '''
    consumer = KafkaConsumer(bootstrap_servers=broker_list,
                             group_id='test-group',  # placeholder; the original group id is not given
                             client_id=thread_name,
                             enable_auto_commit=False,
                             fetch_min_bytes=1024 * 1024,  # 1 MB
                             # fetch_max_bytes=1024 * 1024 * 1024 * 10,
                             fetch_max_wait_ms=60000,  # 60 s
                             request_timeout_ms=305000,
                             # consumer_timeout_ms=1,
                             # max_poll_records=5000,
                             )
    # Set the topic partition
    tp = TopicPartition(topic, partition)
    # Assign this TopicPartition to the consumer: depending on these arguments,
    # each consumer thread consumes exactly one partition
    consumer.assign([tp])
    # Get the latest (end) offset of the partition
    offset = consumer.end_offsets([tp])[tp]
    print(thread_name, tp, offset)
    # Set the offset to start consuming from
    consumer.seek(tp, offset)
    print("Program first run\t thread:", thread_name, "partition:", partition,
          "offset:", offset, "\t start consuming...")

    num = 0  # number of poll iterations this consumer has performed
    while True:
        msg = consumer.poll(timeout_ms=60000)
        end_offset = consumer.end_offsets([tp])[tp]
        '''You can keep your own records to control consumption'''
        print('Saved offset', consumer.position(tp), 'latest offset', end_offset)
        if len(msg) > 0:
            print("Thread:", thread_name, "partition:", partition,
                  "maximum offset:", end_offset, "partitions with data:", len(msg))
            lines = 0
            for data in msg.values():
                for line in data:
                    print(line)
                    lines += 1
                    '''
                    do something
                    '''
            # Number of messages this thread handled in this batch
            print(thread_name, "lines", lines)
            if True:
                # You could instead persist the offsets of each topic/partition yourself.
                # Manually commit offsets; format: {TopicPartition: OffsetAndMetadata(offset_num, None)}
                consumer.commit(offsets={tp: (OffsetAndMetadata(end_offset, None))})
                if True == 0:
                    # Exit the whole process? I haven't tried that yet.
                    os._exit(0)
                    '''
                    sys.exit() only exits the current thread, i.e. the other two
                    threads keep running and the main program does not exit.
                    '''
                else:
                    sys.exit()
        else:
            print(thread_name, 'No data')
        num += 1
        print(thread_name, "iteration", num)


if __name__ == '__main__':
    try:
        t1 = MyThread("Thread-0", "test", 0)
        threads.append(t1)
        t2 = MyThread("Thread-1", "test", 1)
        threads.append(t2)
        t3 = MyThread("Thread-2", "test", 2)
        threads.append(t3)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        print("exit program with 0")
    except Exception:
        print("Error: failed to run consumer program")
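Pinning one partition per thread with assign() gives full control, but it means you must know the partition count and manage offsets yourself. An alternative is to let the group coordinator distribute partitions across consumers that share a group_id; a minimal sketch under that assumption (broker, topic, and group names are placeholders):

from kafka import KafkaConsumer

# Consumers that subscribe to the same topic with the same group_id get
# partitions assigned (and rebalanced) automatically by Kafka.
consumer = KafkaConsumer('test',
                         bootstrap_servers='ip1:9092,ip2:9092',
                         group_id='test-group',
                         enable_auto_commit=False)
for message in consumer:
    print(message.partition, message.offset, message.value)
    consumer.commit()  # commit the offsets of everything consumed so far

Run several of these processes (or threads) and Kafka rebalances partitions among them, removing the need to hard-code partition numbers as in the threaded example above.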
Reference: https://kafka-python.readthedocs.io/en/master/
This tutorial on consuming Kafka data with Python is everything I have to share; I hope it gives you a useful reference.