From fcd98c3212222b640074769968642d5f8d600599 Mon Sep 17 00:00:00 2001 From: wsy182 <2392948297@qq.com> Date: Fri, 20 Mar 2026 13:12:29 +0800 Subject: [PATCH] =?UTF-8?q?test(ksycopg2):=20=E6=B7=BB=E5=8A=A0=20Python?= =?UTF-8?q?=203=20=E5=85=BC=E5=AE=B9=E6=80=A7=E6=B5=8B=E8=AF=95=E5=A5=97?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现基本连接测试验证数据库连接功能 - 添加执行 SQL 语句的测试包括创建表和插入数据 - 集成选择查询测试验证数据检索功能 - 实现 LOB 大对象类型的数据处理测试 - 添加 OID 对象标识符类型的读写测试 - 包含新数据类型支持的全面测试覆盖 - 验证中文字符和特殊符号的编码处理 - 实现测试结果检查和失败计数统计功能 --- test_ksycopg2-for-py3.py | 251 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 test_ksycopg2-for-py3.py diff --git a/test_ksycopg2-for-py3.py b/test_ksycopg2-for-py3.py new file mode 100644 index 0000000..d860f2c --- /dev/null +++ b/test_ksycopg2-for-py3.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +import ksycopg2 +import datetime + +database = "training" +user = "system" +password = "PIvtluUE8FUXSEWJ" +host = "192.168.1.41" +port = "54321" + +failed = 0 + + +def check(name, val): + if val is None: + global failed + failed += 1 + else: + if isinstance(val, ksycopg2.extensions.connection): + print("close connection") + val.close() + print("test", name, "success !", "\n") + + +def testConn(): + try: + conn = ksycopg2.connect( + "dbname={} user={} password={} host={} port={}".format(database, user, password, host, port)) + + # conn.set_session(autocommit=True) + except Exception as e: + print(e) + return None + else: + return conn + + +def testConn2(): + try: + conn = ksycopg2.connect(database=database, user=user, password=password, host=host, port=port) + cur = conn.cursor() + cur.execute("select version()") + rows = cur.fetchall() + print("database version:", rows[0]) + + cur.close() + except Exception as e: + print(e) + return None + else: + return conn + + +def testExecute(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute('drop table if exists test_ksy') + cur.execute('create table test_ksy(id integer, name TEXT)') + + cur.execute("insert into test_ksy values(%s, %s)", (1, "John")) + cur.execute("insert into test_ksy values(%s, %s)", (2, '中文测试文字')) + cur.execute("insert into test_ksy values(%s, %s)", (3, '!@#¥%……')) + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +def testSelect(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute("select * from test_ksy") + rows = cur.fetchall() + for c in cur.description: + print(c.name, "\t", end="") + print() + + for row in rows: + for cell in row: + print(cell, " ", end="") + print() + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +def testLob(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute('drop table if exists test_lob') + cur.execute('create table test_lob(id integer, b bytea, c text, ba bytea)') + ba = bytearray("中文测试字符bytearray", "UTF8") + b = bytes('中文测试字符bytes' * 2, "UTF8") + u = u'中文字unicode' * 3 + s = '中文str' * 4 + + cur.execute("insert into test_lob values(%s, %s, %s, %s)", (1, ba, ba, ba)) + cur.execute("insert into test_lob values(%s, %s, %s, %s)", (2, b, b, b)) + cur.execute("insert into test_lob values(%s, %s, %s, %s)", (3, u, u, u)) + cur.execute("insert into test_lob values(%s, %s, %s, %s)", (4, s, s, s)) + + cur.execute("select * from test_lob") + rows = cur.fetchall() + + for row in rows: + for cell in row: + if isinstance(cell, memoryview): + print(type(cell), cell[:].tobytes().decode('UTF8'), " ", end="") + else: + print(type(cell), cell, " ", end="") + print() + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +def testOID(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute('drop table if exists test_oid') + cur.execute('create table test_oid(id integer, o OID)') + + lo1 = conn.lobject() + lo1.write("raw data") + cur.execute("insert into test_oid values(%s, %s)", (1, lo1.oid)) + + lo2 = conn.lobject() + lo2.write(b'binary data') + cur.execute("insert into test_oid values(%s, %s)", (3, lo2.oid)) + + lo3 = conn.lobject() + lo3.write('中文数据 data') + cur.execute("insert into test_oid values(%s, %s)", (3, lo3.oid)) + + lo1.close() + lo2.close() + lo3.close() + + cur.execute("select o from test_oid") + rows = cur.fetchall() + for c in cur.description: + print(c.name, "\t", end="") + print() + + for row in rows: + for cell in row: + lo_out = conn.lobject(cell) + r = lo_out.read() + lo_out.close() + + print("oid:", cell, ":", r, " ", end="") + print() + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +def testNewType(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute('drop table if exists test_newtype') + cur.execute(''' + create table test_newtype( + num INTEGER, + bcb CHAR(30), + vcb VARCHAR(30), + dt TIMESTAMP, + blob BYTEA, + nclob TEXT + ) + ''') + + cur.execute("insert into test_newtype values(%s, %s, %s, %s, %s, %s)", + (100, + 'bpcharbyte_30', + 'varcharbyte_30', + '2000-12-01 15:30:12', + b'binary_data', + 'text_data')) + + cur.execute("insert into test_newtype values(%s, %s, %s, %s, %s, %s)", + (200, + '中文测试数据', + '中文测试数据', + datetime.datetime.now(), + '人大金仓数据库'.encode("utf-8"), + '北京人大金仓')) + + cur.execute("select * from test_newtype") + rows = cur.fetchall() + + for row in rows: + for cell in row: + if isinstance(cell, memoryview): + print(cell.tobytes().decode('utf-8'), end=" ") + else: + print(cell, end=" ") + print() + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +if __name__ == "__main__": + print("libkci version:", ksycopg2.__libkci_version__) + print("ksycopg2 version:", ksycopg2.__version__) + + check("testConn", testConn()) + + check("testConn2", testConn2()) + + check("testExecute", testExecute()) + + check("testSelect", testSelect()) + + check("testLob", testLob()) + + check("testOID", testOID()) + + check("testNewType", testNewType()) + + print("failed case:", failed)