Imported Upstream version 2.6.3
[joel/debian/python-pysqlite2.git] / lib / test / dbapi.py
1 #-*- coding: ISO-8859-1 -*-
2 # pysqlite2/test/dbapi.py: tests for DB-API compliance
3 #
4 # Copyright (C) 2004-2009 Gerhard Häring <gh@ghaering.de>
5 #
6 # This file is part of pysqlite.
7 #
8 # This software is provided 'as-is', without any express or implied
9 # warranty.  In no event will the authors be held liable for any damages
10 # arising from the use of this software.
11 #
12 # Permission is granted to anyone to use this software for any purpose,
13 # including commercial applications, and to alter it and redistribute it
14 # freely, subject to the following restrictions:
15 #
16 # 1. The origin of this software must not be misrepresented; you must not
17 #    claim that you wrote the original software. If you use this software
18 #    in a product, an acknowledgment in the product documentation would be
19 #    appreciated but is not required.
20 # 2. Altered source versions must be plainly marked as such, and must not be
21 #    misrepresented as being the original software.
22 # 3. This notice may not be removed or altered from any source distribution.
23
24 import unittest
25 import sys
26 import threading
27 import pysqlite2.dbapi2 as sqlite
28
29 class ModuleTests(unittest.TestCase):
30     def CheckAPILevel(self):
31         self.assertEqual(sqlite.apilevel, "2.0",
32                          "apilevel is %s, should be 2.0" % sqlite.apilevel)
33
34     def CheckThreadSafety(self):
35         self.assertEqual(sqlite.threadsafety, 1,
36                          "threadsafety is %d, should be 1" % sqlite.threadsafety)
37
38     def CheckParamStyle(self):
39         self.assertEqual(sqlite.paramstyle, "qmark",
40                          "paramstyle is '%s', should be 'qmark'" %
41                          sqlite.paramstyle)
42
43     def CheckWarning(self):
44         self.assert_(issubclass(sqlite.Warning, StandardError),
45                      "Warning is not a subclass of StandardError")
46
47     def CheckError(self):
48         self.assertTrue(issubclass(sqlite.Error, StandardError),
49                         "Error is not a subclass of StandardError")
50
51     def CheckInterfaceError(self):
52         self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
53                         "InterfaceError is not a subclass of Error")
54
55     def CheckDatabaseError(self):
56         self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
57                         "DatabaseError is not a subclass of Error")
58
59     def CheckDataError(self):
60         self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
61                         "DataError is not a subclass of DatabaseError")
62
63     def CheckOperationalError(self):
64         self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
65                         "OperationalError is not a subclass of DatabaseError")
66
67     def CheckIntegrityError(self):
68         self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
69                         "IntegrityError is not a subclass of DatabaseError")
70
71     def CheckInternalError(self):
72         self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
73                         "InternalError is not a subclass of DatabaseError")
74
75     def CheckProgrammingError(self):
76         self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
77                         "ProgrammingError is not a subclass of DatabaseError")
78
79     def CheckNotSupportedError(self):
80         self.assertTrue(issubclass(sqlite.NotSupportedError,
81                                    sqlite.DatabaseError),
82                         "NotSupportedError is not a subclass of DatabaseError")
83
84 class ConnectionTests(unittest.TestCase):
85     def setUp(self):
86         self.cx = sqlite.connect(":memory:")
87         cu = self.cx.cursor()
88         cu.execute("create table test(id integer primary key, name text)")
89         cu.execute("insert into test(name) values (?)", ("foo",))
90
91     def tearDown(self):
92         self.cx.close()
93
94     def CheckCommit(self):
95         self.cx.commit()
96
97     def CheckCommitAfterNoChanges(self):
98         """
99         A commit should also work when no changes were made to the database.
100         """
101         self.cx.commit()
102         self.cx.commit()
103
104     def CheckRollback(self):
105         self.cx.rollback()
106
107     def CheckRollbackAfterNoChanges(self):
108         """
109         A rollback should also work when no changes were made to the database.
110         """
111         self.cx.rollback()
112         self.cx.rollback()
113
114     def CheckCursor(self):
115         cu = self.cx.cursor()
116
117     def CheckFailedOpen(self):
118         YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
119         try:
120             con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
121         except sqlite.OperationalError:
122             return
123         self.fail("should have raised an OperationalError")
124
125     def CheckClose(self):
126         self.cx.close()
127
128     def CheckExceptions(self):
129         # Optional DB-API extension.
130         self.assertEqual(self.cx.Warning, sqlite.Warning)
131         self.assertEqual(self.cx.Error, sqlite.Error)
132         self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
133         self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
134         self.assertEqual(self.cx.DataError, sqlite.DataError)
135         self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
136         self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
137         self.assertEqual(self.cx.InternalError, sqlite.InternalError)
138         self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
139         self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
140
141 class CursorTests(unittest.TestCase):
142     def setUp(self):
143         self.cx = sqlite.connect(":memory:")
144         self.cu = self.cx.cursor()
145         self.cu.execute("create table test(id integer primary key, name text, income number)")
146         self.cu.execute("insert into test(name) values (?)", ("foo",))
147
148     def tearDown(self):
149         self.cu.close()
150         self.cx.close()
151
152     def CheckExecuteNoArgs(self):
153         self.cu.execute("delete from test")
154
155     def CheckExecuteIllegalSql(self):
156         try:
157             self.cu.execute("select asdf")
158             self.fail("should have raised an OperationalError")
159         except sqlite.OperationalError:
160             return
161         except:
162             self.fail("raised wrong exception")
163
164     def CheckExecuteTooMuchSql(self):
165         try:
166             self.cu.execute("select 5+4; select 4+5")
167             self.fail("should have raised a ProgrammingError")
168         except sqlite.ProgrammingError:
169             return
170         except:
171             self.fail("raised wrong exception")
172
173     def CheckExecuteTooMuchSql2(self):
174         self.cu.execute("select 5+4; -- foo bar")
175
176     def CheckExecuteTooMuchSql3(self):
177         self.cu.execute("""
178             select 5+4;
179
180             /*
181             foo
182             */
183             """)
184
185     def CheckExecuteWrongSqlArg(self):
186         try:
187             self.cu.execute(42)
188             self.fail("should have raised a ValueError")
189         except ValueError:
190             return
191         except:
192             self.fail("raised wrong exception.")
193
194     def CheckExecuteArgInt(self):
195         self.cu.execute("insert into test(id) values (?)", (42,))
196
197     def CheckExecuteArgFloat(self):
198         self.cu.execute("insert into test(income) values (?)", (2500.32,))
199
200     def CheckExecuteArgString(self):
201         self.cu.execute("insert into test(name) values (?)", ("Hugo",))
202
203     def CheckExecuteWrongNoOfArgs1(self):
204         # too many parameters
205         try:
206             self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
207             self.fail("should have raised ProgrammingError")
208         except sqlite.ProgrammingError:
209             pass
210
211     def CheckExecuteWrongNoOfArgs2(self):
212         # too little parameters
213         try:
214             self.cu.execute("insert into test(id) values (?)")
215             self.fail("should have raised ProgrammingError")
216         except sqlite.ProgrammingError:
217             pass
218
219     def CheckExecuteWrongNoOfArgs3(self):
220         # no parameters, parameters are needed
221         try:
222             self.cu.execute("insert into test(id) values (?)")
223             self.fail("should have raised ProgrammingError")
224         except sqlite.ProgrammingError:
225             pass
226
227     def CheckExecuteParamList(self):
228         self.cu.execute("insert into test(name) values ('foo')")
229         self.cu.execute("select name from test where name=?", ["foo"])
230         row = self.cu.fetchone()
231         self.assertEqual(row[0], "foo")
232
233     def CheckExecuteParamSequence(self):
234         class L(object):
235             def __len__(self):
236                 return 1
237             def __getitem__(self, x):
238                 assert x == 0
239                 return "foo"
240
241         self.cu.execute("insert into test(name) values ('foo')")
242         self.cu.execute("select name from test where name=?", L())
243         row = self.cu.fetchone()
244         self.assertEqual(row[0], "foo")
245
246     def CheckExecuteDictMapping(self):
247         self.cu.execute("insert into test(name) values ('foo')")
248         self.cu.execute("select name from test where name=:name", {"name": "foo"})
249         row = self.cu.fetchone()
250         self.assertEqual(row[0], "foo")
251
252     def CheckExecuteDictMapping_Mapping(self):
253         # Test only works with Python 2.5 or later
254         if sys.version_info < (2, 5, 0):
255             return
256
257         class D(dict):
258             def __missing__(self, key):
259                 return "foo"
260
261         self.cu.execute("insert into test(name) values ('foo')")
262         self.cu.execute("select name from test where name=:name", D())
263         row = self.cu.fetchone()
264         self.assertEqual(row[0], "foo")
265
266     def CheckExecuteDictMappingTooLittleArgs(self):
267         self.cu.execute("insert into test(name) values ('foo')")
268         try:
269             self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
270             self.fail("should have raised ProgrammingError")
271         except sqlite.ProgrammingError:
272             pass
273
274     def CheckExecuteDictMappingNoArgs(self):
275         self.cu.execute("insert into test(name) values ('foo')")
276         try:
277             self.cu.execute("select name from test where name=:name")
278             self.fail("should have raised ProgrammingError")
279         except sqlite.ProgrammingError:
280             pass
281
282     def CheckExecuteDictMappingUnnamed(self):
283         self.cu.execute("insert into test(name) values ('foo')")
284         try:
285             self.cu.execute("select name from test where name=?", {"name": "foo"})
286             self.fail("should have raised ProgrammingError")
287         except sqlite.ProgrammingError:
288             pass
289
290     def CheckClose(self):
291         self.cu.close()
292
293     def CheckRowcountExecute(self):
294         self.cu.execute("delete from test")
295         self.cu.execute("insert into test(name) values ('foo')")
296         self.cu.execute("insert into test(name) values ('foo')")
297         self.cu.execute("update test set name='bar'")
298         self.assertEqual(self.cu.rowcount, 2)
299
300     def CheckRowcountSelect(self):
301         """
302         pysqlite does not know the rowcount of SELECT statements, because we
303         don't fetch all rows after executing the select statement. The rowcount
304         has thus to be -1.
305         """
306         self.cu.execute("select 5 union select 6")
307         self.assertEqual(self.cu.rowcount, -1)
308
309     def CheckRowcountExecutemany(self):
310         self.cu.execute("delete from test")
311         self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
312         self.assertEqual(self.cu.rowcount, 3)
313
314     def CheckTotalChanges(self):
315         self.cu.execute("insert into test(name) values ('foo')")
316         self.cu.execute("insert into test(name) values ('foo')")
317         if self.cx.total_changes < 2:
318             self.fail("total changes reported wrong value")
319
320     # Checks for executemany:
321     # Sequences are required by the DB-API, iterators
322     # enhancements in pysqlite.
323
324     def CheckExecuteManySequence(self):
325         self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
326
327     def CheckExecuteManyIterator(self):
328         class MyIter:
329             def __init__(self):
330                 self.value = 5
331
332             def next(self):
333                 if self.value == 10:
334                     raise StopIteration
335                 else:
336                     self.value += 1
337                     return (self.value,)
338
339         self.cu.executemany("insert into test(income) values (?)", MyIter())
340
341     def CheckExecuteManyGenerator(self):
342         def mygen():
343             for i in range(5):
344                 yield (i,)
345
346         self.cu.executemany("insert into test(income) values (?)", mygen())
347
348     def CheckExecuteManyWrongSqlArg(self):
349         try:
350             self.cu.executemany(42, [(3,)])
351             self.fail("should have raised a ValueError")
352         except ValueError:
353             return
354         except:
355             self.fail("raised wrong exception.")
356
357     def CheckExecuteManySelect(self):
358         try:
359             self.cu.executemany("select ?", [(3,)])
360             self.fail("should have raised a ProgrammingError")
361         except sqlite.ProgrammingError:
362             return
363         except:
364             self.fail("raised wrong exception.")
365
366     def CheckExecuteManyNotIterable(self):
367         try:
368             self.cu.executemany("insert into test(income) values (?)", 42)
369             self.fail("should have raised a TypeError")
370         except TypeError:
371             return
372         except Exception, e:
373             print "raised", e.__class__
374             self.fail("raised wrong exception.")
375
376     def CheckFetchIter(self):
377         # Optional DB-API extension.
378         self.cu.execute("delete from test")
379         self.cu.execute("insert into test(id) values (?)", (5,))
380         self.cu.execute("insert into test(id) values (?)", (6,))
381         self.cu.execute("select id from test order by id")
382         lst = []
383         for row in self.cu:
384             lst.append(row[0])
385         self.assertEqual(lst[0], 5)
386         self.assertEqual(lst[1], 6)
387
388     def CheckFetchone(self):
389         self.cu.execute("select name from test")
390         row = self.cu.fetchone()
391         self.assertEqual(row[0], "foo")
392         row = self.cu.fetchone()
393         self.assertEqual(row, None)
394
395     def CheckFetchoneNoStatement(self):
396         cur = self.cx.cursor()
397         row = cur.fetchone()
398         self.assertEqual(row, None)
399
400     def CheckArraySize(self):
401         # must default ot 1
402         self.assertEqual(self.cu.arraysize, 1)
403
404         # now set to 2
405         self.cu.arraysize = 2
406
407         # now make the query return 3 rows
408         self.cu.execute("delete from test")
409         self.cu.execute("insert into test(name) values ('A')")
410         self.cu.execute("insert into test(name) values ('B')")
411         self.cu.execute("insert into test(name) values ('C')")
412         self.cu.execute("select name from test")
413         res = self.cu.fetchmany()
414
415         self.assertEqual(len(res), 2)
416
417     def CheckFetchmany(self):
418         self.cu.execute("select name from test")
419         res = self.cu.fetchmany(100)
420         self.assertEqual(len(res), 1)
421         res = self.cu.fetchmany(100)
422         self.assertEqual(res, [])
423
424     def CheckFetchmanyKwArg(self):
425         """Checks if fetchmany works with keyword arguments"""
426         self.cu.execute("select name from test")
427         res = self.cu.fetchmany(size=100)
428         self.assertEqual(len(res), 1)
429
430     def CheckFetchall(self):
431         self.cu.execute("select name from test")
432         res = self.cu.fetchall()
433         self.assertEqual(len(res), 1)
434         res = self.cu.fetchall()
435         self.assertEqual(res, [])
436
437     def CheckSetinputsizes(self):
438         self.cu.setinputsizes([3, 4, 5])
439
440     def CheckSetoutputsize(self):
441         self.cu.setoutputsize(5, 0)
442
443     def CheckSetoutputsizeNoColumn(self):
444         self.cu.setoutputsize(42)
445
446     def CheckCursorConnection(self):
447         # Optional DB-API extension.
448         self.assertEqual(self.cu.connection, self.cx)
449
450     def CheckWrongCursorCallable(self):
451         try:
452             def f(): pass
453             cur = self.cx.cursor(f)
454             self.fail("should have raised a TypeError")
455         except TypeError:
456             return
457         self.fail("should have raised a ValueError")
458
459     def CheckCursorWrongClass(self):
460         class Foo: pass
461         foo = Foo()
462         try:
463             cur = sqlite.Cursor(foo)
464             self.fail("should have raised a ValueError")
465         except TypeError:
466             pass
467
468 class ThreadTests(unittest.TestCase):
469     def setUp(self):
470         self.con = sqlite.connect(":memory:")
471         self.cur = self.con.cursor()
472         self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")
473
474     def tearDown(self):
475         self.cur.close()
476         self.con.close()
477
478     def CheckConCursor(self):
479         def run(con, errors):
480             try:
481                 cur = con.cursor()
482                 errors.append("did not raise ProgrammingError")
483                 return
484             except sqlite.ProgrammingError:
485                 return
486             except:
487                 errors.append("raised wrong exception")
488
489         errors = []
490         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
491         t.start()
492         t.join()
493         if len(errors) > 0:
494             self.fail("\n".join(errors))
495
496     def CheckConCommit(self):
497         def run(con, errors):
498             try:
499                 con.commit()
500                 errors.append("did not raise ProgrammingError")
501                 return
502             except sqlite.ProgrammingError:
503                 return
504             except:
505                 errors.append("raised wrong exception")
506
507         errors = []
508         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
509         t.start()
510         t.join()
511         if len(errors) > 0:
512             self.fail("\n".join(errors))
513
514     def CheckConRollback(self):
515         def run(con, errors):
516             try:
517                 con.rollback()
518                 errors.append("did not raise ProgrammingError")
519                 return
520             except sqlite.ProgrammingError:
521                 return
522             except:
523                 errors.append("raised wrong exception")
524
525         errors = []
526         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
527         t.start()
528         t.join()
529         if len(errors) > 0:
530             self.fail("\n".join(errors))
531
532     def CheckConClose(self):
533         def run(con, errors):
534             try:
535                 con.close()
536                 errors.append("did not raise ProgrammingError")
537                 return
538             except sqlite.ProgrammingError:
539                 return
540             except:
541                 errors.append("raised wrong exception")
542
543         errors = []
544         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
545         t.start()
546         t.join()
547         if len(errors) > 0:
548             self.fail("\n".join(errors))
549
550     def CheckCurImplicitBegin(self):
551         def run(cur, errors):
552             try:
553                 cur.execute("insert into test(name) values ('a')")
554                 errors.append("did not raise ProgrammingError")
555                 return
556             except sqlite.ProgrammingError:
557                 return
558             except:
559                 errors.append("raised wrong exception")
560
561         errors = []
562         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
563         t.start()
564         t.join()
565         if len(errors) > 0:
566             self.fail("\n".join(errors))
567
568     def CheckCurClose(self):
569         def run(cur, errors):
570             try:
571                 cur.close()
572                 errors.append("did not raise ProgrammingError")
573                 return
574             except sqlite.ProgrammingError:
575                 return
576             except:
577                 errors.append("raised wrong exception")
578
579         errors = []
580         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
581         t.start()
582         t.join()
583         if len(errors) > 0:
584             self.fail("\n".join(errors))
585
586     def CheckCurExecute(self):
587         def run(cur, errors):
588             try:
589                 cur.execute("select name from test")
590                 errors.append("did not raise ProgrammingError")
591                 return
592             except sqlite.ProgrammingError:
593                 return
594             except:
595                 errors.append("raised wrong exception")
596
597         errors = []
598         self.cur.execute("insert into test(name) values ('a')")
599         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
600         t.start()
601         t.join()
602         if len(errors) > 0:
603             self.fail("\n".join(errors))
604
605     def CheckCurIterNext(self):
606         def run(cur, errors):
607             try:
608                 row = cur.fetchone()
609                 errors.append("did not raise ProgrammingError")
610                 return
611             except sqlite.ProgrammingError:
612                 return
613             except:
614                 errors.append("raised wrong exception")
615
616         errors = []
617         self.cur.execute("insert into test(name) values ('a')")
618         self.cur.execute("select name from test")
619         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
620         t.start()
621         t.join()
622         if len(errors) > 0:
623             self.fail("\n".join(errors))
624
625 class ConstructorTests(unittest.TestCase):
626     def CheckDate(self):
627         d = sqlite.Date(2004, 10, 28)
628
629     def CheckTime(self):
630         t = sqlite.Time(12, 39, 35)
631
632     def CheckTimestamp(self):
633         ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
634
635     def CheckDateFromTicks(self):
636         d = sqlite.DateFromTicks(42)
637
638     def CheckTimeFromTicks(self):
639         t = sqlite.TimeFromTicks(42)
640
641     def CheckTimestampFromTicks(self):
642         ts = sqlite.TimestampFromTicks(42)
643
644     def CheckBinary(self):
645         b = sqlite.Binary(chr(0) + "'")
646
647 class ExtensionTests(unittest.TestCase):
648     def CheckScriptStringSql(self):
649         con = sqlite.connect(":memory:")
650         cur = con.cursor()
651         cur.executescript("""
652             -- bla bla
653             /* a stupid comment */
654             create table a(i);
655             insert into a(i) values (5);
656             """)
657         cur.execute("select i from a")
658         res = cur.fetchone()[0]
659         self.assertEqual(res, 5)
660
661     def CheckScriptStringUnicode(self):
662         con = sqlite.connect(":memory:")
663         cur = con.cursor()
664         cur.executescript(u"""
665             create table a(i);
666             insert into a(i) values (5);
667             select i from a;
668             delete from a;
669             insert into a(i) values (6);
670             """)
671         cur.execute("select i from a")
672         res = cur.fetchone()[0]
673         self.assertEqual(res, 6)
674
675     def CheckScriptSyntaxError(self):
676         con = sqlite.connect(":memory:")
677         cur = con.cursor()
678         raised = False
679         try:
680             cur.executescript("create table test(x); asdf; create table test2(x)")
681         except sqlite.OperationalError:
682             raised = True
683         self.assertEqual(raised, True, "should have raised an exception")
684
685     def CheckScriptErrorNormal(self):
686         con = sqlite.connect(":memory:")
687         cur = con.cursor()
688         raised = False
689         try:
690             cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
691         except sqlite.OperationalError:
692             raised = True
693         self.assertEqual(raised, True, "should have raised an exception")
694
695     def CheckConnectionExecute(self):
696         con = sqlite.connect(":memory:")
697         result = con.execute("select 5").fetchone()[0]
698         self.assertEqual(result, 5, "Basic test of Connection.execute")
699
700     def CheckConnectionExecutemany(self):
701         con = sqlite.connect(":memory:")
702         con.execute("create table test(foo)")
703         con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
704         result = con.execute("select foo from test order by foo").fetchall()
705         self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
706         self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")
707
708     def CheckConnectionExecutescript(self):
709         con = sqlite.connect(":memory:")
710         con.executescript("create table test(foo); insert into test(foo) values (5);")
711         result = con.execute("select foo from test").fetchone()[0]
712         self.assertEqual(result, 5, "Basic test of Connection.executescript")
713
714 class ClosedConTests(unittest.TestCase):
715     def setUp(self):
716         pass
717
718     def tearDown(self):
719         pass
720
721     def CheckClosedConCursor(self):
722         con = sqlite.connect(":memory:")
723         con.close()
724         try:
725             cur = con.cursor()
726             self.fail("Should have raised a ProgrammingError")
727         except sqlite.ProgrammingError:
728             pass
729         except:
730             self.fail("Should have raised a ProgrammingError")
731
732     def CheckClosedConCommit(self):
733         con = sqlite.connect(":memory:")
734         con.close()
735         try:
736             con.commit()
737             self.fail("Should have raised a ProgrammingError")
738         except sqlite.ProgrammingError:
739             pass
740         except:
741             self.fail("Should have raised a ProgrammingError")
742
743     def CheckClosedConRollback(self):
744         con = sqlite.connect(":memory:")
745         con.close()
746         try:
747             con.rollback()
748             self.fail("Should have raised a ProgrammingError")
749         except sqlite.ProgrammingError:
750             pass
751         except:
752             self.fail("Should have raised a ProgrammingError")
753
754     def CheckClosedCurExecute(self):
755         con = sqlite.connect(":memory:")
756         cur = con.cursor()
757         con.close()
758         try:
759             cur.execute("select 4")
760             self.fail("Should have raised a ProgrammingError")
761         except sqlite.ProgrammingError:
762             pass
763         except:
764             self.fail("Should have raised a ProgrammingError")
765
766     def CheckClosedCreateFunction(self):
767         con = sqlite.connect(":memory:")
768         con.close()
769         def f(x): return 17
770         try:
771             con.create_function("foo", 1, f)
772             self.fail("Should have raised a ProgrammingError")
773         except sqlite.ProgrammingError:
774             pass
775         except:
776             self.fail("Should have raised a ProgrammingError")
777
778     def CheckClosedCreateAggregate(self):
779         con = sqlite.connect(":memory:")
780         con.close()
781         class Agg:
782             def __init__(self):
783                 pass
784             def step(self, x):
785                 pass
786             def finalize(self):
787                 return 17
788         try:
789             con.create_aggregate("foo", 1, Agg)
790             self.fail("Should have raised a ProgrammingError")
791         except sqlite.ProgrammingError:
792             pass
793         except:
794             self.fail("Should have raised a ProgrammingError")
795
796     def CheckClosedSetAuthorizer(self):
797         con = sqlite.connect(":memory:")
798         con.close()
799         def authorizer(*args):
800             return sqlite.DENY
801         try:
802             con.set_authorizer(authorizer)
803             self.fail("Should have raised a ProgrammingError")
804         except sqlite.ProgrammingError:
805             pass
806         except:
807             self.fail("Should have raised a ProgrammingError")
808
809     def CheckClosedSetProgressCallback(self):
810         con = sqlite.connect(":memory:")
811         con.close()
812         def progress(): pass
813         try:
814             con.set_progress_handler(progress, 100)
815             self.fail("Should have raised a ProgrammingError")
816         except sqlite.ProgrammingError:
817             pass
818         except:
819             self.fail("Should have raised a ProgrammingError")
820
821     def CheckClosedCall(self):
822         con = sqlite.connect(":memory:")
823         con.close()
824         try:
825             con()
826             self.fail("Should have raised a ProgrammingError")
827         except sqlite.ProgrammingError:
828             pass
829         except:
830             self.fail("Should have raised a ProgrammingError")
831
832 class ClosedCurTests(unittest.TestCase):
833     def setUp(self):
834         pass
835
836     def tearDown(self):
837         pass
838
839     def CheckClosed(self):
840         con = sqlite.connect(":memory:")
841         cur = con.cursor()
842         cur.close()
843
844         for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
845             if method_name in ("execute", "executescript"):
846                 params = ("select 4 union select 5",)
847             elif method_name == "executemany":
848                 params = ("insert into foo(bar) values (?)", [(3,), (4,)])
849             else:
850                 params = []
851
852             try:
853                 method = getattr(cur, method_name)
854
855                 method(*params)
856                 self.fail("Should have raised a ProgrammingError: method " + method_name)
857             except sqlite.ProgrammingError:
858                 pass
859             except:
860                 self.fail("Should have raised a ProgrammingError: " + method_name)
861
862 def suite():
863     module_suite = unittest.makeSuite(ModuleTests, "Check")
864     connection_suite = unittest.makeSuite(ConnectionTests, "Check")
865     cursor_suite = unittest.makeSuite(CursorTests, "Check")
866     thread_suite = unittest.makeSuite(ThreadTests, "Check")
867     constructor_suite = unittest.makeSuite(ConstructorTests, "Check")
868     ext_suite = unittest.makeSuite(ExtensionTests, "Check")
869     closed_con_suite = unittest.makeSuite(ClosedConTests, "Check")
870     closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check")
871     return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite))
872
873 def test():
874     runner = unittest.TextTestRunner()
875     runner.run(suite())
876
877 if __name__ == "__main__":
878     test()