SoFunction
Updated on 2025-05-20

Example of using sse in Flask framework

Multiple Blueprints can be registered in a Flask application, and each Blueprint can correspond to an SSE interface. For example:

from flask import Flask, Response, Blueprint
import time

app = Flask(__name__)

bp1 = Blueprint('sse1', __name__)
bp2 = Blueprint('sse2', __name__)

@('/sse1')
def sse1():
    def generate():
        while True:
            yield 'data: {}\n\n'.format(())
            (1)
    return Response(generate(), mimetype='text/event-stream')

@('/sse2')
def sse2():
    def generate():
        while True:
            yield 'data: {}\n\n'.format(('%Y-%m-%d %H:%M:%S'))
            (1)
    return Response(generate(), mimetype='text/event-stream')

app.register_blueprint(bp1)
app.register_blueprint(bp2)

if __name__ == '__main__':
    (debug=True)

In this example, we create two Blueprints corresponding to the `/sse1` and `/sse2` interfaces, respectively. Each function in Blueprint returns an SSE data stream encapsulated using a response object of type `Response`. Finally, register these two Blueprints in the Flask application to enable two SSE interfaces at the same time.

from flask import Flask, Response, request

app = Flask(__name__)

# Store all subscriberssubscribers = {}

# Subscribe to the interface, each browser corresponds to a topic@('/subscribe/<topic>')
def subscribe(topic):
    def stream():
        # Add the currently requested client to the subscriber list        (topic, []).append(stream)
        # Infinite loop, waiting for new messages        while True:
            # Wait for new news            message = ('message')
            if message:
                # Push new messages to all subscribers                for subscriber in (topic, []):
                    (message)
            # Sleep for a period of time to reduce server pressure            (1)

    # Set the response header to tell the client that this is an SSE stream    response = Response(stream(), mimetype='text/event-stream')
    ['Cache-Control'] = 'no-cache'
    ['Connection'] = 'keep-alive'
    return response

# Push interface, push messages to all subscribers of the specified topic@('/publish/<topic>')
def publish(topic):
    # Get the message to be pushed    message = ('message')
    if message:
        # Push new messages to all subscribers        for subscriber in (topic, []):
            (message)
    return 'OK'

In the above code, we define two interfaces, `/subscribe/<topic>` is used to subscribe to the specified topic, and `/publish/<topic>` is used to push messages to all subscribers of the specified topic.
In the subscription interface, we add the currently requested client to the subscriber list and wait for a new message through an infinite loop. When a new message arrives, we push this message to all subscribers.
In the push interface, we get the message to be pushed and push this message to all subscribers of the specified topic.
Using this code, you can easily implement a simple SSE push service, with each browser corresponding to a topic.

import uuid
from flask import Flask, jsonify, request, Response, g
from flask_cors import CORS
from flask_sse import sse
import time
import json

app = Flask(__name__)
cros = CORS(app)
['REDIS_URL'] = 'redis://localhost'
app.register_blueprint(sse, url_prefix='/stream')
# SSE push function

# SSE push routes

@('/register', methods=["GET"])
def register():
    # Get the client identifier    client_id = str(uuid.uuid4())

    # Return SSE response    return jsonify({"client_id": client_id})

# SSE push routes

@('/sse', methods=['POST'])
def stream():
    # Get the client identifier    data = request.get_json()
    client_id = data['clientId']
    print("client_id", client_id)

    def aa():
        # Send SSE data loop        for i in range(10):
            data = 'Hello, %s!' % client_id+str(i)
            (data, channel=client_id, type='message')
            (1)
        ("end", channel=client_id, type='message')
     # Return SSE response    response = Response(aa(), mimetype='text/event-stream')
    ('Cache-Control', 'no-cache')
    ('Connection', 'keep-alive')
    ('X-Accel-Buffering', 'no')
    return response


if __name__ == '__main__':
    (debug=True, port=5000)

In the above code, we use the Flask-SSE extension to manage SSE channels. We create a unique identifier for each client in the `/register` route and store it in the request environment. In the `/sse` route, we use the `` method to send SSE data.

This is the article about the use examples of sse in the Flask framework. For more related content on Flask usage sse, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!