# -*- coding: utf-8 -*-
"""Smoke tests for the ksycopg2 database driver.

Each ``test*`` function returns ``None`` on failure and a non-``None`` value
(``0`` or an open connection) on success; ``check`` tallies the failures in
the module-level ``failed`` counter.
"""
import datetime

import ksycopg2

# NOTE(review): connection credentials are hard-coded in source. Consider
# loading them from the environment or a config file kept out of version
# control before sharing this script.
database = "training"
user = "system"
password = "PIvtluUE8FUXSEWJ"
host = "192.168.1.41"
port = "54321"

# Number of test cases that reported failure (returned None).
failed = 0


def check(name, val):
    """Record and report the outcome of one test case.

    Args:
        name: Label of the test case, used in the printed message.
        val: Result of the test function. ``None`` means failure; a
            connection object is closed here; anything else is just
            reported as success.
    """
    global failed
    if val is None:
        failed += 1
        print("test", name, "failed !", "\n")
        return
    if isinstance(val, ksycopg2.extensions.connection):
        # Connection-returning tests hand ownership to us; release it.
        print("close connection")
        val.close()
    print("test", name, "success !", "\n")


def testConn():
    """Connect using a DSN string.

    Returns:
        The open connection, or ``None`` if connecting raised (the error
        is printed).
    """
    dsn = "dbname={} user={} password={} host={} port={}".format(
        database, user, password, host, port)
    try:
        conn = ksycopg2.connect(dsn)
    except Exception as e:
        print(e)
        return None
    return conn


def testConn2():
    """Connect using keyword arguments and print the server version.

    Returns:
        The open connection, or ``None`` if connecting or the version
        query raised (the error is printed and any opened connection is
        closed).
    """
    conn = None
    try:
        conn = ksycopg2.connect(database=database, user=user,
                                password=password, host=host, port=port)
        cur = conn.cursor()
        cur.execute("select version()")
        rows = cur.fetchall()
        print("database version:", rows[0])
        cur.close()
    except Exception as e:
        print(e)
        # Do not leak a connection that opened before the query failed.
        if conn is not None:
            conn.close()
        return None
    return conn


def _run_with_connection(body):
    """Run ``body(conn, cur)`` on a fresh connection, then commit and close.

    The cursor and the connection are always closed, even when ``body``
    raises. Errors are printed and reported as ``None`` so the caller's
    failure counter is updated instead of the whole script aborting.

    Returns:
        ``0`` on success, ``None`` if the connection could not be opened
        or any step raised.
    """
    conn = testConn()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        try:
            body(conn, cur)
        finally:
            cur.close()
        conn.commit()
    except Exception as e:
        print(e)
        return None
    finally:
        conn.close()
    return 0


def _print_column_names(cur):
    """Print the column names of the cursor's last result on one line."""
    for c in cur.description:
        print(c.name, "\t", end="")
    print()


def testExecute():
    """Recreate ``test_ksy`` and insert three rows (ASCII, CJK, symbols).

    Returns:
        ``0`` on success, ``None`` on failure.
    """
    def body(conn, cur):
        cur.execute('drop table if exists test_ksy')
        cur.execute('create table test_ksy(id integer, name TEXT)')
        for row in ((1, "John"), (2, '中文测试文字'), (3, '!@#¥%……')):
            cur.execute("insert into test_ksy values(%s, %s)", row)

    return _run_with_connection(body)


def testSelect():
    """Select everything from ``test_ksy`` and print header plus rows.

    Returns:
        ``0`` on success, ``None`` on failure.
    """
    def body(conn, cur):
        cur.execute("select * from test_ksy")
        rows = cur.fetchall()
        _print_column_names(cur)
        for row in rows:
            for cell in row:
                print(cell, " ", end="")
            print()

    return _run_with_connection(body)


