Kaydet (Commit) 0518f470 authored tarafından Petri Lehtinen's avatar Petri Lehtinen

sqlite3: Handle strings with embedded zeros correctly

Closes #13676.
üst 6ab98136
...@@ -203,6 +203,13 @@ class CursorTests(unittest.TestCase): ...@@ -203,6 +203,13 @@ class CursorTests(unittest.TestCase):
def CheckExecuteArgString(self): def CheckExecuteArgString(self):
self.cu.execute("insert into test(name) values (?)", ("Hugo",)) self.cu.execute("insert into test(name) values (?)", ("Hugo",))
def CheckExecuteArgStringWithZeroByte(self):
self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
row = self.cu.fetchone()
self.assertEqual(row[0], "Hu\x00go")
def CheckExecuteWrongNoOfArgs1(self): def CheckExecuteWrongNoOfArgs1(self):
# too many parameters # too many parameters
try: try:
......
...@@ -189,13 +189,52 @@ class TextFactoryTests(unittest.TestCase): ...@@ -189,13 +189,52 @@ class TextFactoryTests(unittest.TestCase):
def tearDown(self): def tearDown(self):
self.con.close() self.con.close()
class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
self.con.execute("create table test (value text)")
self.con.execute("insert into test (value) values (?)", ("a\x00b",))
def CheckString(self):
# text_factory defaults to unicode
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), unicode)
self.assertEqual(row[0], "a\x00b")
def CheckCustom(self):
# A custom factory should receive an str argument
self.con.text_factory = lambda x: x
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), str)
self.assertEqual(row[0], "a\x00b")
def CheckOptimizedUnicodeAsString(self):
# ASCII -> str argument
self.con.text_factory = sqlite.OptimizedUnicode
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), str)
self.assertEqual(row[0], "a\x00b")
def CheckOptimizedUnicodeAsUnicode(self):
# Non-ASCII -> unicode argument
self.con.text_factory = sqlite.OptimizedUnicode
self.con.execute("delete from test")
self.con.execute("insert into test (value) values (?)", (u'\0',))
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), unicode)
self.assertEqual(row[0], u"\x00")
def tearDown(self):
self.con.close()
def suite(): def suite():
connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check") connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check")
cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check") cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check")
row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check") row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check")
row_suite = unittest.makeSuite(RowFactoryTests, "Check") row_suite = unittest.makeSuite(RowFactoryTests, "Check")
text_suite = unittest.makeSuite(TextFactoryTests, "Check") text_suite = unittest.makeSuite(TextFactoryTests, "Check")
return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite)) text_zero_bytes_suite = unittest.makeSuite(TextFactoryTestsWithEmbeddedZeroBytes, "Check")
return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite, text_zero_bytes_suite))
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()
......
...@@ -90,6 +90,8 @@ Core and Builtins ...@@ -90,6 +90,8 @@ Core and Builtins
Library Library
------- -------
- Issue #13676: Handle strings with embedded zeros correctly in sqlite3.
- Issue #13506: Add '' to path for IDLE Shell when started and restarted with Restart Shell. - Issue #13506: Add '' to path for IDLE Shell when started and restarted with Restart Shell.
Original patches by Marco Scataglini and Roger Serwy. Original patches by Marco Scataglini and Roger Serwy.
......
...@@ -268,16 +268,17 @@ PyObject* _pysqlite_build_column_name(const char* colname) ...@@ -268,16 +268,17 @@ PyObject* _pysqlite_build_column_name(const char* colname)
} }
} }
PyObject* pysqlite_unicode_from_string(const char* val_str, int optimize) PyObject* pysqlite_unicode_from_string(const char* val_str, Py_ssize_t size, int optimize)
{ {
const char* check; const char* check;
Py_ssize_t pos;
int is_ascii = 0; int is_ascii = 0;
if (optimize) { if (optimize) {
is_ascii = 1; is_ascii = 1;
check = val_str; check = val_str;
while (*check) { for (pos = 0; pos < size; pos++) {
if (*check & 0x80) { if (*check & 0x80) {
is_ascii = 0; is_ascii = 0;
break; break;
...@@ -288,9 +289,9 @@ PyObject* pysqlite_unicode_from_string(const char* val_str, int optimize) ...@@ -288,9 +289,9 @@ PyObject* pysqlite_unicode_from_string(const char* val_str, int optimize)
} }
if (is_ascii) { if (is_ascii) {
return PyString_FromString(val_str); return PyString_FromStringAndSize(val_str, size);
} else { } else {
return PyUnicode_DecodeUTF8(val_str, strlen(val_str), NULL); return PyUnicode_DecodeUTF8(val_str, size, NULL);
} }
} }
...@@ -375,10 +376,11 @@ PyObject* _pysqlite_fetch_one_row(pysqlite_Cursor* self) ...@@ -375,10 +376,11 @@ PyObject* _pysqlite_fetch_one_row(pysqlite_Cursor* self)
converted = PyFloat_FromDouble(sqlite3_column_double(self->statement->st, i)); converted = PyFloat_FromDouble(sqlite3_column_double(self->statement->st, i));
} else if (coltype == SQLITE_TEXT) { } else if (coltype == SQLITE_TEXT) {
val_str = (const char*)sqlite3_column_text(self->statement->st, i); val_str = (const char*)sqlite3_column_text(self->statement->st, i);
nbytes = sqlite3_column_bytes(self->statement->st, i);
if ((self->connection->text_factory == (PyObject*)&PyUnicode_Type) if ((self->connection->text_factory == (PyObject*)&PyUnicode_Type)
|| (self->connection->text_factory == pysqlite_OptimizedUnicode)) { || (self->connection->text_factory == pysqlite_OptimizedUnicode)) {
converted = pysqlite_unicode_from_string(val_str, converted = pysqlite_unicode_from_string(val_str, nbytes,
self->connection->text_factory == pysqlite_OptimizedUnicode ? 1 : 0); self->connection->text_factory == pysqlite_OptimizedUnicode ? 1 : 0);
if (!converted) { if (!converted) {
...@@ -391,9 +393,9 @@ PyObject* _pysqlite_fetch_one_row(pysqlite_Cursor* self) ...@@ -391,9 +393,9 @@ PyObject* _pysqlite_fetch_one_row(pysqlite_Cursor* self)
PyErr_SetString(pysqlite_OperationalError, buf); PyErr_SetString(pysqlite_OperationalError, buf);
} }
} else if (self->connection->text_factory == (PyObject*)&PyString_Type) { } else if (self->connection->text_factory == (PyObject*)&PyString_Type) {
converted = PyString_FromString(val_str); converted = PyString_FromStringAndSize(val_str, nbytes);
} else { } else {
converted = PyObject_CallFunction(self->connection->text_factory, "s", val_str); converted = PyObject_CallFunction(self->connection->text_factory, "s#", val_str, nbytes);
} }
} else { } else {
/* coltype == SQLITE_BLOB */ /* coltype == SQLITE_BLOB */
......
...@@ -166,13 +166,13 @@ int pysqlite_statement_bind_parameter(pysqlite_Statement* self, int pos, PyObjec ...@@ -166,13 +166,13 @@ int pysqlite_statement_bind_parameter(pysqlite_Statement* self, int pos, PyObjec
rc = sqlite3_bind_double(self->st, pos, PyFloat_AsDouble(parameter)); rc = sqlite3_bind_double(self->st, pos, PyFloat_AsDouble(parameter));
break; break;
case TYPE_STRING: case TYPE_STRING:
string = PyString_AS_STRING(parameter); PyString_AsStringAndSize(parameter, &string, &buflen);
rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT); rc = sqlite3_bind_text(self->st, pos, string, buflen, SQLITE_TRANSIENT);
break; break;
case TYPE_UNICODE: case TYPE_UNICODE:
stringval = PyUnicode_AsUTF8String(parameter); stringval = PyUnicode_AsUTF8String(parameter);
string = PyString_AsString(stringval); PyString_AsStringAndSize(stringval, &string, &buflen);
rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT); rc = sqlite3_bind_text(self->st, pos, string, buflen, SQLITE_TRANSIENT);
Py_DECREF(stringval); Py_DECREF(stringval);
break; break;
case TYPE_BUFFER: case TYPE_BUFFER:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment