SQLAlchemy Core

An Introduction

Jason Myers / @jasonamyers


Background by maul555

Differences between Core and ORM

ORM - Domain Model


class User(Base):
    """ORM (domain-model) mapping for the ``users`` table.

    NOTE(review): ``Base`` is not defined on this slide; presumably it is a
    declarative base created elsewhere -- confirm.
    """
    __tablename__ = 'users'

    # Integer primary key column.
    id = Column(Integer, primary_key=True)
    # Plain string columns.
    name = Column(String)
    fullname = Column(String)
    password = Column(String)
            

Core - Schema-centric Model


from sqlalchemy import Table, Column, Integer, String, MetaData

# Every Table is registered on a MetaData instance.
metadata = MetaData()

# Schema-centric (Core) definition of the same ``users`` table that the ORM
# ``User`` class maps, so both definitions list the same four columns.
users = Table(
    'users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('fullname', String),
    Column('password', String),
)
            

Structure

Copyright © 2014 Mochimochi Land

Structure

Installing

pip install sqlalchemy

pip install flask-sqlalchemy

bin/paster create -t pyramid_alchemy tutorial

Initializing


import sqlalchemy
from sqlalchemy import create_engine
# Engine pointed at an in-memory SQLite database (see the URL).
engine = create_engine('sqlite:///:memory:')
            

Defining a Table


from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey

# Both tables below are registered on this one MetaData instance.
metadata = MetaData()

# Actors: integer primary key, two name strings and an integer body count.
actors = Table(
    'actors',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('fullname', String),
    Column('body_count', Integer),
)

# Roles: ``actor_id`` is declared with no type of its own (None) and a
# foreign key to ``actors.id``; ``character_name`` is required.
roles = Table(
    'roles',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('actor_id', None, ForeignKey('actors.id')),
    Column('character_name', String, nullable=False),
)
            

Create the tables


# Create the tables registered on ``metadata``, using ``engine``.
metadata.create_all(engine)
            

Table Objects


# Inspect the table: yields (column name, Column object) pairs.
actors.columns.items()

[
    ('id', Column('id', Integer(), table=actors, primary_key=True...)),
    ('name', Column('name', String(), table=actors)),
    ('fullname', Column('fullname', String(), table=actors)),
    ('body_count', Column('body_count', Integer(), table=actors))
]
            

Opening a connection


# Open the connection that the following statements are executed on.
conn = engine.connect()
            

Single Insert


# Build an INSERT for a single actor row.
ins = actors.insert().values(
    name='Graham',
    fullname='Graham Chapman',
    body_count=3,
)
# Run it and look at the primary key generated for the new row.
result = conn.execute(ins)
result.inserted_primary_key

[1]
            

Looking at what was executed


# Show the SQL text of the statement. The parenthesised form prints the same
# thing on Python 2 and is also valid on Python 3.
print(str(ins))
# Show the bound parameter values that go with it.
ins.compile().params

INSERT INTO actors (name, fullname, body_count) VALUES (:name, :fullname, :body_count)
{'body_count': 3, 'fullname': 'Graham Chapman', 'name': 'Graham'}
            

Multiple Insert


# Passing a list of dictionaries inserts one row per dictionary.
results = conn.execute(
    roles.insert(),
    [
        {'actor_id': 1, 'character_name': 'King Arthur'},
        {'actor_id': 1, 'character_name': 'Voice of God'},
        {'actor_id': 2, 'character_name': 'Sir Lancelot'},
        {'actor_id': 2, 'character_name': 'Black Knight'},
        {'actor_id': 3, 'character_name': 'Patsy'},
        {'actor_id': 3, 'character_name': 'Sir Bors'},
    ],
)
results.rowcount

6
            

Update


# UPDATE actors: rename every row whose name is 'Graham' to 'Gram'.
stmt = (
    actors.update()
    .where(actors.c.name == 'Graham')
    .values(name='Gram')
)
result = conn.execute(stmt)
result.rowcount

1
            

Delete


# DELETE the actor rows whose name is 'Terry'.
result = conn.execute(
    actors.delete().where(actors.c.name == 'Terry')
)
result.rowcount

1
            

Selecting


# ``select`` has not been imported on any earlier slide; bring it in here so
# the snippet runs as shown.
from sqlalchemy.sql import select

# SELECT only the two name columns from actors.
s = select([actors.c.name, actors.c.fullname])
result = conn.execute(s)
for row in result:
    # Parenthesised print works on both Python 2 and Python 3.
    print(row)

(u'Graham', u'Graham Chapman')
(u'John', u'John Cleese')
(u'Terry', u'Terry Gilliam')
            

Ordering


# Actor names, sorted in descending order.
stmt = (
    select([actors.c.name])
    .order_by(actors.c.name.desc())
)
conn.execute(stmt).fetchall()

[(u'Terry',), (u'John',), (u'Graham',)]
            

Limiting


# LIMIT 1 OFFSET 1 applied to the name/fullname select.
stmt = (
    select([actors.c.name, actors.c.fullname])
    .limit(1)
    .offset(1)
)
conn.execute(stmt).first()

(u'John', u'John Cleese')
            

Count


from sqlalchemy.sql import func

# Build a COUNT over the actors table and fetch it as a single scalar value.
stmt = select(
    [func.count(actors)]
)
conn.execute(stmt).scalar()

2
            

Sum


# One row holding two aggregates: the count and the total body count.
stmt = select(
    [
        func.count(actors),
        func.sum(actors.c.body_count),
    ]
)
conn.execute(stmt).first()

(2, 5)
            

Joins


