- 实现基本连接测试验证数据库连接功能 - 添加执行 SQL 语句的测试包括创建表和插入数据 - 集成选择查询测试验证数据检索功能 - 实现 LOB 大对象类型的数据处理测试 - 添加 OID 对象标识符类型的读写测试 - 包含新数据类型支持的全面测试覆盖 - 验证中文字符和特殊符号的编码处理 - 实现测试结果检查和失败计数统计功能
252 lines
6.4 KiB
Python
252 lines
6.4 KiB
Python
# -*- coding: utf-8 -*-
|
||
import ksycopg2
|
||
import datetime
|
||
|
||
database = "training"
|
||
user = "system"
|
||
password = "PIvtluUE8FUXSEWJ"
|
||
host = "192.168.1.41"
|
||
port = "54321"
|
||
|
||
failed = 0
|
||
|
||
|
||
def check(name, val):
|
||
if val is None:
|
||
global failed
|
||
failed += 1
|
||
else:
|
||
if isinstance(val, ksycopg2.extensions.connection):
|
||
print("close connection")
|
||
val.close()
|
||
print("test", name, "success !", "\n")
|
||
|
||
|
||
def testConn():
|
||
try:
|
||
conn = ksycopg2.connect(
|
||
"dbname={} user={} password={} host={} port={}".format(database, user, password, host, port))
|
||
|
||
# conn.set_session(autocommit=True)
|
||
except Exception as e:
|
||
print(e)
|
||
return None
|
||
else:
|
||
return conn
|
||
|
||
|
||
def testConn2():
|
||
try:
|
||
conn = ksycopg2.connect(database=database, user=user, password=password, host=host, port=port)
|
||
cur = conn.cursor()
|
||
cur.execute("select version()")
|
||
rows = cur.fetchall()
|
||
print("database version:", rows[0])
|
||
|
||
cur.close()
|
||
except Exception as e:
|
||
print(e)
|
||
return None
|
||
else:
|
||
return conn
|
||
|
||
|
||
def testExecute():
|
||
conn = testConn()
|
||
if conn is not None:
|
||
cur = conn.cursor()
|
||
|
||
cur.execute('drop table if exists test_ksy')
|
||
cur.execute('create table test_ksy(id integer, name TEXT)')
|
||
|
||
cur.execute("insert into test_ksy values(%s, %s)", (1, "John"))
|
||
cur.execute("insert into test_ksy values(%s, %s)", (2, '中文测试文字'))
|
||
cur.execute("insert into test_ksy values(%s, %s)", (3, '!@#¥%……'))
|
||
|
||
cur.close()
|
||
conn.commit()
|
||
conn.close()
|
||
return 0
|
||
else:
|
||
return None
|
||
|
||
|
||
def testSelect():
|
||
conn = testConn()
|
||
if conn is not None:
|
||
cur = conn.cursor()
|
||
|
||
cur.execute("select * from test_ksy")
|
||
rows = cur.fetchall()
|
||
for c in cur.description:
|
||
print(c.name, "\t", end="")
|
||
print()
|
||
|
||
for row in rows:
|
||
for cell in row:
|
||
print(cell, " ", end="")
|
||
print()
|
||
|
||
cur.close()
|
||
conn.commit()
|
||
conn.close()
|
||
return 0
|
||
else:
|
||
return None
|
||
|
||
|
||
def testLob():
|
||
conn = testConn()
|
||
if conn is not None:
|
||
cur = conn.cursor()
|
||
|
||
cur.execute('drop table if exists test_lob')
|
||
cur.execute('create table test_lob(id integer, b bytea, c text, ba bytea)')
|
||
ba = bytearray("中文测试字符bytearray", "UTF8")
|
||
b = bytes('中文测试字符bytes' * 2, "UTF8")
|
||
u = u'中文字unicode' * 3
|
||
s = '中文str' * 4
|
||
|
||
cur.execute("insert into test_lob values(%s, %s, %s, %s)", (1, ba, ba, ba))
|
||
cur.execute("insert into test_lob values(%s, %s, %s, %s)", (2, b, b, b))
|
||
cur.execute("insert into test_lob values(%s, %s, %s, %s)", (3, u, u, u))
|
||
cur.execute("insert into test_lob values(%s, %s, %s, %s)", (4, s, s, s))
|
||
|
||
cur.execute("select * from test_lob")
|
||
rows = cur.fetchall()
|
||
|
||
for row in rows:
|
||
for cell in row:
|
||
if isinstance(cell, memoryview):
|
||
print(type(cell), cell[:].tobytes().decode('UTF8'), " ", end="")
|
||
else:
|
||
print(type(cell), cell, " ", end="")
|
||
print()
|
||
|
||
cur.close()
|
||
conn.commit()
|
||
conn.close()
|
||
return 0
|
||
else:
|
||
return None
|
||
|
||
|
||
def testOID():
|
||
conn = testConn()
|
||
if conn is not None:
|
||
cur = conn.cursor()
|
||
|
||
cur.execute('drop table if exists test_oid')
|
||
cur.execute('create table test_oid(id integer, o OID)')
|
||
|
||
lo1 = conn.lobject()
|
||
lo1.write("raw data")
|
||
cur.execute("insert into test_oid values(%s, %s)", (1, lo1.oid))
|
||
|
||
lo2 = conn.lobject()
|
||
lo2.write(b'binary data')
|
||
cur.execute("insert into test_oid values(%s, %s)", (3, lo2.oid))
|
||
|
||
lo3 = conn.lobject()
|
||
lo3.write('中文数据 data')
|
||
cur.execute("insert into test_oid values(%s, %s)", (3, lo3.oid))
|
||
|
||
lo1.close()
|
||
lo2.close()
|
||
lo3.close()
|
||
|
||
cur.execute("select o from test_oid")
|
||
rows = cur.fetchall()
|
||
for c in cur.description:
|
||
print(c.name, "\t", end="")
|
||
print()
|
||
|
||
for row in rows:
|
||
for cell in row:
|
||
lo_out = conn.lobject(cell)
|
||
r = lo_out.read()
|
||
lo_out.close()
|
||
|
||
print("oid:", cell, ":", r, " ", end="")
|
||
print()
|
||
|
||
cur.close()
|
||
conn.commit()
|
||
conn.close()
|
||
return 0
|
||
else:
|
||
return None
|
||
|
||
|
||
def testNewType():
|
||
conn = testConn()
|
||
if conn is not None:
|
||
cur = conn.cursor()
|
||
|
||
cur.execute('drop table if exists test_newtype')
|
||
cur.execute('''
|
||
create table test_newtype(
|
||
num INTEGER,
|
||
bcb CHAR(30),
|
||
vcb VARCHAR(30),
|
||
dt TIMESTAMP,
|
||
blob BYTEA,
|
||
nclob TEXT
|
||
)
|
||
''')
|
||
|
||
cur.execute("insert into test_newtype values(%s, %s, %s, %s, %s, %s)",
|
||
(100,
|
||
'bpcharbyte_30',
|
||
'varcharbyte_30',
|
||
'2000-12-01 15:30:12',
|
||
b'binary_data',
|
||
'text_data'))
|
||
|
||
cur.execute("insert into test_newtype values(%s, %s, %s, %s, %s, %s)",
|
||
(200,
|
||
'中文测试数据',
|
||
'中文测试数据',
|
||
datetime.datetime.now(),
|
||
'人大金仓数据库'.encode("utf-8"),
|
||
'北京人大金仓'))
|
||
|
||
cur.execute("select * from test_newtype")
|
||
rows = cur.fetchall()
|
||
|
||
for row in rows:
|
||
for cell in row:
|
||
if isinstance(cell, memoryview):
|
||
print(cell.tobytes().decode('utf-8'), end=" ")
|
||
else:
|
||
print(cell, end=" ")
|
||
print()
|
||
|
||
cur.close()
|
||
conn.commit()
|
||
conn.close()
|
||
return 0
|
||
else:
|
||
return None
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print("libkci version:", ksycopg2.__libkci_version__)
|
||
print("ksycopg2 version:", ksycopg2.__version__)
|
||
|
||
check("testConn", testConn())
|
||
|
||
check("testConn2", testConn2())
|
||
|
||
check("testExecute", testExecute())
|
||
|
||
check("testSelect", testSelect())
|
||
|
||
check("testLob", testLob())
|
||
|
||
check("testOID", testOID())
|
||
|
||
check("testNewType", testNewType())
|
||
|
||
print("failed case:", failed)
|