RabbitMQ and Celery demo: an image processing app on Flask

Celery is a distributed task queue: it processes messages with a focus on real-time processing and also supports task scheduling. When an expensive function keeps the user waiting for what feels like forever, it is usually better to hand that work to something like Celery. In this blog we will write a face detection web app using Flask, python-opencv and Celery.

Before I explain anything, let me share a Flask code snippet with you:

from time import sleep
from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello():
    sleep(10)  # <--- what would you see during these 10s?
    return "Hello World!"

Can you tell me what you would see in the first 10s after requesting this page? I know the answer: nothing, because the app keeps the user waiting for 10s before it sends the response. We don't love to wait 10s. We are so impatient; we want everything instantly, and that is the motivation behind a lot of modern computing. But life is cruel and we can't get everything instantly. We understand that, but our users DO NOT understand this simple truth. So what do we do? We try to sell them the feeling that we are working instantly, or at least that the page is not taking forever to load. To do that we need to get past that sleep block. How to do that is what I am going to discuss in this blog, with a real-life image processing app in Flask.
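To make the idea concrete before any Celery is involved, here is a minimal sketch that uses only the standard library's thread pool (Python 3 assumed). It is not what this post builds, only the same trick in miniature: the slow job is handed to something else and the caller gets an answer straight away. The names slow_job and handle_request are made up for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# One shared pool; its worker threads run the slow jobs in the background.
executor = ThreadPoolExecutor(max_workers=2)

def slow_job(seconds):
    """Stand-in for an expensive function such as face detection."""
    time.sleep(seconds)
    return "done after %.1fs" % seconds

def handle_request(seconds):
    """Hand the slow job to the pool and return immediately."""
    future = executor.submit(slow_job, seconds)
    return "Accepted, working on it...", future

if __name__ == "__main__":
    start = time.time()
    message, future = handle_request(0.5)
    print(message, "(returned in %.3fs)" % (time.time() - start))
    print(future.result())  # blocks only here, when we ask for the result
```

A thread pool lives and dies with the web process, which is one reason to reach for a separate queue and worker such as RabbitMQ and Celery.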

Obviously in real life we don't need to write "sleep" to make our code run slower; we write plenty of functions that slow things down all by themselves. In this blog we will write an application that lets a user upload a picture and detects the faces in it. The face detection function is very expensive: it takes almost 3-10s on my machine to detect the face of my favourite actress. Let me share my code:

#server.py

__author__ = 'sadaf2605'


import os
from flask import Flask, request, redirect, url_for
from werkzeug.utils import secure_filename

import face_detect
from os.path import basename


UPLOAD_FOLDER = '/home/sadaf2605/flask_celery_upload_image/uploads'
ALLOWED_EXTENSIONS = set(['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'])

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER


def allowed_file(filename):
    return '.' in filename and \
           filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS

@app.route('/', methods=['GET', 'POST'])
def upload_file():
    if request.method == 'POST':
        import time
        start_time = time.time()
        file = request.files['file']

        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)

            base,ext=os.path.splitext(filename)


            file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            face_detect.detect(os.path.join(app.config['UPLOAD_FOLDER'], filename),os.path.join(app.config['UPLOAD_FOLDER'], base+"-face"+ext))

            print("--- %s seconds ---" % (time.time() - start_time))
            return redirect("/")

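    from os import listdir
    from os.path import isfile, join
    htmlpic = ""
    for f in sorted(listdir(UPLOAD_FOLDER)):
        if isfile(join(UPLOAD_FOLDER, f)):
            print(f)
            # The original markup was lost; a plain <img> tag is assumed here.
            htmlpic += """
            <div>
                <img src="/uploads/%s">
            </div>
                """ % f

    # Upload form as in the Flask file upload documentation.
    return '''
    <!doctype html>
    <title>Upload new File</title>
    <h1>Upload new File</h1>
    <form action="" method="post" enctype="multipart/form-data">
      <p><input type="file" name="file">
         <input type="submit" value="Upload">
    </form>
    ''' + htmlpic


from flask import send_from_directory

@app.route('/uploads/<filename>')
def uploaded_file(filename):
    return send_from_directory(app.config['UPLOAD_FOLDER'], filename)


from werkzeug.middleware.shared_data import SharedDataMiddleware
app.add_url_rule('/uploads/<filename>', 'uploaded_file', build_only=True)
app.wsgi_app = SharedDataMiddleware(app.wsgi_app, {
    '/uploads': app.config['UPLOAD_FOLDER']
})

if __name__ == "__main__":
    app.debug = True
    app.run()
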
#face_detect.py

import numpy as np
import cv2


face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

def detect(src_img, dest_img):
    img = cv2.imread(src_img)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    faces = face_cascade.detectMultiScale(gray, 1.3, 1)
    for (x,y,w,h) in faces:
        cv2.rectangle(img,(x,y),(x+w,y+h),(255,0,0),5)
        roi_gray = gray[y:y+h, x:x+w]
        roi_color = img[y:y+h, x:x+w]


    cv2.imwrite(dest_img, img)

You can test the app by running:

python server.py

But we don't want our user to wait 10s to see the next page, so we will use Celery and RabbitMQ to help us. First of all, let's install RabbitMQ and Celery.

To install RabbitMQ we will use aptitude, because it installs all the dependencies along the way. If you don't have aptitude installed:

sudo apt-get install aptitude

Now it's time to install the RabbitMQ server:

sudo aptitude install rabbitmq-server

Now we will create a user and a virtual host for RabbitMQ:

sudo rabbitmqctl add_user rabbit_user password
sudo rabbitmqctl add_vhost /app_rabbit

We will give our user permission to do everything on that virtual host (the three patterns cover configure, write and read):

sudo rabbitmqctl set_permissions -p /app_rabbit rabbit_user ".*" ".*" ".*"

Now we restart the RabbitMQ server so that the changes take effect:

sudo /etc/init.d/rabbitmq-server stop
sudo /etc/init.d/rabbitmq-server start

Now we will install celery:

pip install celery

Now we need to configure Celery. Celery provides decorators such as @app.task that turn a plain function into a task. RabbitMQ is Celery's default broker, and Celery talks to it through a broker URL that names the user, password, host, port (5672 is RabbitMQ's default) and virtual host. We want to enqueue our image processing tasks, so we can define the Celery app in face_detect.py. It would be better to put it in server.py, as that is the entry point… but whatever for now!

import numpy as np
import cv2

from celery import Celery

app = Celery(broker='amqp://rabbit_user:password@localhost:5672//app_rabbit')


face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

@app.task
def detect(src_img,dest_img):
    img = cv2.imread(src_img)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    faces = face_cascade.detectMultiScale(gray, 1.3, 1)
    for (x,y,w,h) in faces:
        cv2.rectangle(img,(x,y),(x+w,y+h),(255,0,0),5)
        roi_gray = gray[y:y+h, x:x+w]
        roi_color = img[y:y+h, x:x+w]


    cv2.imwrite(dest_img, img)
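The broker URL in the listing above packs the user, password, host, port and virtual host into one string. Here is a small sketch of how it is put together, assuming Python 3; broker_url is a hypothetical helper for illustration, not part of Celery. As far as I know, only the first slash after the port separates the virtual host, so a virtual host named /app_rabbit ends up after a double slash.

```python
from urllib.parse import quote

def broker_url(user, password, vhost, host="localhost", port=5672):
    """Build an AMQP URL of the form amqp://user:password@host:port/vhost."""
    # The vhost is appended as-is after the separating slash, so a vhost
    # named "/app_rabbit" produces a double slash in the URL.
    return "amqp://%s:%s@%s:%d/%s" % (
        quote(user, safe=""), quote(password, safe=""), host, port, vhost)

if __name__ == "__main__":
    # Same values as the rabbitmqctl commands earlier in the post.
    print(broker_url("rabbit_user", "password", "/app_rabbit"))
```

Quoting the user and password keeps characters such as @ or / in a password from being read as part of the URL structure.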

On its own this won't change your life radically, because we are not yet calling the task the way Celery intends. To put a task on the queue we call the delay method that the decorator adds to the function, so we call face_detect.detect.delay(src_img, dest_img). We also need to keep a Celery worker running; otherwise the task just sits in the queue until a worker starts. The -A parameter tells celery which module contains the decorated tasks.

To run the Celery worker:

celery -A face_detect worker -l INFO
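The split between putting a task on the queue and having a worker pick it up can be mimicked with the standard library alone. The sketch below is a stand-in and not Celery: task_queue plays the role of the RabbitMQ queue, and the delay and worker functions are simplified imitations written for this illustration (Python 3 assumed).

```python
import queue
import threading

task_queue = queue.Queue()   # plays the role of the RabbitMQ queue
results = []                 # where the worker records finished jobs

def detect(src_img, dest_img):
    """Stand-in for the real face detection task."""
    results.append((src_img, dest_img))

def delay(func, *args):
    """Enqueue a call instead of running it, loosely like task.delay()."""
    task_queue.put((func, args))

def worker():
    """Loosely like the Celery worker process: pull jobs and run them."""
    while True:
        func, args = task_queue.get()
        try:
            func(*args)
        finally:
            task_queue.task_done()

if __name__ == "__main__":
    delay(detect, "me.jpg", "me-face.jpg")
    print("done before worker starts:", len(results))  # nothing has run yet
    threading.Thread(target=worker, daemon=True).start()
    task_queue.join()            # wait until the worker has drained the queue
    print("done after worker ran:", len(results))
```

The first print shows the job waiting in the queue, which is what happens to uploads when no Celery worker is running.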

So now, finally, we can change our server.py:

__author__ = 'sadaf2605'


import os
from flask import Flask, request, redirect, url_for
from werkzeug.utils import secure_filename

import face_detect
from os.path import basename


UPLOAD_FOLDER = '/home/sadaf2605/flask_celery_upload_image/uploads'
ALLOWED_EXTENSIONS = set(['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'])

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER


def allowed_file(filename):
    return '.' in filename and \
           filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS

@app.route('/', methods=['GET', 'POST'])
def upload_file():
    if request.method == 'POST':
        import time
        start_time = time.time()
        file = request.files['file']

        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)

            base,ext=os.path.splitext(filename)


            file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            face_detect.detect.delay(os.path.join(app.config['UPLOAD_FOLDER'], filename),os.path.join(app.config['UPLOAD_FOLDER'], base+"-face"+ext))

            print("--- %s seconds ---" % (time.time() - start_time))
            return redirect("/")

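    from os import listdir
    from os.path import isfile, join
    htmlpic = ""
    for f in sorted(listdir(UPLOAD_FOLDER)):
        if isfile(join(UPLOAD_FOLDER, f)):
            print(f)
            # The original markup was lost; a plain <img> tag is assumed here.
            htmlpic += """
            <div>
                <img src="/uploads/%s">
            </div>
                """ % f

    # Upload form as in the Flask file upload documentation.
    return '''
    <!doctype html>
    <title>Upload new File</title>
    <h1>Upload new File</h1>
    <form action="" method="post" enctype="multipart/form-data">
      <p><input type="file" name="file">
         <input type="submit" value="Upload">
    </form>
    ''' + htmlpic


from flask import send_from_directory

@app.route('/uploads/<filename>')
def uploaded_file(filename):
    return send_from_directory(app.config['UPLOAD_FOLDER'], filename)


from werkzeug.middleware.shared_data import SharedDataMiddleware
app.add_url_rule('/uploads/<filename>', 'uploaded_file', build_only=True)
app.wsgi_app = SharedDataMiddleware(app.wsgi_app, {
    '/uploads': app.config['UPLOAD_FOLDER']
})

if __name__ == "__main__":
    app.debug = True
    app.run()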

After uploading, the picture won't show up on the front page right away; you will need to refresh a couple of times over the next 5-6s before it appears. Keep on refreshing, and you may like to send me a pull request at: https://github.com/sadaf2605/facedetection-flaskwebapp-rabbitmq
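Instead of refreshing by hand, the page (or a script) could check whether the worker has written its output file yet. This is a minimal sketch, assuming the worker writes the annotated copy next to the upload with "-face" added before the extension, as the listing above does; face_path and wait_for_result are hypothetical helpers and not part of the repository.

```python
import os
import time

def face_path(upload_folder, filename):
    """Return where the worker will write the annotated copy of `filename`."""
    base, ext = os.path.splitext(filename)
    return os.path.join(upload_folder, base + "-face" + ext)

def wait_for_result(path, timeout=10.0, interval=0.5):
    """Poll until `path` exists or `timeout` seconds pass; True if it was found."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if os.path.isfile(path):
            return True
        time.sleep(interval)
    return os.path.isfile(path)

if __name__ == "__main__":
    target = face_path("uploads", "me.jpg")
    print(target, "ready:", wait_for_result(target, timeout=1.0))
```

A file can exist while it is still being written, so a real app would more likely ask Celery for the task state; that needs a result backend, which this post does not configure.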

SSL (HTTPS) from Python Flask

In this blog I am going to share a code snippet and describe the stupid things I tried that did not work. Don't miss my point: sharing my ordeal is not about proving myself stupid. I am telling you what did not work over the last couple of hours so that it can save your time.

I was serving static files from Flask in a project migrated directly from Django. While working with Django we figured out that we did not actually need such a powerful tool; something very lightweight like Flask would do, and the switch is fairly easy because Flask's default templating, Jinja, is very close to Django's template language. After this shift I ran into a bit of trouble with HTTPS: when I tried https, the page was completely blank. I lost my mind. What the hell was going on? Then I realized that for Flask I probably need to set up SSL myself.

There is a nice snippet on the Flask website (http://flask.pocoo.org/snippets/111/). I followed it, and it did not work! Why wouldn't it work? After a couple of trials and some googling I realized this is the old-school way of doing it. Fine, I need to go to the new school, obviously! I got myself a new dress, and now the context looks prettier: ("cert.crt", "key.key"). I was impressed, but what the hell? That did not work either. I lost my mind! After hours of fighting, I got this error:

Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/werkzeug/serving.py", line 602, in inner
    passthrough_errors, ssl_context).serve_forever()
  File "/usr/local/lib/python2.7/dist-packages/werkzeug/serving.py", line 506, in make_server
    passthrough_errors, ssl_context)
  File "/usr/local/lib/python2.7/dist-packages/werkzeug/serving.py", line 450, in __init__
    self.socket = ssl_context.wrap_socket(self.socket,
AttributeError: 'OpenSSL.SSL.Context' object has no attribute 'wrap_socket'

That is because I am using a Python 2.7 release older than 2.7.9.
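A quick way to check for this before fighting with certificates is to look at the interpreter version and at whether the standard ssl module has an SSLContext class (the class whose wrap_socket method the traceback is looking for). This is only a sketch; version_is_new_enough is a hypothetical helper, and the 2.7.9 threshold is the one named above.

```python
import ssl
import sys

def version_is_new_enough(version_info):
    """True when the (major, minor, micro) version is at least 2.7.9."""
    return tuple(version_info[:3]) >= (2, 7, 9)

if __name__ == "__main__":
    print("Python %d.%d.%d" % tuple(sys.version_info[:3]))
    print("new enough:", version_is_new_enough(sys.version_info))
    # ssl.SSLContext is what provides wrap_socket on recent interpreters.
    print("ssl.SSLContext available:", hasattr(ssl, "SSLContext"))
```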

What else did I try? You don't want to know. I tried to install pyOpenSSL on Heroku using pip, but it looks like it is a ported version and it failed to compile on Heroku. Now I will write about what I had to do to make it work.

I had to create my certificate and key:

openssl genrsa -des3 -passout pass:x -out server.pass.key 2048
openssl rsa -passin pass:x -in server.pass.key -out server.key
rm server.pass.key
openssl req -new -key server.key -out server.csr
openssl x509 -req -days 365 -in server.csr -signkey server.key -out server.crt

So now we have server.key and server.crt in our directory (along with the intermediate server.csr).

Now the Python script:

from flask import Flask, request, send_from_directory
from flask import render_template

#from OpenSSL import SSL
#context = SSL.Context(SSL.SSLv23_METHOD)
#context.use_privatekey_file('server.key')
#context.use_certificate_file('server.crt')

import os

# No leading slash: an absolute second argument would discard the directory part.
ASSETS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
app = Flask(__name__, static_url_path='/static')

@app.route('/js/<path:path>')
def send_js(path):
    return send_from_directory('js', path)

@app.route('/signup')
def signup():
    return render_template('signup.html')

if __name__ == '__main__':
    context = ('server.crt', 'server.key')
    app.run(ssl_context=context, threaded=True, debug=True)

Done! Up and running, provided you have Python >= 2.7.9!