# Join actors to their roles by matching actors.id with roles.actor_id in the
# WHERE clause.
s = select([actors, roles]).where(actors.c.id == roles.c.actor_id)
for row in conn.execute(s):
    # Parenthesised print works on both Python 2 and Python 3.
    print(row)

(1, u'Graham', u'Graham Chapman', 1, 1, u'King Arthur')
(1, u'Graham', u'Graham Chapman', 2, 1, u'Voice of God')
(2, u'John', u'John Cleese', 3, 2, u'Sir Lancelot')
(2, u'John', u'John Cleese', 4, 2, u'Black Knight')
(3, u'Terry', u'Terry Gilliam', 5, 3, u'Patsy')
(3, u'Terry', u'Terry Gilliam', 6, 3, u'Sir Bors')
            

Grouping


# Count role ids per actor name, selecting from the actors/roles join.
stmt = (
    select([actors.c.name, func.count(roles.c.id)])
    .select_from(actors.join(roles))
    .group_by(actors.c.name)
)
conn.execute(stmt).fetchall()

[(u'Graham', 2), (u'John', 2), (u'Terry', 2)]
            

Filtering


from sqlalchemy.sql import and_, or_, not_

# All three conditions must hold: two LIKE filters plus the condition that
# ties a role to its actor.
stmt = select(
    [actors.c.name, roles.c.character_name]
).where(
    and_(
        actors.c.name.like('Gra%'),
        roles.c.character_name.like('Vo%'),
        actors.c.id == roles.c.actor_id,
    )
)
conn.execute(stmt).fetchall()

[(u'Graham', u'Voice of God')]
            

And so on...

Common Dialects

  • Informix
  • MS SQL
  • Oracle
  • Postgres
  • SQLite
  • Custom

But what if...


class UnloadFromSelect(Executable, ClauseElement):
    """Custom clause element bundling a select with unload settings.

    The constructor stores its four arguments unchanged as attributes of the
    same names: ``select``, ``bucket``, ``access_key`` and ``secret_key``.
    """

    def __init__(self, select, bucket, access_key, secret_key):
        self.select, self.bucket = select, bucket
        self.access_key, self.secret_key = access_key, secret_key

@compiles(UnloadFromSelect)
def visit_unload_from_select(element, compiler, **kw):
    """Render an ``UnloadFromSelect`` element as an ``unload`` statement.

    Args:
        element: The ``UnloadFromSelect`` instance being compiled.
        compiler: The SQL compiler; used to render ``element.select``.
        **kw: Additional compiler keyword arguments (not used here).

    Returns:
        The complete ``unload`` statement as a single-line string.
    """
    # A quoted string literal cannot span several source lines, so the
    # template is assembled from adjacent literals, which Python joins into
    # one string.
    # NOTE(review): the rendered query is placed between single quotes
    # without any escaping -- confirm the select can never render a quote.
    template = (
        "unload ('%(query)s') to '%(bucket)s' "
        "credentials 'aws_access_key_id=%(access_key)s;"
        "aws_secret_access_key=%(secret_key)s' delimiter ',' "
        "addquotes allowoverwrite"
    )
    return template % {
        'query': compiler.process(element.select,
              unload_select=True, literal_binds=True),
        'bucket': element.bucket,
        'access_key': element.access_key,
        'secret_key': element.secret_key,
    }
        

Example Statement


# Joining 's3:/' with '/' produces an 's3://<BUCKET>/<filename>' destination.
unload = UnloadFromSelect(
    select=select([fields]),
    bucket='/'.join(['s3:/', BUCKET, filename]),
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
)
            

Example Usage


unload (
  'select * from venue where venueid in (
        select venueid from venue order by venueid desc limit 10)'
)
to 's3://mybucket/venue_pipe_'
credentials 'aws_access_key_id=ACCESS_KEY;
  aws_secret_access_key=SECRET_KEY';
          

Dynamic Table Introspection


def build_table(engine, table_name):
    """Return a ``Table`` named *table_name*, autoloaded through *engine*.

    The table is registered on the module-level ``metadata`` object.
    """
    autoload_options = {'autoload': True, 'autoload_with': engine}
    return Table(table_name, metadata, **autoload_options)
          

Chaining


# Sum ``value`` per (race, factor) pair. The positional ``t.c.race > 0`` is
# the initial WHERE clause; the chained ``where`` adds two more conditions.
# Every column is read from the same table object ``t``.
s = select(
        [
            t.c.race,
            t.c.factor,
            func.sum(t.c.value).label('summed')
        ], t.c.race > 0
    ).where(
        and_(
            t.c.type == 'POVERTY',
            t.c.value != 0
        )
    ).group_by(
        t.c.race,
        t.c.factor
    ).order_by(
        t.c.race,
        t.c.factor)
            

Conditionals


# Base query: discharge year, a row count labelled 'patient_discharges' and
# the zip code, restricted to the requested years and grouped by year.
s = select(
    [
        table.c.discharge_year,
        func.count(1).label('patient_discharges'),
        table.c.zip_code,
    ],
    table.c.discharge_year.in_(years),
).group_by(table.c.discharge_year)
s = s.where(table.c.hospital_name == provider)

# The charges total is added only when that field is not marked unavailable.
if 'total_charges' not in unavailable_fields:
    s = s.column(func.sum(table.c.total_charges).label('patient_charges'))
            

# Extend the grouping with the zip code.
s = s.group_by(table.c.zip_code)
# Sort on the count column. Its label in the select is 'patient_discharges',
# so that is the name the ORDER BY text must use.
s = s.order_by('patient_discharges DESC')

cases = conn.execute(s).fetchall()
            

Questions?

Thank you

Jason Myers / @jasonamyers

Slides and IPython Notebook on http://bit.ly/pycon2014slides