Package grassyknoll :: Package backend :: Package litesql :: Module TableMaker
[hide private]

Source Code for Module grassyknoll.backend.litesql.TableMaker

  1  """helper for creating tables""" 
  2   
  3  import datetime 
  4  import operator 
  5  import sqlite3 
  6   
  7   
8 -def __register_adapters_and_converters():
9 """register some better adapters & converters. 10 11 Ripped from pysqlite2.dbapi2. see its 12 U{license<http://www.initd.org/tracker/pysqlite/wiki/About >} 13 14 These tests aren't very useful for humans. 15 XXX fix IGNORE - I couldn't get # doctest:+ELLIPSIS working 16 17 >>> conn=sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES) 18 >>> IGNORE=conn.execute("create table test (test_date date, test_time time, test_datetime datetime)") 19 >>> d=datetime.date(2007, 4, 4) 20 >>> t=datetime.time(11, 8, 42) 21 >>> t2=datetime.time(11, 8, 42, 1234) 22 >>> dt=datetime.datetime(2007, 4, 4, 11, 8, 42, 1234) 23 >>> IGNORE=conn.execute("insert into test values (?, ?, ?)", (d, t, dt)) 24 >>> d_, t_, dt_=conn.execute("select * from test").fetchone() 25 >>> d_ 26 datetime.date(2007, 4, 4) 27 >>> t_ 28 datetime.time(11, 8, 42) 29 >>> dt_ 30 datetime.datetime(2007, 4, 4, 11, 8, 42, 1234) 31 >>> IGNORE=conn.execute("delete from test") 32 >>> IGNORE=conn.execute("insert into test values (?, ?, ?)", (d, t2, dt)) 33 >>> d_, t2_, dt_=conn.execute("select * from test").fetchone() 34 >>> d_ 35 datetime.date(2007, 4, 4) 36 >>> t2_ 37 datetime.time(11, 8, 42, 1234) 38 >>> dt_ 39 datetime.datetime(2007, 4, 4, 11, 8, 42, 1234) 40 """ 41 def adapt_date(val): 42 return val.isoformat()
43 44 def adapt_time(val): 45 return val.isoformat() 46 47 def adapt_datetime(val): 48 # ISO format 49 return val.isoformat(" ") 50 51 def date_args(val): 52 return tuple(int(x) for x in val.split("-")) 53 54 def time_args(val): 55 full = val.split(".") 56 hours, minutes, seconds = [int(x) for x in full[0].split(":")] 57 if len(full) == 2: 58 microseconds = int(float("0.%s"%full[1]) * 1000000) 59 else: 60 microseconds = 0 61 return hours, minutes, seconds, microseconds 62 63 def convert_time(val): 64 return datetime.time(*time_args(val)) 65 66 def convert_date(val): 67 return datetime.date(*date_args(val)) 68 69 def convert_datetime(val): 70 datepart, timepart = val.split(" ") 71 return datetime.datetime(*(date_args(datepart)+time_args(timepart))) 72 73 sqlite3.register_adapter(datetime.date, adapt_date) 74 sqlite3.register_adapter(datetime.time, adapt_time) 75 sqlite3.register_adapter(datetime.datetime, adapt_datetime) 76 sqlite3.register_converter("date", convert_date) 77 sqlite3.register_converter("time", convert_time) 78 sqlite3.register_converter("datetime", convert_datetime) 79 80 __register_adapters_and_converters() 81 82 # a map of python type to column type 83 python2column={int: 'integer', 84 long: 'integer', 85 float: 'real', 86 str: 'text', 87 unicode: 'text', 88 buffer: 'blob', 89 datetime.date: 'date', 90 datetime.time: 'time', 91 datetime.datetime: 'datetime'} 92
93 -class Column(object):
94 """description of a column 95 96 XXX give me a better docstring! 97 """ 98 __slots__=['name', 'type', 'indexed', 'optional'] 99
100 - def __init__(self, type, indexed=False, optional=False):
101 if type not in python2column: 102 raise ValueError, type 103 104 self.type=type 105 self.indexed=indexed 106 self.optional=optional
107
108 - def columnSQL(self):
109 col="%s %s"%(self.name, python2column[self.type]) 110 if not self.optional: 111 col="%s NOT NULL"%col 112 return col
113
114 - def indexSQL(self, table):
115 if self.indexed: 116 return "CREATE INDEX idx_%s_%s ON %s (%s)"%(table.name, self.name, 117 table.name, self.name) 118 else: 119 return None
120
121 -class Table(object):
122 """description of a table 123 124 >>> table=Table('pants') 125 >>> table.size=Column(int, indexed=True) 126 >>> table.color=Column(unicode, indexed=True) 127 >>> table.bought_on=Column(datetime.date, optional=True) 128 >>> table.show() 129 CREATE TABLE pants ( 130 __id__ text PRIMARY KEY NOT NULL, 131 bought_on date, 132 color text NOT NULL, 133 size integer NOT NULL) 134 CREATE INDEX idx_pants_color ON pants (color) 135 CREATE INDEX idx_pants_size ON pants (size) 136 >>> table.addMultiIndex('size', 'bought_on') 137 >>> table.addMultiIndex('color', 'size') 138 >>> table.addMultiIndex('size', 'color') 139 >>> table.show() 140 CREATE TABLE pants ( 141 __id__ text PRIMARY KEY NOT NULL, 142 bought_on date, 143 color text NOT NULL, 144 size integer NOT NULL) 145 CREATE INDEX idx_pants_color ON pants (color) 146 CREATE INDEX idx_pants_size ON pants (size) 147 CREATE INDEX idx_pants_size_bought_on ON pants (size, bought_on) 148 CREATE INDEX idx_pants_color_size ON pants (color, size) 149 CREATE INDEX idx_pants_size_color ON pants (size, color) 150 """
151 - def __init__(self, name):
152 self.name=name 153 self.multi_indexes=[]
154
155 - def __setattr__(self, name, value):
156 if isinstance(value, Column): 157 value.name=name 158 159 super(Table, self).__setattr__(name, value)
160 161 @property
162 - def columns(self):
163 return [x for x in self.__dict__.itervalues() if isinstance(x, Column)]
164
165 - def addMultiIndex(self, *colnames):
166 for name in colnames: 167 col=getattr(self, name, None) 168 if not isinstance(col, Column): 169 raise ValueError, "no Column named %s"%name 170 171 colnames=tuple(colnames) 172 if len(colnames) < 2: 173 raise ValueError, "must supply at least 2 columns %r"%colnames 174 if colnames in self.multi_indexes: 175 raise ValueError, "already have multi-index for %r"%colnames 176 else: 177 self.multi_indexes.append(colnames)
178
179 - def columnSQL(self):
180 # sorting isn't strictly necessary here, since we'll always refer to 181 # columns by name, but it's nice 182 return ",\n".join(col.columnSQL() for col in 183 sorted(self.columns, 184 key=operator.attrgetter('name')))
185
186 - def tableSQL(self):
187 return "CREATE TABLE %s (\n__id__ text "\ 188 "PRIMARY KEY NOT NULL,\n%s)"%(self.name, self.columnSQL())
189
190 - def multiIndexSql(self, colnames):
191 assert len(colnames)>=2 192 return "CREATE INDEX idx_%s_%s ON %s (%s)"%(self.name, "_".join(colnames), 193 self.name, ", ".join(colnames))
194
195 - def build(self, func):
196 func(self.tableSQL()) 197 for col in self.columns: 198 idx=col.indexSQL(self) 199 if idx is not None: 200 func(idx) 201 202 for multi in self.multi_indexes: 203 func(self.multiIndexSql(multi))
204
205 - def create(self, connection):
206 self.build(connection.execute)
207
208 - def show(self):
209 # need to use instead of return b/c build doesn't return anything 210 def print_(x): print x 211 return self.build(print_)
212 213 if __name__=='__main__': 214 import doctest 215 doctest.testmod() 216