Compare commits

...

3 Commits

Author SHA1 Message Date
fcd98c3212 test(ksycopg2): 添加 Python 3 兼容性测试套件
- 实现基本连接测试验证数据库连接功能
- 添加执行 SQL 语句的测试包括创建表和插入数据
- 集成选择查询测试验证数据检索功能
- 实现 LOB 大对象类型的数据处理测试
- 添加 OID 对象标识符类型的读写测试
- 包含新数据类型支持的全面测试覆盖
- 验证中文字符和特殊符号的编码处理
- 实现测试结果检查和失败计数统计功能
2026-03-20 13:12:29 +08:00
29a5cc6f47 Merge remote-tracking branch 'origin/main'
# Conflicts:
#	test_websocket.py
2026-03-13 17:24:45 +08:00
25008d7845 feat(test): 添加WebSocket连接测试脚本
- 新增taosTest.py文件实现TDengine数据库连接测试
- 实现创建子表、插入测试数据和查询功能
- 集成taosws库进行WebSocket数据库操作
- 更新test_websocket.py中的连接地址配置
- 添加时间戳基准和批量数据插入功能
- 实现稳定表和子表的查询对比测试
2026-03-13 17:24:34 +08:00
3 changed files with 372 additions and 13 deletions

100
taosTest.py Normal file
View File

@@ -0,0 +1,100 @@
import taosws
import time
def create_connection():
host = "192.168.1.87"
port = 6041
conn = taosws.connect(
host=host,
port=port,
user="root",
password="wDvfFffdwbm5U15K",
database="tms"
)
print(f"Connected to {host}:{port}")
return conn
def create_sub_table(conn):
sql = """
CREATE TABLE IF NOT EXISTS gj_jxjs_sk_test001
USING gj_jxjs_sk
TAGS ('test001','equip001')
"""
conn.execute(sql)
print("Sub table created")
def insert_test_data(conn):
base_ts = int(time.time() * 1000)
sql = f"""
INSERT INTO gj_jxjs_sk_test001 VALUES
({base_ts}, 'well1',1,1,1,20250311,120000,1,100.5,100.5,100.5,100.5,5.5,10.5,200,200,5.5,5.5,100,100,120,200,200,1,1,1,10,10,1,1,1,1,1,1,1,1,1,1,1,100,5,1,1,1,1,1),
({base_ts+1000}, 'well1',1,1,2,20250311,120001,1,101.5,101.5,101.5,101.5,5.6,10.6,201,201,5.6,5.6,101,101,121,201,201,1,1,1,11,11,1,1,1,1,1,1,1,1,1,1,1,101,5,1,1,1,1,1),
({base_ts+2000}, 'well1',1,1,3,20250311,120002,1,102.5,102.5,102.5,102.5,5.7,10.7,202,202,5.7,5.7,102,102,122,202,202,1,1,1,12,12,1,1,1,1,1,1,1,1,1,1,1,102,5,1,1,1,1,1)
"""
conn.execute(sql)
print("Insert success")
return base_ts
def query_stable(conn, start_ts, end_ts):
sql = f"""
SELECT ts, actual_date, actual_time, deptbitm, chkp, sppa, rpma, torqa, hkla, blkpos, woba
FROM gj_jxjs_sk
WHERE ts > {start_ts} AND ts < {end_ts}
ORDER BY ts ASC
"""
result = conn.query(sql)
print("\nQuery from STABLE (gj_jxjs_sk):")
for row in result:
print(row)
def query_sub_table(conn, start_ts, end_ts):
sql = f"""
SELECT ts, actual_date, actual_time, deptbitm, chkp, sppa, rpma, torqa, hkla, blkpos, woba
FROM gj_jxjs_sk_test001
WHERE ts > {start_ts} AND ts < {end_ts}
ORDER BY ts ASC
"""
result = conn.query(sql)
print("\nQuery from SUB TABLE (gj_jxjs_sk_test001):")
for row in result:
print(row)
if __name__ == "__main__":
conn = create_connection()
try:
# 1 创建子表
create_sub_table(conn)
# 2 插入测试数据
base_ts = insert_test_data(conn)
# 3 查询 STABLE
query_stable(conn, base_ts - 1000, base_ts + 5000)
# 4 查询子表
query_sub_table(conn, base_ts - 1000, base_ts + 5000)
finally:
conn.close()

251
test_ksycopg2-for-py3.py Normal file
View File

