Kaydet (Commit) 1b383570 authored tarafından Tim Peters's avatar Tim Peters

Text files missing the SVN eol-style property.

üst cbd7b756
import sqlite3 import sqlite3
import datetime, time import datetime, time
def adapt_datetime(ts): def adapt_datetime(ts):
return time.mktime(ts.timetuple()) return time.mktime(ts.timetuple())
sqlite3.register_adapter(datetime.datetime, adapt_datetime) sqlite3.register_adapter(datetime.datetime, adapt_datetime)
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
cur = con.cursor() cur = con.cursor()
now = datetime.datetime.now() now = datetime.datetime.now()
cur.execute("select ?", (now,)) cur.execute("select ?", (now,))
print cur.fetchone()[0] print cur.fetchone()[0]
import sqlite3 import sqlite3
class Point(object): class Point(object):
def __init__(self, x, y): def __init__(self, x, y):
self.x, self.y = x, y self.x, self.y = x, y
def __conform__(self, protocol): def __conform__(self, protocol):
if protocol is sqlite3.PrepareProtocol: if protocol is sqlite3.PrepareProtocol:
return "%f;%f" % (self.x, self.y) return "%f;%f" % (self.x, self.y)
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
cur = con.cursor() cur = con.cursor()
p = Point(4.0, -3.2) p = Point(4.0, -3.2)
cur.execute("select ?", (p,)) cur.execute("select ?", (p,))
print cur.fetchone()[0] print cur.fetchone()[0]
import sqlite3 import sqlite3
class Point(object): class Point(object):
def __init__(self, x, y): def __init__(self, x, y):
self.x, self.y = x, y self.x, self.y = x, y
def adapt_point(point): def adapt_point(point):
return "%f;%f" % (point.x, point.y) return "%f;%f" % (point.x, point.y)
sqlite3.register_adapter(Point, adapt_point) sqlite3.register_adapter(Point, adapt_point)
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
cur = con.cursor() cur = con.cursor()
p = Point(4.0, -3.2) p = Point(4.0, -3.2)
cur.execute("select ?", (p,)) cur.execute("select ?", (p,))
print cur.fetchone()[0] print cur.fetchone()[0]
import sqlite3 import sqlite3
def collate_reverse(string1, string2): def collate_reverse(string1, string2):
return -cmp(string1, string2) return -cmp(string1, string2)
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
con.create_collation("reverse", collate_reverse) con.create_collation("reverse", collate_reverse)
cur = con.cursor() cur = con.cursor()
cur.execute("create table test(x)") cur.execute("create table test(x)")
cur.executemany("insert into test(x) values (?)", [("a",), ("b",)]) cur.executemany("insert into test(x) values (?)", [("a",), ("b",)])
cur.execute("select x from test order by x collate reverse") cur.execute("select x from test order by x collate reverse")
for row in cur: for row in cur:
print row print row
con.close() con.close()
import sqlite3 import sqlite3
con = sqlite3.connect("mydb") con = sqlite3.connect("mydb")
import sqlite3 import sqlite3
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
import sqlite3 import sqlite3
class Point(object): class Point(object):
def __init__(self, x, y): def __init__(self, x, y):
self.x, self.y = x, y self.x, self.y = x, y
def __repr__(self): def __repr__(self):
return "(%f;%f)" % (self.x, self.y) return "(%f;%f)" % (self.x, self.y)
def adapt_point(point): def adapt_point(point):
return "%f;%f" % (point.x, point.y) return "%f;%f" % (point.x, point.y)
def convert_point(s): def convert_point(s):
x, y = map(float, s.split(";")) x, y = map(float, s.split(";"))
return Point(x, y) return Point(x, y)
# Register the adapter # Register the adapter
sqlite3.register_adapter(Point, adapt_point) sqlite3.register_adapter(Point, adapt_point)
# Register the converter # Register the converter
sqlite3.register_converter("point", convert_point) sqlite3.register_converter("point", convert_point)
p = Point(4.0, -3.2) p = Point(4.0, -3.2)
######################### #########################
# 1) Using declared types # 1) Using declared types
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES) con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
cur = con.cursor() cur = con.cursor()
cur.execute("create table test(p point)") cur.execute("create table test(p point)")
cur.execute("insert into test(p) values (?)", (p,)) cur.execute("insert into test(p) values (?)", (p,))
cur.execute("select p from test") cur.execute("select p from test")
print "with declared types:", cur.fetchone()[0] print "with declared types:", cur.fetchone()[0]
cur.close() cur.close()
con.close() con.close()
####################### #######################
# 1) Using column names # 1) Using column names
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES) con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES)
cur = con.cursor() cur = con.cursor()
cur.execute("create table test(p)") cur.execute("create table test(p)")
cur.execute("insert into test(p) values (?)", (p,)) cur.execute("insert into test(p) values (?)", (p,))
cur.execute('select p as "p [point]" from test') cur.execute('select p as "p [point]" from test')
print "with column names:", cur.fetchone()[0] print "with column names:", cur.fetchone()[0]
cur.close() cur.close()
con.close() con.close()
import sqlite3 import sqlite3
class CountCursorsConnection(sqlite3.Connection): class CountCursorsConnection(sqlite3.Connection):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
sqlite3.Connection.__init__(self, *args, **kwargs) sqlite3.Connection.__init__(self, *args, **kwargs)
self.numcursors = 0 self.numcursors = 0
def cursor(self, *args, **kwargs): def cursor(self, *args, **kwargs):
self.numcursors += 1 self.numcursors += 1
return sqlite3.Connection.cursor(self, *args, **kwargs) return sqlite3.Connection.cursor(self, *args, **kwargs)
con = sqlite3.connect(":memory:", factory=CountCursorsConnection) con = sqlite3.connect(":memory:", factory=CountCursorsConnection)
cur1 = con.cursor() cur1 = con.cursor()
cur2 = con.cursor() cur2 = con.cursor()
print con.numcursors print con.numcursors
# Not referenced from the documentation, but builds the database file the other # Not referenced from the documentation, but builds the database file the other
# code snippets expect. # code snippets expect.
import sqlite3 import sqlite3
import os import os
DB_FILE = "mydb" DB_FILE = "mydb"
if os.path.exists(DB_FILE): if os.path.exists(DB_FILE):
os.remove(DB_FILE) os.remove(DB_FILE)
con = sqlite3.connect(DB_FILE) con = sqlite3.connect(DB_FILE)
cur = con.cursor() cur = con.cursor()
cur.execute(""" cur.execute("""
create table people create table people
( (
name_last varchar(20), name_last varchar(20),
age integer age integer
) )
""") """)
cur.execute("insert into people (name_last, age) values ('Yeltsin', 72)") cur.execute("insert into people (name_last, age) values ('Yeltsin', 72)")
cur.execute("insert into people (name_last, age) values ('Putin', 51)") cur.execute("insert into people (name_last, age) values ('Putin', 51)")
con.commit() con.commit()
cur.close() cur.close()
con.close() con.close()
import sqlite3 import sqlite3
con = sqlite3.connect("mydb") con = sqlite3.connect("mydb")
cur = con.cursor() cur = con.cursor()
SELECT = "select name_last, age from people order by age, name_last" SELECT = "select name_last, age from people order by age, name_last"
# 1. Iterate over the rows available from the cursor, unpacking the # 1. Iterate over the rows available from the cursor, unpacking the
# resulting sequences to yield their elements (name_last, age): # resulting sequences to yield their elements (name_last, age):
cur.execute(SELECT) cur.execute(SELECT)
for (name_last, age) in cur: for (name_last, age) in cur:
print '%s is %d years old.' % (name_last, age) print '%s is %d years old.' % (name_last, age)
# 2. Equivalently: # 2. Equivalently:
cur.execute(SELECT) cur.execute(SELECT)
for row in cur: for row in cur:
print '%s is %d years old.' % (row[0], row[1]) print '%s is %d years old.' % (row[0], row[1])
import sqlite3 import sqlite3
# Create a connection to the database file "mydb": # Create a connection to the database file "mydb":
con = sqlite3.connect("mydb") con = sqlite3.connect("mydb")
# Get a Cursor object that operates in the context of Connection con: # Get a Cursor object that operates in the context of Connection con:
cur = con.cursor() cur = con.cursor()
# Execute the SELECT statement: # Execute the SELECT statement:
cur.execute("select * from people order by age") cur.execute("select * from people order by age")
# Retrieve all rows as a sequence and print that sequence: # Retrieve all rows as a sequence and print that sequence:
print cur.fetchall() print cur.fetchall()
import sqlite3 import sqlite3
con = sqlite3.connect("mydb") con = sqlite3.connect("mydb")
cur = con.cursor() cur = con.cursor()
who = "Yeltsin" who = "Yeltsin"
age = 72 age = 72
cur.execute("select name_last, age from people where name_last=? and age=?", (who, age)) cur.execute("select name_last, age from people where name_last=? and age=?", (who, age))
print cur.fetchone() print cur.fetchone()
import sqlite3 import sqlite3
con = sqlite3.connect("mydb") con = sqlite3.connect("mydb")
cur = con.cursor() cur = con.cursor()
who = "Yeltsin" who = "Yeltsin"
age = 72 age = 72
cur.execute("select name_last, age from people where name_last=:who and age=:age", cur.execute("select name_last, age from people where name_last=:who and age=:age",
{"who": who, "age": age}) {"who": who, "age": age})
print cur.fetchone() print cur.fetchone()
import sqlite3 import sqlite3
con = sqlite3.connect("mydb") con = sqlite3.connect("mydb")
cur = con.cursor() cur = con.cursor()
who = "Yeltsin" who = "Yeltsin"
age = 72 age = 72
cur.execute("select name_last, age from people where name_last=:who and age=:age", cur.execute("select name_last, age from people where name_last=:who and age=:age",
locals()) locals())
print cur.fetchone() print cur.fetchone()
import sqlite3 import sqlite3
class IterChars: class IterChars:
def __init__(self): def __init__(self):
self.count = ord('a') self.count = ord('a')
def __iter__(self): def __iter__(self):
return self return self
def next(self): def next(self):
if self.count > ord('z'): if self.count > ord('z'):
raise StopIteration raise StopIteration
self.count += 1 self.count += 1
return (chr(self.count - 1),) # this is a 1-tuple return (chr(self.count - 1),) # this is a 1-tuple
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
cur = con.cursor() cur = con.cursor()
cur.execute("create table characters(c)") cur.execute("create table characters(c)")
theIter = IterChars() theIter = IterChars()
cur.executemany("insert into characters(c) values (?)", theIter) cur.executemany("insert into characters(c) values (?)", theIter)
cur.execute("select c from characters") cur.execute("select c from characters")
print cur.fetchall() print cur.fetchall()
import sqlite3 import sqlite3
def char_generator(): def char_generator():
import string import string
for c in string.letters[:26]: for c in string.letters[:26]:
yield (c,) yield (c,)
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
cur = con.cursor() cur = con.cursor()
cur.execute("create table characters(c)") cur.execute("create table characters(c)")
cur.executemany("insert into characters(c) values (?)", char_generator()) cur.executemany("insert into characters(c) values (?)", char_generator())
cur.execute("select c from characters") cur.execute("select c from characters")
print cur.fetchall() print cur.fetchall()
import sqlite3 import sqlite3
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
cur = con.cursor() cur = con.cursor()
cur.executescript(""" cur.executescript("""
create table person( create table person(
firstname, firstname,
lastname, lastname,
age age
); );
create table book( create table book(
title, title,
author, author,
published published
); );
insert into book(title, author, published) insert into book(title, author, published)
values ( values (
'Dirk Gently''s Holistic Detective Agency 'Dirk Gently''s Holistic Detective Agency
'Douglas Adams', 'Douglas Adams',
1987 1987
); );
""") """)
import sqlite3 import sqlite3
con = sqlite3.connect("mydb") con = sqlite3.connect("mydb")
cur = con.cursor() cur = con.cursor()
newPeople = ( newPeople = (
('Lebed' , 53), ('Lebed' , 53),
('Zhirinovsky' , 57), ('Zhirinovsky' , 57),
) )
for person in newPeople: for person in newPeople:
cur.execute("insert into people (name_last, age) values (?, ?)", person) cur.execute("insert into people (name_last, age) values (?, ?)", person)
# The changes will not be saved unless the transaction is committed explicitly: # The changes will not be saved unless the transaction is committed explicitly:
con.commit() con.commit()
import sqlite3 import sqlite3
import md5 import md5
def md5sum(t): def md5sum(t):
return md5.md5(t).hexdigest() return md5.md5(t).hexdigest()
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
con.create_function("md5", 1, md5sum) con.create_function("md5", 1, md5sum)
cur = con.cursor() cur = con.cursor()
cur.execute("select md5(?)", ("foo",)) cur.execute("select md5(?)", ("foo",))
print cur.fetchone()[0] print cur.fetchone()[0]
import sqlite3 import sqlite3
class MySum: class MySum:
def __init__(self): def __init__(self):
self.count = 0 self.count = 0
def step(self, value): def step(self, value):
self.count += value self.count += value
def finalize(self): def finalize(self):
return self.count return self.count
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
con.create_aggregate("mysum", 1, MySum) con.create_aggregate("mysum", 1, MySum)
cur = con.cursor() cur = con.cursor()
cur.execute("create table test(i)") cur.execute("create table test(i)")
cur.execute("insert into test(i) values (1)") cur.execute("insert into test(i) values (1)")
cur.execute("insert into test(i) values (2)") cur.execute("insert into test(i) values (2)")
cur.execute("select mysum(i) from test") cur.execute("select mysum(i) from test")
print cur.fetchone()[0] print cur.fetchone()[0]
import sqlite3 import sqlite3
import datetime import datetime
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES) con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES)
cur = con.cursor() cur = con.cursor()
cur.execute('select ? as "x [timestamp]"', (datetime.datetime.now(),)) cur.execute('select ? as "x [timestamp]"', (datetime.datetime.now(),))
dt = cur.fetchone()[0] dt = cur.fetchone()[0]
print dt, type(dt) print dt, type(dt)
import sqlite3 import sqlite3
import datetime import datetime
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
cur = con.cursor() cur = con.cursor()
cur.execute("create table test(d date, ts timestamp)") cur.execute("create table test(d date, ts timestamp)")
today = datetime.date.today() today = datetime.date.today()
now = datetime.datetime.now() now = datetime.datetime.now()
cur.execute("insert into test(d, ts) values (?, ?)", (today, now)) cur.execute("insert into test(d, ts) values (?, ?)", (today, now))
cur.execute("select d, ts from test") cur.execute("select d, ts from test")
row = cur.fetchone() row = cur.fetchone()
print today, "=>", row[0], type(row[0]) print today, "=>", row[0], type(row[0])
print now, "=>", row[1], type(row[1]) print now, "=>", row[1], type(row[1])
cur.execute('select current_date as "d [date]", current_timestamp as "ts [timestamp]"') cur.execute('select current_date as "d [date]", current_timestamp as "ts [timestamp]"')
row = cur.fetchone() row = cur.fetchone()
print "current_date", row[0], type(row[0]) print "current_date", row[0], type(row[0])
print "current_timestamp", row[1], type(row[1]) print "current_timestamp", row[1], type(row[1])
import sqlite3 import sqlite3
def dict_factory(cursor, row): def dict_factory(cursor, row):
d = {} d = {}
for idx, col in enumerate(cursor.description): for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx] d[col[0]] = row[idx]
return d return d
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
con.row_factory = dict_factory con.row_factory = dict_factory
cur = con.cursor() cur = con.cursor()
cur.execute("select 1 as a") cur.execute("select 1 as a")
print cur.fetchone()["a"] print cur.fetchone()["a"]
import sqlite3 import sqlite3
persons = [ persons = [
("Hugo", "Boss"), ("Hugo", "Boss"),
("Calvin", "Klein") ("Calvin", "Klein")
] ]
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
# Create the table # Create the table
con.execute("create table person(firstname, lastname)") con.execute("create table person(firstname, lastname)")
# Fill the table # Fill the table
con.executemany("insert into person(firstname, lastname) values (?, ?)", persons) con.executemany("insert into person(firstname, lastname) values (?, ?)", persons)
# Print the table contents # Print the table contents
for row in con.execute("select firstname, lastname from person"): for row in con.execute("select firstname, lastname from person"):
print row print row
# Using a dummy WHERE clause to not let SQLite take the shortcut table deletes. # Using a dummy WHERE clause to not let SQLite take the shortcut table deletes.
print "I just deleted", con.execute("delete from person where 1=1").rowcount, "rows" print "I just deleted", con.execute("delete from person where 1=1").rowcount, "rows"
import sqlite3 import sqlite3
FIELD_MAX_WIDTH = 20 FIELD_MAX_WIDTH = 20
TABLE_NAME = 'people' TABLE_NAME = 'people'
SELECT = 'select * from %s order by age, name_last' % TABLE_NAME SELECT = 'select * from %s order by age, name_last' % TABLE_NAME
con = sqlite3.connect("mydb") con = sqlite3.connect("mydb")
cur = con.cursor() cur = con.cursor()
cur.execute(SELECT) cur.execute(SELECT)
# Print a header. # Print a header.
for fieldDesc in cur.description: for fieldDesc in cur.description:
print fieldDesc[0].ljust(FIELD_MAX_WIDTH) , print fieldDesc[0].ljust(FIELD_MAX_WIDTH) ,
print # Finish the header with a newline. print # Finish the header with a newline.
print '-' * 78 print '-' * 78
# For each row, print the value of each field left-justified within # For each row, print the value of each field left-justified within
# the maximum possible width of that field. # the maximum possible width of that field.
fieldIndices = range(len(cur.description)) fieldIndices = range(len(cur.description))
for row in cur: for row in cur:
for fieldIndex in fieldIndices: for fieldIndex in fieldIndices:
fieldValue = str(row[fieldIndex]) fieldValue = str(row[fieldIndex])
print fieldValue.ljust(FIELD_MAX_WIDTH) , print fieldValue.ljust(FIELD_MAX_WIDTH) ,
print # Finish the row with a newline. print # Finish the row with a newline.
import sqlite3 import sqlite3
con = sqlite3.connect(":memory:") con = sqlite3.connect(":memory:")
cur = con.cursor() cur = con.cursor()
# Create the table # Create the table
con.execute("create table person(lastname, firstname)") con.execute("create table person(lastname, firstname)")
AUSTRIA = u"\xd6sterreich" AUSTRIA = u"\xd6sterreich"
# by default, rows are returned as Unicode # by default, rows are returned as Unicode
cur.execute("select ?", (AUSTRIA,)) cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone() row = cur.fetchone()
assert row[0] == AUSTRIA assert row[0] == AUSTRIA
# but we can make pysqlite always return bytestrings ... # but we can make pysqlite always return bytestrings ...
con.text_factory = str con.text_factory = str
cur.execute("select ?", (AUSTRIA,)) cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone() row = cur.fetchone()
assert type(row[0]) == str assert type(row[0]) == str
# the bytestrings will be encoded in UTF-8, unless you stored garbage in the # the bytestrings will be encoded in UTF-8, unless you stored garbage in the
# database ... # database ...
assert row[0] == AUSTRIA.encode("utf-8") assert row[0] == AUSTRIA.encode("utf-8")
# we can also implement a custom text_factory ... # we can also implement a custom text_factory ...
# here we implement one that will ignore Unicode characters that cannot be # here we implement one that will ignore Unicode characters that cannot be
# decoded from UTF-8 # decoded from UTF-8
con.text_factory = lambda x: unicode(x, "utf-8", "ignore") con.text_factory = lambda x: unicode(x, "utf-8", "ignore")
cur.execute("select ?", ("this is latin1 and would normally create errors" + u"\xe4\xf6\xfc".encode("latin1"),)) cur.execute("select ?", ("this is latin1 and would normally create errors" + u"\xe4\xf6\xfc".encode("latin1"),))
row = cur.fetchone() row = cur.fetchone()
assert type(row[0]) == unicode assert type(row[0]) == unicode
# pysqlite offers a builtin optimized text_factory that will return bytestring # pysqlite offers a builtin optimized text_factory that will return bytestring
# objects, if the data is in ASCII only, and otherwise return unicode objects # objects, if the data is in ASCII only, and otherwise return unicode objects
con.text_factory = sqlite3.OptimizedUnicode con.text_factory = sqlite3.OptimizedUnicode
cur.execute("select ?", (AUSTRIA,)) cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone() row = cur.fetchone()
assert type(row[0]) == unicode assert type(row[0]) == unicode
cur.execute("select ?", ("Germany",)) cur.execute("select ?", ("Germany",))
row = cur.fetchone() row = cur.fetchone()
assert type(row[0]) == str assert type(row[0]) == str
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment