SoFunction
Updated on 2024-11-13

Python tutorial: consuming Kafka data

1. Install the Python module (kafka-python)

pip install --user kafka-python==1.4.3 

If you get compression-related errors (for example with Snappy or LZ4), try installing the following dependencies:

yum install snappy-devel
yum install lz4-devel
pip install python-snappy
pip install lz4

2. Producers

#!/usr/bin/env python
# coding: utf-8

from kafka import KafkaProducer
import json


def kafkaProducer(bootstrap_servers='ip:9092', topic='world', value=None):
    """Send a single JSON-encoded message to Kafka and wait for delivery.

    Args:
        bootstrap_servers: broker address(es) handed to KafkaProducer;
            'ip:9092' is a placeholder to replace with a real broker.
        topic: topic to publish to.
        value: JSON-serializable object to send; defaults to
            ``{'key1': 'value1'}``.
    """
    # Build the default per call (never share a mutable default argument).
    if value is None:
        value = {'key1': 'value1'}
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        # Serialize to JSON before encoding: the value is a dict, and dicts
        # have no .encode() method.
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    try:
        producer.send(topic, value)
        # send() only buffers the record; flush() blocks until buffered
        # records have been sent, so nothing is lost when the script exits.
        producer.flush()
    finally:
        producer.close()


if __name__ == '__main__':
    kafkaProducer()

3. Consumer

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
import click
try:
    import ConfigParser  # Python 2
except ImportError:  # Python 3 renamed the module to configparser
    import configparser as ConfigParser
import json
import threading
import datetime
import sched

# INI file providing the [KAFKA] Broker_Servers setting read by `client`.
# NOTE(review): the original path was lost from this snippet; "config.ini" is a
# placeholder -- point it at the real configuration file.
CONFIG_PATH = "config.ini"

config = ConfigParser.ConfigParser()
config.read(CONFIG_PATH)


@click.group()
def cli():
    """Command-line entry point; sub-commands are registered on this group."""


@cli.command()
@click.option('--topic', type=str, required=True)
@click.option('--offset',
              type=click.Choice(['smallest', 'earliest', 'largest', 'latest']),
              default='latest')
@click.option("--group", type=str)
def client(topic, offset, group):
    """Print every message of the given topic as it arrives.

    --offset is passed to KafkaConsumer as auto_offset_reset (where to start
    when there is no valid committed offset); it defaults to 'latest', the
    same default KafkaConsumer uses. --group is the optional consumer group id.
    Brokers come from the [KAFKA] Broker_Servers entry of the config file,
    as a comma-separated list.
    """
    print(topic)
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=config.get("KAFKA", "Broker_Servers").split(","),
                             group_id=group,
                             auto_offset_reset=offset)
    try:
        # No consumer_timeout_ms is set, so this loop keeps waiting for records.
        for message in consumer:
            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                 message.offset, message.key,
                                                 message.value))
    finally:
        consumer.close()


if __name__ == '__main__':
    cli()

4. Multi-threaded consumption

#coding:utf-8
import threading

import os
import sys
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from collections import OrderedDict


# Registry of the consumer threads started by the __main__ block.
threads = list()


class MyThread(threading.Thread):
    """Thread that runs Consumer() for a single topic partition.

    Args:
        thread_name: label used in log lines and passed on to Consumer.
        topic: topic to consume.
        partition: partition number assigned to this thread.
    """

    def __init__(self, thread_name, topic, partition):
        threading.Thread.__init__(self)
        self.thread_name = thread_name
        self.partition = partition
        self.topic = topic

    def run(self):
        print("Starting " + self.thread_name)
        Consumer(self.thread_name, self.topic, self.partition)

    def stop(self):
        # NOTE: sys.exit() raises SystemExit in the *calling* thread only. It
        # does not stop this thread when called from another one, and it does
        # not end the other consumer threads or the main program.
        sys.exit()


