Build a Distributed Streaming System with Apache Kafka and Python

What is Apache Kafka?

Kafka is an open-source distributed streaming platform that simplifies data integration between systems. A stream is a pipeline through which your applications receive data continuously. As a streaming platform, Kafka has two primary uses:

  • Data Integration: Kafka captures streams of events or data changes and feeds these to other data systems such as relational databases, key-value stores, or data warehouses.

  • Stream processing: Kafka accepts each stream of events and stores it in an append-only queue called a log. Information in the log is immutable, which enables continuous, real-time processing and transformation of these streams and makes the results available system-wide.

Compared to other technologies, Kafka offers better throughput, built-in partitioning, replication, and fault tolerance, which makes it a good solution for large-scale message-processing applications.

A Kafka system has three main components; a small code sketch after the list shows them in action:

  1. A Producer: The service that emits the source data.

  2. A Broker: Kafka acts as an intermediary between the producer and the consumer. It exposes APIs to receive and broadcast data.

  3. A Consumer: The service that uses the data that the broker broadcasts.
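
To make these roles concrete, here is a minimal sketch of a producer sending a single message and a consumer reading it back. This is an illustration only, not part of the project code; it assumes the kafka-python package and a broker running on localhost:9092.

# roundtrip.py (illustrative sketch)

from kafka import KafkaProducer, KafkaConsumer

# the producer emits data to a topic on the broker
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('my-topic', b'hello kafka')
producer.flush()

# the consumer subscribes to the topic and reads what the broker delivers
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)
for message in consumer:
    print(message.value)
    break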

You can find more info on Kafka's official site.

Project

We are going to build a simple streaming application that streams a video file from our producer and displays it in a web browser. This project aims to showcase Kafka's data integration and stream processing capabilities.

Project requirements:

This project introduces the basics of Kafka and messaging. Basic knowledge of Python is needed.

Installing Kafka

If you are on a Mac, you can install Kafka with Homebrew:

$ brew install kafka

For Linux users, follow the installation instructions from here.
Kafka runs on port 9092 by default.
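
If you installed Kafka from the official binary download instead, starting the broker and creating the topic typically looks something like this (the exact scripts and flags vary by Kafka version; older releases require ZooKeeper, newer ones can run without it):

$ # start ZooKeeper and the Kafka broker (classic ZooKeeper-based setup)
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

$ # create the topic the project will use
$ bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1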

Setting up:

Our project will consist of:

  • A producer (producer.py) that reads a video file with OpenCV and sends each frame to Kafka.

  • A consumer (consumer.py) that reads the frames from Kafka and displays them in the browser using Flask.

Create a project directory :

$ mkdir kafka && cd kafka

Create a virtualenv and activate it :

$ virtualenv env && source env/bin/activate

Install required dependencies :
We need to install kafka-python, OpenCV, and Flask

$ pip install kafka-python opencv-python Flask
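
Note: the producer code below uses SimpleProducer and KafkaClient, which were removed in kafka-python 2.0. If the import fails on your installation, either pin an older release:

$ pip install 'kafka-python<2.0'

or use the newer KafkaProducer API shown in the variant at the end of the producer section.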

Creating the Producer

A producer is a service that sends messages to the Kafka broker.
One thing to note: the producer is not concerned with the various systems that will eventually consume or load the broadcast data.

Let's create it

Create a producer.py file and add this code.

# producer.py

import time
import cv2
from kafka import SimpleProducer, KafkaClient
#  connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'my-topic'

Creating the Message:

The message will consist of images sent in binary form. OpenCV lets us read the movie file and convert each frame into bytes before sending it to Kafka.
We need a function that takes the video file, reads it frame by frame, and sends each encoded frame to Kafka. For this tutorial, place the video file in the same folder as the producer.
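
For example, encoding a single frame to bytes with OpenCV looks roughly like this (a minimal sketch; video here is an already opened cv2.VideoCapture object):

success, frame = video.read()                  # grab the next frame
if success:
    ret, buffer = cv2.imencode('.png', frame)  # encode the frame as a PNG image
    payload = buffer.tobytes()                 # raw bytes, ready to send to Kafka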

Sending the message:

Kafka messages are byte strings, so each image needs to be encoded to bytes before sending.

Here is the full producer code.

# producer.py

import time
import cv2
from kafka import SimpleProducer, KafkaClient
#  connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'my-topic'

def video_emitter(video):
    # open the video file
    video = cv2.VideoCapture(video)
    print('emitting.....')

    # read the file frame by frame
    while video.isOpened():
        # read the image in each frame
        success, image = video.read()
        # stop when the file has been read to the end
        if not success:
            break
        # encode the image as PNG
        ret, buffer = cv2.imencode('.png', image)
        # convert the encoded image to bytes and send it to Kafka
        producer.send_messages(topic, buffer.tobytes())
        # sleep for 0.2 seconds between frames to reduce CPU usage
        time.sleep(0.2)
    # release the capture
    video.release()
    print('done emitting')

if __name__ == '__main__':
    video_emitter('video.mp4')

Great! We are done with the producer.
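
If you are running kafka-python 2.0 or later, SimpleProducer and KafkaClient no longer exist. A rough equivalent of the producer using the newer KafkaProducer API might look like this (a sketch under that assumption, not the original code):

# producer.py (variant for kafka-python >= 2.0, using KafkaProducer)

import time
import cv2
from kafka import KafkaProducer

# connect to Kafka
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'my-topic'

def video_emitter(video_path):
    video = cv2.VideoCapture(video_path)
    print('emitting.....')
    while video.isOpened():
        success, image = video.read()
        if not success:
            break
        # encode the frame as PNG and send the raw bytes
        ret, buffer = cv2.imencode('.png', image)
        producer.send(topic, buffer.tobytes())
        time.sleep(0.2)
    video.release()
    # make sure all buffered messages are delivered before exiting
    producer.flush()
    print('done emitting')

if __name__ == '__main__':
    video_emitter('video.mp4')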

Creating the Consumer

The consumer is a service that listens to and consumes messages from the Kafka broker.
Our consumer should listen for messages on the my-topic topic and display them.
We shall use Flask, a Python microframework, to display the received video images.

Continuous Listening:

Updates and new messages from the broker are fetched by continuously listening to what is broadcast. A generator is used to keep the connection open: a generator is a function that yields results one at a time instead of returning a single value. Since images are streamed sequentially, our response uses the multipart/x-mixed-replace MIME type, which tells the browser to replace the previously displayed frame with each new one.

Here is the consumer.py code.

# consumer.py

from flask import Flask, Response
from kafka import KafkaConsumer

# connect to the Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('my-topic', group_id='view', bootstrap_servers=['localhost:9092'])

app = Flask(__name__)

@app.route('/')
def index():
    # return a multipart response
    return Response(kafkastream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')
def kafkastream():
    # continuously listen for messages and yield each frame as part of the multipart response
    for msg in consumer:
        yield (b'--frame\r\n'
               b'Content-Type: image/png\r\n\r\n' + msg.value + b'\r\n\r\n')

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True)

Running the program

Make sure Kafka is running; on macOS with Homebrew you can start it with brew services start kafka.

Next, open two terminals.

In the first terminal, run the producer:

  (env)$ python producer.py

In the second terminal, run the consumer:

  (env)$ python consumer.py

This starts our Flask web server.

Next, open your browser and navigate to http://0.0.0.0:5000 to see the video stream.

Observations

  1. The producer read the video file with OpenCV and emitted each frame to the my-topic topic.

  2. The Kafka broker received the frames and made them available to any consumer subscribed to the topic.

  3. The consumer read the frames from the broker and displayed them in the browser through Flask.

Where to use Kafka:

  • Data integration: capturing streams of events or data changes and feeding them into other data systems such as relational databases, key-value stores, or data warehouses.

  • Stream processing: continuous, real-time processing and transformation of event streams, with the results made available system-wide.

Conclusion :

Kafka is a fast, scalable, and easy-to-use distributed streaming system. To use it you need to know:

  • How to create a producer that publishes messages to a topic.

  • How to create a consumer that subscribes to that topic and processes the messages it receives.

We created a simple streaming application that demonstrates the advantages of streaming data, how fast it is, and how Kafka works as a broker.

I hope you now have a picture of how a streaming system works.