def testLob():
    """Round-trip bytearray, bytes and str values through bytea/text columns.

    Each of the four sample values is written to all three data columns of
    ``test_lob``; the rows are then read back and printed with their
    Python types.

    Returns:
        ``0`` on success, ``None`` on failure.
    """
    def body(conn, cur):
        cur.execute('drop table if exists test_lob')
        cur.execute(
            'create table test_lob(id integer, b bytea, c text, ba bytea)')

        ba = bytearray("中文测试字符bytearray", "UTF8")
        b = bytes('中文测试字符bytes' * 2, "UTF8")
        u = u'中文字unicode' * 3
        s = '中文str' * 4

        for row_id, value in enumerate((ba, b, u, s), start=1):
            cur.execute("insert into test_lob values(%s, %s, %s, %s)",
                        (row_id, value, value, value))

        cur.execute("select * from test_lob")
        rows = cur.fetchall()
        for row in rows:
            for cell in row:
                if isinstance(cell, memoryview):
                    # Binary cells come back as memoryview; decode to show.
                    print(type(cell), cell.tobytes().decode('UTF8'),
                          " ", end="")
                else:
                    print(type(cell), cell, " ", end="")
            print()

    return _run_with_connection(body)


def testOID():
    """Write three large objects, store their OIDs, and read them back.

    Returns:
        ``0`` on success, ``None`` on failure.
    """
    def body(conn, cur):
        cur.execute('drop table if exists test_oid')
        cur.execute('create table test_oid(id integer, o OID)')

        payloads = ("raw data", b'binary data', '中文数据 data')
        large_objects = []
        # Ids run 1..3; the original reused id 3 for the second object.
        for row_id, payload in enumerate(payloads, start=1):
            lo = conn.lobject()
            lo.write(payload)
            cur.execute("insert into test_oid values(%s, %s)",
                        (row_id, lo.oid))
            large_objects.append(lo)
        for lo in large_objects:
            lo.close()

        cur.execute("select o from test_oid")
        rows = cur.fetchall()
        _print_column_names(cur)
        for row in rows:
            for cell in row:
                lo_out = conn.lobject(cell)
                r = lo_out.read()
                lo_out.close()
                print("oid:", cell, ":", r, " ", end="")
            print()

    return _run_with_connection(body)


def testNewType():
    """Insert and read back rows covering integer, char, varchar,
    timestamp, bytea and text columns.

    Returns:
        ``0`` on success, ``None`` on failure.
    """
    def body(conn, cur):
        cur.execute('drop table if exists test_newtype')
        cur.execute(
            " create table test_newtype( num INTEGER, bcb CHAR(30), "
            "vcb VARCHAR(30), dt TIMESTAMP, blob BYTEA, nclob TEXT ) "
        )

        insert_sql = "insert into test_newtype values(%s, %s, %s, %s, %s, %s)"
        cur.execute(insert_sql,
                    (100, 'bpcharbyte_30', 'varcharbyte_30',
                     '2000-12-01 15:30:12', b'binary_data', 'text_data'))
        cur.execute(insert_sql,
                    (200, '中文测试数据', '中文测试数据',
                     datetime.datetime.now(),
                     '人大金仓数据库'.encode("utf-8"), '北京人大金仓'))

        cur.execute("select * from test_newtype")
        rows = cur.fetchall()
        for row in rows:
            for cell in row:
                if isinstance(cell, memoryview):
                    print(cell.tobytes().decode('utf-8'), end=" ")
                else:
                    print(cell, end=" ")
            print()

    return _run_with_connection(body)


if __name__ == "__main__":
    print("libkci version:", ksycopg2.__libkci_version__)
    print("ksycopg2 version:", ksycopg2.__version__)

    check("testConn", testConn())
    check("testConn2", testConn2())
    check("testExecute", testExecute())
    check("testSelect", testSelect())
    check("testLob", testLob())
    check("testOID", testOID())
    check("testNewType", testNewType())

    print("failed case:", failed)