def Consumer(thread_name, topic, partition,
             broker_list='ip1:9092,ip2:9092', group_id='test'):
    """Consume one topic partition in an endless loop with manual offset commits.

    KafkaConsumer settings referred to below:

    fetch_min_bytes (int) - minimum amount of data the server returns for a
        fetch request; otherwise it waits (up to fetch_max_wait_ms).
    fetch_max_wait_ms (int) - maximum time in milliseconds the server blocks
        before answering a fetch request when fetch_min_bytes is not yet
        available.
    fetch_max_bytes (int) - maximum amount of data the server should return
        for a fetch request. Not an absolute maximum: if the first message in
        the first non-empty partition is larger, it is still returned so the
        consumer can make progress. The consumer fetches from several brokers
        in parallel, so memory use depends on how many brokers host the
        topic's partitions. Supported for Kafka >= 0.10.1.0.
        Default: 52428800 (50 MB).
    enable_auto_commit (bool) - if True, offsets are committed periodically in
        the background. Default: True. Disabled here; offsets are committed
        manually after each batch.
    max_poll_records (int) - maximum number of records returned by a single
        poll(). Default: 500.
    max_poll_interval_ms (int) - maximum delay between poll() calls when
        using group management. If poll() is not called in time, the consumer
        is considered failed and the group rebalances its partitions.
        Default: 300000.

    Args:
        thread_name: label for log lines; also used as the Kafka client_id.
        topic: topic to read.
        partition: partition number assigned to this consumer.
        broker_list: comma-separated bootstrap servers ('ip1:9092,...' is a
            placeholder).
        group_id: consumer group under which offsets are committed.
            NOTE(review): the original group id was lost from this snippet;
            'test' is a placeholder.

    The loop never returns normally; the consumer is closed if it raises.
    """
    consumer = KafkaConsumer(bootstrap_servers=broker_list,
                             group_id=group_id,
                             client_id=thread_name,
                             enable_auto_commit=False,
                             fetch_min_bytes=1024 * 1024,  # 1 MiB
                             fetch_max_wait_ms=60000,  # 60 s
                             request_timeout_ms=305000,
                             )
    try:
        tp = TopicPartition(topic, partition)
        # Manual assignment: each thread consumes exactly the one partition it
        # was given.
        consumer.assign([tp])
        # Resume where the last run stopped (the group's committed offset).
        # Fall back to the current log end only on the very first run.
        offset = consumer.committed(tp)
        if offset is None:
            offset = consumer.end_offsets([tp])[tp]
        print(thread_name, tp, offset)

        consumer.seek(tp, offset)

        print(u"Program first run\t thread:", thread_name, u"Partition:", partition,
              u"Offset:", offset, u"\t start spending...")
        num = 0  # number of poll() rounds completed by this consumer
        while True:
            msg = consumer.poll(timeout_ms=60000)
            end_offset = consumer.end_offsets([tp])[tp]
            # You can also keep your own offset records to control consumption.
            print(u'Saved offsets', consumer.committed(tp), u'Latest offset,', end_offset)
            if msg:
                print(u"Thread:", thread_name, u"Partition:", partition,
                      u"Maximum offset:", end_offset, u"Availability of data,", len(msg))
                lines = 0
                # poll() returns {TopicPartition: [records]}.
                for records in msg.values():
                    for line in records:
                        print(line)
                        lines += 1
                    # Process the records here.
                # Number of messages in this thread's batch.
                print(thread_name, "lines", lines)
                # Commit the consumer's position (the offset after the last
                # record returned), not end_offset. Records appended between
                # poll() and end_offsets() would otherwise be committed
                # without ever being processed.
                # offsets format: {TopicPartition: OffsetAndMetadata(offset, metadata)}
                # NOTE: calling sys.exit() here would end only this thread; the
                # other consumer threads and the main program keep running.
                consumer.commit(offsets={tp: OffsetAndMetadata(consumer.position(tp), None)})
            else:
                print(thread_name, 'No data')
            num += 1
            print(thread_name, "No.", num, "Second.")
    finally:
        consumer.close()


if __name__ == '__main__':
    try:
        # One consumer thread per partition (0, 1, 2) of the "test" topic.
        for partition in range(3):
            threads.append(MyThread("Thread-%d" % partition, "test", partition))

        for t in threads:
            t.start()

        # Wait for every consumer thread; Consumer loops with `while True`,
        # so in practice this blocks until the process is stopped.
        for t in threads:
            t.join()

        print("exit program with 0")
    except Exception:
        # Only errors raised in the main thread (creating/starting/joining the
        # threads) arrive here; exceptions inside a consumer thread do not
        # propagate through join(). KeyboardInterrupt is no longer swallowed.
        import traceback
        traceback.print_exc()
        print("Error: failed to run consumer program")

Reference: kafka-python documentation, https://kafka-python.readthedocs.io/en/master/

https:///article/

That is all for this tutorial on consuming Kafka data with Python.