@@ -0,0 +1,251 @@
# -*- coding: utf-8 -*-
import ksycopg2
import datetime
database = "training"
user = "system"
password = "PIvtluUE8FUXSEWJ"
host = "192.168.1.41"
port = "54321"
failed = 0
def check(name, val):
if val is None:
global failed
failed += 1
else:
if isinstance(val, ksycopg2.extensions.connection):
print("close connection")
val.close()
print("test", name, "success !", "\n")
def testConn():
try:
conn = ksycopg2.connect(
"dbname={} user={} password={} host={} port={}".format(database, user, password, host, port))
# conn.set_session(autocommit=True)
except Exception as e:
print(e)
return None
else:
return conn
def testConn2():
try:
conn = ksycopg2.connect(database=database, user=user, password=password, host=host, port=port)
cur = conn.cursor()
cur.execute("select version()")
rows = cur.fetchall()
print("database version:", rows[0])
cur.close()
except Exception as e:
print(e)
return None
else:
return conn
def testExecute():
conn = testConn()
if conn is not None:
cur = conn.cursor()
cur.execute('drop table if exists test_ksy')
cur.execute('create table test_ksy(id integer, name TEXT)')
cur.execute("insert into test_ksy values(%s, %s)", (1, "John"))
cur.execute("insert into test_ksy values(%s, %s)", (2, '中文测试文字'))
cur.execute("insert into test_ksy values(%s, %s)", (3, '@#¥%……'))
cur.close()
conn.commit()
conn.close()
return 0
else:
return None
def testSelect():
conn = testConn()
if conn is not None:
cur = conn.cursor()
cur.execute("select * from test_ksy")
rows = cur.fetchall()
for c in cur.description:
print(c.name, "\t", end="")
print()
for row in rows:
for cell in row:
print(cell, " ", end="")
print()
cur.close()
conn.commit()
conn.close()
return 0
else:
return None
def testLob():
conn = testConn()
if conn is not None:
cur = conn.cursor()
cur.execute('drop table if exists test_lob')
cur.execute('create table test_lob(id integer, b bytea, c text, ba bytea)')
ba = bytearray("中文测试字符bytearray", "UTF8")
b = bytes('中文测试字符bytes' * 2, "UTF8")
u = u'中文字unicode' * 3
s = '中文str' * 4
cur.execute("insert into test_lob values(%s, %s, %s, %s)", (1, ba, ba, ba))
cur.execute("insert into test_lob values(%s, %s, %s, %s)", (2, b, b, b))
cur.execute("insert into test_lob values(%s, %s, %s, %s)", (3, u, u, u))
cur.execute("insert into test_lob values(%s, %s, %s, %s)", (4, s, s, s))
cur.execute("select * from test_lob")
rows = cur.fetchall()
for row in rows:
for cell in row:
if isinstance(cell, memoryview):
print(type(cell), cell[:].tobytes().decode('UTF8'), " ", end="")
else:
print(type(cell), cell, " ", end="")
print()
cur.close()
conn.commit()
conn.close()
return 0
else:
return None
def testOID():
conn = testConn()
if conn is not None:
cur = conn.cursor()
cur.execute('drop table if exists test_oid')
cur.execute('create table test_oid(id integer, o OID)')
lo1 = conn.lobject()
lo1.write("raw data")
cur.execute("insert into test_oid values(%s, %s)", (1, lo1.oid))
lo2 = conn.lobject()
lo2.write(b'binary data')
cur.execute("insert into test_oid values(%s, %s)", (3, lo2.oid))
lo3 = conn.lobject()
lo3.write('中文数据 data')
cur.execute("insert into test_oid values(%s, %s)", (3, lo3.oid))
lo1.close()
lo2.close()
lo3.close()
cur.execute("select o from test_oid")
rows = cur.fetchall()
for c in cur.description:
print(c.name, "\t", end="")
print()
for row in rows:
for cell in row:
lo_out = conn.lobject(cell)
r = lo_out.read()
lo_out.close()
print("oid:", cell, ":", r, " ", end="")
print()
cur.close()
conn.commit()
conn.close()
return 0
else:
return None
def testNewType():
conn = testConn()
if conn is not None:
cur = conn.cursor()
cur.execute('drop table if exists test_newtype')
cur.execute('''
create table test_newtype(
num INTEGER,
bcb CHAR(30),
vcb VARCHAR(30),
dt TIMESTAMP,
blob BYTEA,
nclob TEXT
)
''')
cur.execute("insert into test_newtype values(%s, %s, %s, %s, %s, %s)",
(100,
'bpcharbyte_30',
'varcharbyte_30',
'2000-12-01 15:30:12',
b'binary_data',
'text_data'))
cur.execute("insert into test_newtype values(%s, %s, %s, %s, %s, %s)",
(200,
'中文测试数据',
'中文测试数据',
datetime.datetime.now(),
'人大金仓数据库'.encode("utf-8"),
'北京人大金仓'))
cur.execute("select * from test_newtype")
rows = cur.fetchall()
for row in rows:
for cell in row:
if isinstance(cell, memoryview):
print(cell.tobytes().decode('utf-8'), end=" ")
else:
print(cell, end=" ")
print()
cur.close()
conn.commit()
conn.close()
return 0
else:
return None
if __name__ == "__main__":
print("libkci version:", ksycopg2.__libkci_version__)
print("ksycopg2 version:", ksycopg2.__version__)
check("testConn", testConn())
check("testConn2", testConn2())
check("testExecute", testExecute())
check("testSelect", testSelect())
check("testLob", testLob())
check("testOID", testOID())
check("testNewType", testNewType())
print("failed case:", failed)

View File

@@ -1,20 +1,28 @@
import ssl import ssl
import websocket import websocket
# URL = "ws://192.168.1.202:9100/well-tool-test-system/ws/" URL = "wss://192.168.1.87/ws/" # 注意走 443不要再连 8080 了
# 如果你的 WS 路径是 /ws/,就写成上面这样;若是别的路径自己改
URL = "wss://192.168.1.87/ws/" def on_message(ws, msg): print("收到:", msg)
def on_error(ws, err): print("错误:", err)
def on_close(ws, code, reason): print("关闭:", code, reason)
def on_open(ws):
print("连接成功")
ws.send("hello server mac")
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp( ws = websocket.WebSocketApp(
URL, URL,
on_open=lambda ws: print("连接成功"), on_open=on_open,
on_message=lambda ws, msg: print("消息:", msg), on_message=on_message,
on_error=lambda ws, err: print("错误:", err), on_error=on_error,
on_close=lambda ws, code, reason: print("关闭:", code, reason), on_close=on_close,
# header=["Origin: https://192.168.1.3"] # 如后端不校验 Origin 可删
header=[] # 如后端不校验 Origin 可删
) )
ws.run_forever(sslopt={
ws.run_forever(
sslopt={
"cert_reqs": ssl.CERT_NONE, "cert_reqs": ssl.CERT_NONE,
"check_hostname": False, "check_hostname": False,
} })
)