diff --git a/test_ksycopg2-for-py3.py b/test_ksycopg2-for-py3.py new file mode 100644 index 0000000..d860f2c --- /dev/null +++ b/test_ksycopg2-for-py3.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +import ksycopg2 +import datetime + +database = "training" +user = "system" +password = "PIvtluUE8FUXSEWJ" +host = "192.168.1.41" +port = "54321" + +failed = 0 + + +def check(name, val): + if val is None: + global failed + failed += 1 + else: + if isinstance(val, ksycopg2.extensions.connection): + print("close connection") + val.close() + print("test", name, "success !", "\n") + + +def testConn(): + try: + conn = ksycopg2.connect( + "dbname={} user={} password={} host={} port={}".format(database, user, password, host, port)) + + # conn.set_session(autocommit=True) + except Exception as e: + print(e) + return None + else: + return conn + + +def testConn2(): + try: + conn = ksycopg2.connect(database=database, user=user, password=password, host=host, port=port) + cur = conn.cursor() + cur.execute("select version()") + rows = cur.fetchall() + print("database version:", rows[0]) + + cur.close() + except Exception as e: + print(e) + return None + else: + return conn + + +def testExecute(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute('drop table if exists test_ksy') + cur.execute('create table test_ksy(id integer, name TEXT)') + + cur.execute("insert into test_ksy values(%s, %s)", (1, "John")) + cur.execute("insert into test_ksy values(%s, %s)", (2, '中文测试文字')) + cur.execute("insert into test_ksy values(%s, %s)", (3, '!@#¥%……')) + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +def testSelect(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute("select * from test_ksy") + rows = cur.fetchall() + for c in cur.description: + print(c.name, "\t", end="") + print() + + for row in rows: + for cell in row: + print(cell, " ", end="") + print() + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +def testLob(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute('drop table if exists test_lob') + cur.execute('create table test_lob(id integer, b bytea, c text, ba bytea)') + ba = bytearray("中文测试字符bytearray", "UTF8") + b = bytes('中文测试字符bytes' * 2, "UTF8") + u = u'中文字unicode' * 3 + s = '中文str' * 4 + + cur.execute("insert into test_lob values(%s, %s, %s, %s)", (1, ba, ba, ba)) + cur.execute("insert into test_lob values(%s, %s, %s, %s)", (2, b, b, b)) + cur.execute("insert into test_lob values(%s, %s, %s, %s)", (3, u, u, u)) + cur.execute("insert into test_lob values(%s, %s, %s, %s)", (4, s, s, s)) + + cur.execute("select * from test_lob") + rows = cur.fetchall() + + for row in rows: + for cell in row: + if isinstance(cell, memoryview): + print(type(cell), cell[:].tobytes().decode('UTF8'), " ", end="") + else: + print(type(cell), cell, " ", end="") + print() + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +def testOID(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute('drop table if exists test_oid') + cur.execute('create table test_oid(id integer, o OID)') + + lo1 = conn.lobject() + lo1.write("raw data") + cur.execute("insert into test_oid values(%s, %s)", (1, lo1.oid)) + + lo2 = conn.lobject() + lo2.write(b'binary data') + cur.execute("insert into test_oid values(%s, %s)", (3, lo2.oid)) + + lo3 = conn.lobject() + lo3.write('中文数据 data') + cur.execute("insert into test_oid values(%s, %s)", (3, lo3.oid)) + + lo1.close() + lo2.close() + lo3.close() + + cur.execute("select o from test_oid") + rows = cur.fetchall() + for c in cur.description: + print(c.name, "\t", end="") + print() + + for row in rows: + for cell in row: + lo_out = conn.lobject(cell) + r = lo_out.read() + lo_out.close() + + print("oid:", cell, ":", r, " ", end="") + print() + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +def testNewType(): + conn = testConn() + if conn is not None: + cur = conn.cursor() + + cur.execute('drop table if exists test_newtype') + cur.execute(''' + create table test_newtype( + num INTEGER, + bcb CHAR(30), + vcb VARCHAR(30), + dt TIMESTAMP, + blob BYTEA, + nclob TEXT + ) + ''') + + cur.execute("insert into test_newtype values(%s, %s, %s, %s, %s, %s)", + (100, + 'bpcharbyte_30', + 'varcharbyte_30', + '2000-12-01 15:30:12', + b'binary_data', + 'text_data')) + + cur.execute("insert into test_newtype values(%s, %s, %s, %s, %s, %s)", + (200, + '中文测试数据', + '中文测试数据', + datetime.datetime.now(), + '人大金仓数据库'.encode("utf-8"), + '北京人大金仓')) + + cur.execute("select * from test_newtype") + rows = cur.fetchall() + + for row in rows: + for cell in row: + if isinstance(cell, memoryview): + print(cell.tobytes().decode('utf-8'), end=" ") + else: + print(cell, end=" ") + print() + + cur.close() + conn.commit() + conn.close() + return 0 + else: + return None + + +if __name__ == "__main__": + print("libkci version:", ksycopg2.__libkci_version__) + print("ksycopg2 version:", ksycopg2.__version__) + + check("testConn", testConn()) + + check("testConn2", testConn2()) + + check("testExecute", testExecute()) + + check("testSelect", testSelect()) + + check("testLob", testLob()) + + check("testOID", testOID()) + + check("testNewType", testNewType()) + + print("failed case:", failed)