Recently, we had a need to execute multiple SQLAlchemy queries in parallel against a PostgreSQL database with Python 2 and psycopg2. We didn’t really need a full-scale multithreading approach, so we turned to gevent. Gevent is an implementation of green threading built on top of greenlet and libev. This post really isn’t about which threading model is better, as that is highly subjective and changes based on your use case and needs.

Making Psycopg2 Coroutine Friendly

So the first thing we need to do to prepare to leverage gevent is make sure psycopg2 is configured properly. Because psycopg2 is largely a C extension, we cannot just take advantage of gevent’s monkey patching. However, psycopg2 exposes a hook that coroutine libraries can use to integrate with the event scheduler. There are a few libraries, such as sqlalchemy_gevent or psycogreen, that can set this up for you automatically, but the code is fairly short and straightforward. Let’s look at the code from psycogreen, which I use in my applications.

    import psycopg2
    from psycopg2 import extensions

    from gevent.socket import wait_read, wait_write

    def make_psycopg_green():
        """Configure Psycopg to be used with gevent in non-blocking way."""
        if not hasattr(extensions, 'set_wait_callback'):
            raise ImportError(
                "support for coroutines not available in this Psycopg version (%s)"
                % psycopg2.__version__)

        extensions.set_wait_callback(gevent_wait_callback)

    def gevent_wait_callback(conn, timeout=None):
        """A wait callback useful to allow gevent to work with Psycopg."""
        while True:
            state = conn.poll()
            if state == extensions.POLL_OK:
                break
            elif state == extensions.POLL_READ:
                wait_read(conn.fileno(), timeout=timeout)
            elif state == extensions.POLL_WRITE:
                wait_write(conn.fileno(), timeout=timeout)
            else:
                raise psycopg2.OperationalError(
                    "Bad result from poll: %r" % state)
    

The code above registers a callback that yields control back to the event loop whenever the connection is busy reading or writing, so the loop can start working on another greenlet if one is ready. The wait_read function blocks the current greenlet until the connection is ready to be read from, and wait_write does the same thing for writing to the connection.
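
To use this, call make_psycopg_green() once at startup, before any connections are created. Here is a minimal sketch, assuming you connect through a SQLAlchemy engine as in the rest of this post (the connection string below is just a placeholder):

    from sqlalchemy import create_engine

    # Register the gevent wait callback before any connections exist.
    make_psycopg_green()

    # Placeholder connection string; point this at your own database.
    engine = create_engine('postgresql+psycopg2://postgres@localhost/bench')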

With psycopg2 ready to work with gevent, we can start writing the code to execute our queries. I’m going to use a few different elements from gevent to control how the queries are executed, how many queries can run at once, and how the query results are handled. There are many ways to use gevent and workers to accomplish this task, and some are quite a bit simpler; my structure is set up for future options and growth. I call this structure a QueryPool.

Building our Gevent based QueryPool

The QueryPool consists of an input queue to hold the queries we want to run, an output queue to hold the results, an overseer to load up the input queue, workers to actually get the queries and run them, and finally a function to drive the process.

The central element of our QueryPool is the input queue. It is a special kind of FIFO queue called a JoinableQueue, which has a .join() method that blocks until every item in the queue has been acknowledged as processed with task_done(). The input queue is loaded by an overseer as the first step of running the QueryPool. Then workers grab tasks one at a time from the input queue, and when they finish their work they call task_done() to inform the queue that the item has been processed.
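
If you haven’t used a JoinableQueue before, here is a tiny standalone sketch of the join()/task_done() handshake, separate from the QueryPool itself:

    import gevent
    from gevent.queue import JoinableQueue

    tasks = JoinableQueue()
    for item in range(3):
        tasks.put(item)

    def worker():
        while not tasks.empty():
            item = tasks.get()
            gevent.sleep(0)  # stand-in for real work; yields to the event loop
            tasks.task_done()  # acknowledge the item as processed

    gevent.spawn(worker)
    tasks.join()  # blocks until every item put() on the queue has been acknowledged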

Let’s look at the __init__() for our QueryPool:

    import gevent
    from gevent.queue import JoinableQueue, Queue

    class QueryPool(object):
        def __init__(self, queries, pool_size=5):
            self.queries = queries
            self.POOL_MAX = pool_size
            self.tasks = JoinableQueue()
            self.output_queue = Queue()
    

So in our __init__() method, we start by storing the queries. Next we set POOL_MAX, which is the maximum number of workers we are going to use; this controls how many queries can be executed in parallel in our QueryPool. Then we set up the tasks queue, which is where our workers will pick up work from. Finally, we set up the output queue to which the workers will publish the query results.

Defining worker methods

Now we can look at the workers. I use two methods in the QueryPool class. The first one contains the logic to prepare a database connection and execute the SQLAlchemy query.

    def __query(self, query):
        # `engine` is assumed to be a module-level SQLAlchemy engine
        # created elsewhere with create_engine().
        conn = engine.connect()
        results = conn.execute(query).fetchall()
        return results
    

For the worker method within the QueryPool class, we start a loop that runs until there are no tasks left in the tasks (input) queue. It gets one task, calls the __query() method with the details from the task, handles any exceptions, adds the results to the output queue, and then marks the task as done. When adding the results to the output queue, we use the put method of gevent queues to add the results without blocking. This allows the event loop to look for the next thing to work on.

    def executor(self, number):
        while not self.tasks.empty():
            query = self.tasks.get()
            try:
                results = self.__query(query)
                self.output_queue.put(results)
            except Exception as exc_info:
                print exc_info
                print 'Query failed :('
            self.tasks.task_done()
    

Building an overseer method

Now that we have something to handle the work, we need to load the tasks (input) queue. I use an overseer method that iterates over the supplied queries and puts them on the tasks (input) queue.

    def overseer(self):
        for query in self.queries:
            self.tasks.put(query)
    

Building the run method

Finally, we are ready to build the run method that ties everything together and makes it work. Let’s look at the code first and then cover it step by step.

    def run(self):
        self.running = []
        gevent.spawn(self.overseer).join()
        for i in range(self.POOL_MAX):
            runner = gevent.spawn(self.executor, i)
            runner.start()
            self.running.append(runner)

        self.tasks.join()
        for runner in self.running:
            runner.kill()
      

The run method first spawns the overseer and immediately calls join on it, which blocks until the overseer is done loading items into the tasks (input) queue. This is overkill, but it provides more options and would allow us to load the queue asynchronously in the future. Next, we start a number of workers up to the POOL_MAX we defined earlier, and add them to a list so we can control them later. When we start them, they immediately begin working. Then we call .join() on the tasks (input) queue, which blocks execution until all the queries in the queue have been executed and acknowledged as completed. Finally, we clean up by calling the .kill() method on all of the workers.

Using the QueryPool

Now we are ready to use our QueryPool class! I’ve created a Jupyter notebook available on GitHub with some example queries against a benchmarking database. I created my test database using the pgbench command to create a 5 GB database.
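
As a rough sketch (not the notebook itself), usage looks something like this; the connection string and queries are placeholders against the standard pgbench tables:

    from sqlalchemy import create_engine
    from sqlalchemy.sql import text

    # Make psycopg2 cooperate with gevent before connecting.
    make_psycopg_green()

    # Placeholder connection string; adjust for your own pgbench database.
    engine = create_engine('postgresql+psycopg2://postgres@localhost/bench')

    queries = [
        text("SELECT count(*) FROM pgbench_accounts"),
        text("SELECT max(abalance) FROM pgbench_accounts"),
        text("SELECT bid, avg(abalance) FROM pgbench_accounts GROUP BY bid"),
    ]

    pool = QueryPool(queries, pool_size=3)
    pool.run()

    while not pool.output_queue.empty():
        print pool.output_queue.get()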

Execution Results

I setup 6 queries of varying complexity and execution time. I then executed those queries in a traditional serial execution style and then with our QueryPool using different worker counts.

  • Serial: 24 seconds
  • QueryPool - 5 workers: 8 seconds
  • QueryPool - 3 workers: 9 seconds
  • QueryPool - 2 workers: 15 seconds

History

  • 2016-04-28 Added missing return thanks to Mike B.
  • 2016-04-29 Updated information about .put() method based on feedback from David S.
  • 2016-04-30 Updated while 1 to while True based on feedback from /u/Asdayasman

If you use pyenv and have installed or updated pyenv and built Python 2.x after Feb 17, or rolled your own Python 2.x by hand, you may experience an issue where some packages will fail to run with an error message like the following:

    Traceback (most recent call last):
      File "/Users/jasonmyers/.virtualenvs/test2/bin/jupyter-notebook", line 7, in <module>
        from notebook.notebookapp import main
      File "/Users/jasonmyers/.virtualenvs/test2/lib/python2.7/site-packages/notebook/notebookapp.py", line 32, in <module>
        from zmq.eventloop import ioloop
      File "/Users/jasonmyers/.virtualenvs/test2/lib/python2.7/site-packages/zmq/__init__.py", line 66, in <module>
        from zmq import backend
      File "/Users/jasonmyers/.virtualenvs/test2/lib/python2.7/site-packages/zmq/backend/__init__.py", line 40, in <module>
        reraise(*exc_info)
      File "/Users/jasonmyers/.virtualenvs/test2/lib/python2.7/site-packages/zmq/backend/__init__.py", line 27, in <module>
        _ns = select_backend(first)
      File "/Users/jasonmyers/.virtualenvs/test2/lib/python2.7/site-packages/zmq/backend/select.py", line 27, in select_backend
        mod = __import__(name, fromlist=public_api)
      File "/Users/jasonmyers/.virtualenvs/test2/lib/python2.7/site-packages/zmq/backend/cython/__init__.py", line 6, in <module>
        from . import (constants, error, message, context,
    ImportError: dlopen(/Users/jasonmyers/.virtualenvs/test2/lib/python2.7/site-packages/zmq/backend/cython/constants.so, 2): Symbol not found: _PyUnicodeUCS2_DecodeUTF8
      Referenced from: /Users/jasonmyers/.virtualenvs/test2/lib/python2.7/site-packages/zmq/backend/cython/constants.so
      Expected in: flat namespace
     in /Users/jasonmyers/.virtualenvs/test2/lib/python2.7/site-packages/zmq/backend/cython/constants.so

This is because Python wheel packages recently switched to “wide” unicode (UCS4) via PEP 513. Since not all of the wheel packages have been updated, you can end up with packages that use “narrow” unicode (UCS2). I’ve seen this with jupyter, ipython, Pillow, and other Python packages. To fix this, you need to install the affected packages without using the wheels until they release an update, for example: pip install --no-use-wheel jupyter. This takes much longer to install; however, jupyter will work properly after being installed this way. More details at https://github.com/yyuu/pyenv/issues/257.
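
If you want to confirm which flavor of unicode your interpreter was built with, sys.maxunicode gives it away on Python 2:

    import sys

    # 1114111 means a "wide" (UCS4) build; 65535 means a "narrow" (UCS2) build.
    print sys.maxunicode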

One of the readers of Essential SQLAlchemy sent me an email with more questions about how .join() works. In the example below, he wanted to know why a join was only required for the User, LineItem, and Cookie objects. Why isn’t Order required?

    query = session.query(Order.order_id, User.username, User.phone,
                          Cookie.cookie_name, LineItem.quantity,
                          LineItem.extended_cost)
    query = query.join(User).join(LineItem).join(Cookie)
    results = query.filter(User.username == 'cookiemon').all()

To answer that question, let’s take a look at the SQL generated by the ORM for our query.

    query = session.query(Order.order_id, User.username, User.phone,
                          Cookie.cookie_name, LineItem.quantity,
                          LineItem.extended_cost)
    print(query)

    SELECT orders.order_id AS orders_order_id,
           users.username AS users_username,
           users.phone AS users_phone,
           cookies.cookie_name AS cookies_cookie_name,
           line_items.quantity AS line_items_quantity,
           line_items.extended_cost AS line_items_extended_cost
    FROM orders, users, cookies, line_items

We can see that the FROM clause contains the __tablename__ of each ORM object used in the query: orders, users, cookies, and line_items. Also, notice their order is based on where they first appear in the SELECT clause. Just like in SQL, we need to define how the tables are related with JOIN clauses, and these JOIN clauses need to follow the order of the relationships between the tables. This means we need to make sure that the table to the left of the JOIN clause has a relationship with the table in the .join() statement. This can be a bit confusing when we have chained .join() statements, as shown in the first example: the table in the prior .join() statement must have a relationship with the table in the .join() statement currently being evaluated. Let’s look at the SQL generated after all the .join() statements.

    query = query.join(User).join(LineItem).join(Cookie)
    print(query)

    SELECT orders.order_id AS orders_order_id,
           users.username AS users_username,
           users.phone AS users_phone,
           cookies.cookie_name AS cookies_cookie_name,
           line_items.quantity AS line_items_quantity,
           line_items.extended_cost AS line_items_extended_cost
    FROM orders JOIN users ON users.user_id = orders.user_id
         JOIN line_items ON orders.order_id = line_items.order_id
         JOIN cookies ON cookies.cookie_id = line_items.cookie_id

We can see now that the FROM clause contains the JOIN clauses in the order we chained them onto the query. Order is the target of the first JOIN with User, which is why we didn’t need a .join() for it. You can see and play with this more in an IPython notebook available at https://github.com/jasonamyers/blog-12-31-15-join-question/blob/master/blog-join-example.ipynb.
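
For reference, here is a rough sketch of what the mapped classes behind this example might look like. The column names are inferred from the generated SQL above rather than copied from the book, so treat it as illustrative only:

    from sqlalchemy import Column, Integer, Numeric, String, ForeignKey
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class User(Base):
        __tablename__ = 'users'
        user_id = Column(Integer, primary_key=True)
        username = Column(String(255))
        phone = Column(String(20))

    class Order(Base):
        __tablename__ = 'orders'
        order_id = Column(Integer, primary_key=True)
        user_id = Column(Integer, ForeignKey('users.user_id'))

    class Cookie(Base):
        __tablename__ = 'cookies'
        cookie_id = Column(Integer, primary_key=True)
        cookie_name = Column(String(50))

    class LineItem(Base):
        __tablename__ = 'line_items'
        line_item_id = Column(Integer, primary_key=True)
        order_id = Column(Integer, ForeignKey('orders.order_id'))
        cookie_id = Column(Integer, ForeignKey('cookies.cookie_id'))
        quantity = Column(Integer)
        extended_cost = Column(Numeric(12, 2))

With foreign keys like these in place, the ORM can infer the ON conditions shown above for each chained .join().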

  1. Install Ubuntu (Whatever flavor you like)
  2. sudo apt-get update && sudo apt-get upgrade
  3. Copy over SSH keys
  4. chmod 600 .ssh/*
  5. sudo apt-get install -y emacs git vim make gnome-tweak-tool curl build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget llvm libncurses5-dev
  6. Run Tweak Tool -> Click Typing -> set Caps Lock key behavior to “Make Caps Lock an additional Ctrl” -> set Alt/Win key behavior to “Alt and Meta are on Alt keys”
  7. Setup Firefox Sync
  8. Copy down dotfiles: git clone git@github.com:jasonamyers/dots
  9. cd dots
  10. ./setup.sh
  11. Download the latest version of Go from https://golang.org/dl/
  12. Install Go: sudo tar -C /usr/local -xzf go1.5.2.linux-amd64.tar.gz
  13. ./stage2-setup.sh
  14. Open emacs, it will compile for a while and end in an error about go auto loads
  15. Run M-x update-file-autoloads, use ~/Misc/emacs/go-mode.el/go-mode.el as the first answer, and ~/Misc/emacs/go-mode.el/go-mode-load.el as the autoloads file
  16. Exit and Save emacs
  17. Test it with: racer complete std::io::B

Additionally

When I code, I typically want the following from my editor:

  • Syntax highlighting and indentation
  • Auto completion
  • Error checking
  • Jump to definition
  • Inline documentation

When coding in Rust, I want these same features. Thankfully, there are several great packages to help us achieve those desires. Before we go any further, I want to give credit and a huge thanks to Bassam and his blog post; this is just an update based on the changes I found I needed.

Installing Packages

You’ll need to start by updating your package list contents with:

  • M-x package-refresh-contents

Next, we will install the following packages in Emacs via M-x package-install:

  • company - An autocompletion framework - Website
  • company-racer - A backend that enables company autocompletion with racer - Website
  • racer - A rust autocompletion engine - Website
  • flycheck - on the fly syntax checking - Website
  • flycheck-rust - A rust backend for flycheck - Website
  • rust-mode - An Emacs mode for editing rust files - Website

Configuring Emacs

Now we need to configure these packages inside our Emacs configuration. Here is a snippet of my .emacs.d/init.el:

    ;; Reduce the time after which the company auto completion popup opens
    (setq company-idle-delay 0.2)

    ;; Reduce the number of characters before company kicks in
    (setq company-minimum-prefix-length 1)

    ;; Set path to racer binary
    (setq racer-cmd "/usr/local/bin/racer")

    ;; Set path to rust src directory
    (setq racer-rust-src-path "/Users/jasomyer/.rust/src/")

    ;; Load rust-mode when you open `.rs` files
    (add-to-list 'auto-mode-alist '("\\.rs\\'" . rust-mode))

    ;; Setting up configurations when you load rust-mode
    (add-hook 'rust-mode-hook #'racer-mode)
    (add-hook 'racer-mode-hook #'eldoc-mode)
    (add-hook 'racer-mode-hook #'company-mode)
    (global-set-key (kbd "TAB") #'company-indent-or-complete-common)
    (setq company-tooltip-align-annotations t)

Installing Rust

In case you don’t have Rust installed on your system you can install it either by:

  • Downloading the installation binary from Rust’s website.
  • or using Homebrew: brew install rust.

Building and Installing Racer

To generate the Racer binary that company-racer uses for its magical powers, you will need to clone the racer repository from GitHub and run cargo build --release:

  • git clone https://github.com/phildawes/racer.git /tmp/racer
  • cd /tmp/racer
  • cargo build --release
  • mv /tmp/racer/target/release/racer /usr/local/bin
  • rm -rf /tmp/racer

After running these commands and restarting your terminal, you should be able to run the racer command, which should complain about a missing $RUST_SRC_PATH.

Setting $RUST_SRC_PATH

If you go back to the elisp configuration you added to your init.el earlier, you will see that you defined racer-rust-src-path, which points to .rust/src in your home directory. You will need to put the Rust source code there so Racer can use it to load methods for completion.

  • git clone https://github.com/rust-lang/rust.git ~/.rust
  • Add export RUST_SRC_PATH=/Users/YOURUSERNAME/.rust/src to your .bashrc or shell init file.
  • Restart your shell and emacs then open a Rust file.

You should now have all the features we desired earlier.  You can test this by:

  1. Open a rust file and try typing use std::io::B and press <tab>. You should see some completion options.
  2. Place your cursor over a symbol and hit M-. to jump to the definition.

If that does not work, make sure that racer works from the command line by doing the following:

  • racer complete std::io::B

You should see some autocompletion options show up.