diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 11a8b577..a43356d0 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -84,7 +84,7 @@ jobs: id: determine-branch with: cond: ${{ github.base_ref == 'main' }} - if_true: '3.3.6' + if_true: "main" if_false: '3.0' - name: Checkout tdengine diff --git a/taos/bind2.py b/taos/bind2.py index c7b23fd5..81e8994a 100644 --- a/taos/bind2.py +++ b/taos/bind2.py @@ -91,6 +91,8 @@ def set_value(self, buffer_type, values, precision = PrecisionEnum.Milliseconds) self.varbinary(values) elif buffer_type == FieldType.C_GEOMETRY: self.geometry(values) + elif buffer_type == FieldType.C_BLOB: + self.blob(values) def numeric_common(self, values, ctypes_type, buffer_null_type, buffer_value_type): if type(values) is not tuple and type(values) is not list: @@ -218,6 +220,9 @@ def geometry(self, values): self.buffer_type = FieldType.C_GEOMETRY self._str_to_buffer(values, encode=False) + def blob(self, values): + self.buffer_type = FieldType.C_BLOB + self._str_to_buffer(values, encode=False) class TaosStmt2BindV(ctypes.Structure): _fields_ = [ diff --git a/taos/cinterface.py b/taos/cinterface.py index ba092394..234e08a5 100644 --- a/taos/cinterface.py +++ b/taos/cinterface.py @@ -332,6 +332,8 @@ def taos_query(connection, sql): """ try: ptr = c_char_p(sql.encode("utf-8")) + # if sql.lower().startswith("insert"): + # raise ProgrammingError(sql, 1) res = c_void_p(_libtaos.taos_query(connection, ptr)) errno = taos_errno(res) if errno != 0: @@ -561,7 +563,7 @@ def taos_fetch_block_v3(result, fields=None, field_count=None, decode_binary=Tru raise DatabaseError("Invalid data type returned from database") offsets = [] is_null = [] - if fields[i]["type"] in (FieldType.C_VARCHAR, FieldType.C_NCHAR, FieldType.C_JSON, FieldType.C_VARBINARY, FieldType.C_GEOMETRY): + if fields[i]["type"] in (FieldType.C_VARCHAR, FieldType.C_NCHAR, FieldType.C_JSON, FieldType.C_VARBINARY, FieldType.C_GEOMETRY, FieldType.C_BLOB): offsets = taos_get_column_data_offset(result, i, num_of_rows) f = convert_block_func_v3(fields[i]["type"], decode_binary=decode_binary) blocks[i] = f(data, is_null, num_of_rows, offsets, precision) diff --git a/taos/constants.py b/taos/constants.py index 618e5af5..7b39479e 100644 --- a/taos/constants.py +++ b/taos/constants.py @@ -29,6 +29,7 @@ class FieldType(object): C_JSON = 15 C_VARBINARY = 16 C_DECIMAL = 17 + C_BLOB = 18 C_GEOMETRY = 20 C_DECIMAL64 = 21 # NULL value definition diff --git a/taos/field.py b/taos/field.py index aa4fc25c..5e9dc8e0 100644 --- a/taos/field.py +++ b/taos/field.py @@ -250,6 +250,21 @@ def _crow_varbinary_to_python_block(data, is_null, num_of_rows, nbytes=None, pre res.append(cast(buffer, c_char_p).value) return res +def _crow_blob_to_python_block(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): + """Function to convert C binary row to python row.""" + assert nbytes is not None + res = [] + for i in range(abs(num_of_rows)): + if is_null[i]: + res.append(None) + else: + rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_uint16))[:1].pop() + chars = ctypes.cast(c_char_p(data + nbytes * i + 4), ctypes.POINTER(c_char * rbyte)) + buffer = create_string_buffer(rbyte + 1) + buffer[:rbyte] = chars[0][:rbyte] + res.append(cast(buffer, c_char_p).value) + return res + def _crow_nchar_to_python_block(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): """Function to convert C nchar row to python row.""" @@ -301,6 +316,7 @@ def convert_block_func(field_type: FieldType, decode_binary=True): FieldType.C_GEOMETRY: _crow_varbinary_to_python, FieldType.C_DECIMAL: _crow_decimal_to_python, FieldType.C_DECIMAL64: _crow_decimal_to_python, + FieldType.C_BLOB: _crow_blob_to_python_block, } CONVERT_FUNC_BLOCK = { @@ -323,6 +339,7 @@ def convert_block_func(field_type: FieldType, decode_binary=True): FieldType.C_GEOMETRY: _crow_varbinary_to_python_block, FieldType.C_DECIMAL: _crow_decimal_to_python, FieldType.C_DECIMAL64: _crow_decimal_to_python, + FieldType.C_BLOB: _crow_blob_to_python_block, } diff --git a/taos/field_v3.py b/taos/field_v3.py index 5487c35c..bb9b32fa 100644 --- a/taos/field_v3.py +++ b/taos/field_v3.py @@ -6,6 +6,8 @@ _RTYPE = ctypes.c_uint16 _RTYPE_SIZE = ctypes.sizeof(_RTYPE) +_BLOB_RTYPE = ctypes.c_uint32 +_BLOB_RTYPE_SIZE = ctypes.sizeof(_BLOB_RTYPE) def _crow_binary_to_python_block_v3(data, is_null, num_of_rows, offsets, precision=FieldType.C_TIMESTAMP_UNKNOWN): """Function to convert C binary row to python row.""" @@ -48,6 +50,18 @@ def _crow_varbinary_to_python_block_v3(data, is_null, num_of_rows, offsets, prec res.append(chars) return res +def _crow_blob_to_python_block_v3(data, is_null, num_of_rows, offsets, precision=FieldType.C_TIMESTAMP_UNKNOWN): + """Function to convert C varbinary row to python row.""" + assert offsets is not None + res = [] + for i in range(abs(num_of_rows)): + if offsets[i] == -1: + res.append(None) + else: + rbyte = _RTYPE.from_address(data + offsets[i]).value + chars = (ctypes.c_char * rbyte).from_address(data + offsets[i] + _BLOB_RTYPE_SIZE).raw + res.append(chars) + return res def convert_block_func_v3(field_type: FieldType, decode_binary=True): """Get convert block func.""" @@ -63,6 +77,7 @@ def convert_block_func_v3(field_type: FieldType, decode_binary=True): FieldType.C_JSON: _crow_nchar_to_python_block_v3, FieldType.C_VARBINARY: _crow_varbinary_to_python_block_v3, FieldType.C_GEOMETRY: _crow_varbinary_to_python_block_v3, + FieldType.C_BLOB: _crow_blob_to_python_block_v3, } diff --git a/taos/tmq.py b/taos/tmq.py index 40039ebb..7de4959d 100644 --- a/taos/tmq.py +++ b/taos/tmq.py @@ -125,7 +125,7 @@ def value(self): block_data = ctypes.cast(block, ctypes.POINTER(ctypes.c_void_p))[i] if fields[i]["type"] in ( - FieldType.C_VARCHAR, FieldType.C_NCHAR, FieldType.C_JSON, FieldType.C_VARBINARY, FieldType.C_GEOMETRY): + FieldType.C_VARCHAR, FieldType.C_NCHAR, FieldType.C_JSON, FieldType.C_VARBINARY, FieldType.C_GEOMETRY, FieldType.C_BLOB): f = convert_block_func_v3(fields[i]["type"], self.decode_binary) offsets = taos_get_column_data_offset(self.msg, i, num_rows) blocks[i] = f(block_data, [], num_rows, offsets, precision) diff --git a/taos/utils.py b/taos/utils.py index 2b4aac2b..aa6faa54 100644 --- a/taos/utils.py +++ b/taos/utils.py @@ -195,6 +195,8 @@ def checkTypeValid(buffer_type, values): checkString("varbinary", values, [bytes, bytearray]) elif buffer_type == FieldType.C_GEOMETRY: checkString("geometry", values, [bytes, bytearray]) + elif buffer_type == FieldType.C_BLOB: + checkString("blob", values, [bytes, bytearray]) else: err = f"invalid datatype type={buffer_type} values= {values}" raise DataTypeAndRangeError(err) diff --git a/tests/test_query_blob.py b/tests/test_query_blob.py new file mode 100644 index 00000000..1287c820 --- /dev/null +++ b/tests/test_query_blob.py @@ -0,0 +1,53 @@ +from datetime import datetime + +from utils import tear_down_database +from taos import utils, IS_V3 +from taos.error import InterfaceError +import taos + + +def test_query(): + """This test will use fetch_block for rows fetching, significantly faster than rows_iter""" + conn = taos.connect() + conn.execute("drop database if exists test_query_py") + conn.execute("create database if not exists test_query_py") + conn.execute("use test_query_py") + conn.execute("create table if not exists tb1 (ts timestamp, v blob) tags(jt json)") + n = conn.execute('insert into tn1 using tb1 tags(\'{"name":"value"}\') values(now, null)') + print("inserted %d rows" % n) + result = conn.query("select * from tb1") + fields = result.fields + print("fields: ", fields) + assert fields.count == 3 + + results= result.fetch_all() + print("results: ", results) + n = conn.execute('insert into tn1 using tb1 tags(\'{"name":"value"}\') values(now + 10s, "xxxxxxxxxxxxxxxxxxx")') + print("inserted %d rows" % n) + result = conn.query("select * from tb1") + fields = result.fields + print("fields: ", fields) + assert fields.count == 3 + results= result.fetch_all() + print("results: ", results) + + n = conn.execute('insert into tn1 using tb1 tags(\'{"name":"value"}\') values(now + 20s, "\\x7f8290")') + print("inserted %d rows" % n) + result = conn.query("select * from tb1") + fields = result.fields + print("fields: ", fields) + assert fields.count == 3 + results= result.fetch_all() + print("results: ", results) + assert results[0][1] == None + assert results[1][1] == b"xxxxxxxxxxxxxxxxxxx" + assert results[2][1] == b"\x7f\x82\x90" + assert len(results) == 3 + + result.close() + db_name = "test_query_py" + tear_down_database(conn, db_name) + conn.close() + +if __name__ == "__main__": + test_query() diff --git a/tests/test_stmt2.py b/tests/test_stmt2.py index 556ac886..60076289 100644 --- a/tests/test_stmt2.py +++ b/tests/test_stmt2.py @@ -118,12 +118,12 @@ def prepare(conn, dbname, stbname, ntb1, ntb2): conn.execute("create database if not exists %s precision 'ms' " % dbname) conn.select_db(dbname) # stable - sql = f"create table if not exists {dbname}.{stbname}(ts timestamp, name binary(32), sex bool, score int) tags(grade nchar(8), class int)" + sql = f"create table if not exists {dbname}.{stbname}(ts timestamp, name binary(32), sex bool, score int, remarks blob) tags(grade nchar(8), class int)" conn.execute(sql) # normal table - sql = f"create table if not exists {dbname}.{ntb1} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128))" + sql = f"create table if not exists {dbname}.{ntb1} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128), remarks blob)" conn.execute(sql) - sql = f"create table if not exists {dbname}.{ntb2} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128))" + sql = f"create table if not exists {dbname}.{ntb2} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128), remarks blob)" conn.execute(sql) @@ -187,7 +187,8 @@ def insert_bind_param_normal_tables(conn, stmt2, dbname, ntb): [b"Mary", b"tom", b"Jack", b"Jane", None ], [0, 3.14, True, 0, 1 ], [98, 99.87, 60, 100, 99 ], - wkbs + wkbs, + [b"binary value_1", b"binary value_2", b"binary value_3", b"binary value_4", b"binary value_5"] ] ] @@ -212,7 +213,8 @@ def insert_bind_param_with_table(conn, stmt2, dbname, stbname, ctb): [1601481600000,1601481600004,"2024-09-19 10:00:00", "2024-09-19 10:00:01.123", datetime(2024,9,20,10,11,12,456)], ["Mary", "Tom", "Jack", "Jane", "alex" ], [0, 1, 1, 0, 1 ], - [98, 80, 60, 100, 99 ] + [98, 80, 60, 100, 99 ], + [b"binary value_1", b"binary value_2", b"binary value_3", b"binary value_4", b"binary value_5"] ] ] @@ -371,7 +373,8 @@ def insert_with_normal_tables(conn, stmt2, dbname, ntb): [b"Mary", b"tom", b"Jack", b"Jane", None ], [0, 3.14, True, 0, 1 ], [98, 99.87, 60, 100, 99 ], - wkbs + wkbs, + [b"binary value_1", b"binary value_2", b"binary value_3", b"binary value_4", b"binary value_5"] ] ] @@ -458,7 +461,7 @@ def test_stmt2_insert(conn): prepare(conn, dbname, stbname, ntb1, ntb2) ctb = 'ctb' # child table - stmt2 = conn.statement2(f"insert into {dbname}.{ctb} using {dbname}.{stbname} tags (?,?) values(?,?,?,?)") + stmt2 = conn.statement2(f"insert into {dbname}.{ctb} using {dbname}.{stbname} tags (?,?) values(?,?,?,?,?)") insert_bind_param_with_table(conn, stmt2, dbname, stbname, ctb) print("insert child table ........................... ok\n") stmt2.close() @@ -480,13 +483,13 @@ def test_stmt2_insert(conn): # stmt2.close() # ntb1 - stmt2 = conn.statement2(f"insert into {dbname}.{ntb1} values(?,?,?,?,?)") + stmt2 = conn.statement2(f"insert into {dbname}.{ntb1} values(?,?,?,?,?,?)") insert_with_normal_tables(conn, stmt2, dbname, ntb1) print("insert normal tables .......................... ok\n") stmt2.close() # ntb2 - stmt2 = conn.statement2(f"insert into {dbname}.{ntb2} values(?,?,?,?,?)") + stmt2 = conn.statement2(f"insert into {dbname}.{ntb2} values(?,?,?,?,?,?)") insert_bind_param_normal_tables(conn, stmt2, dbname, ntb2) print("insert normal tables (bind param) ............. ok\n") stmt2.close() @@ -582,12 +585,12 @@ def test_stmt2_query(conn): # stmt2.close() # print("insert bind & execute ......................... ok\n") - conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.000', 'Mary2', false, 298)") - conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.001', 'Tom2', true, 280)") - conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.002', 'Jack2', true, 260)") - conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.003', 'Jane2', false, 2100)") - conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.004', 'alex2', true, 299)") - conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.005', NULL, false, NULL)") + conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.000', 'Mary2', false, 298, 'XXX')") + conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.001', 'Tom2', true, 280, 'YYY')") + conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.002', 'Jack2', true, 260, 'ZZZ')") + conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.003', 'Jane2', false, 2100, 'WWW')") + conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.004', 'alex2', true, 299, 'ZZZ')") + conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.005', NULL, false, NULL, 'WWW')") # statement2 diff --git a/tests/test_tmq.py b/tests/test_tmq.py index 843c4971..9d28e031 100644 --- a/tests/test_tmq.py +++ b/tests/test_tmq.py @@ -1,7 +1,7 @@ import threading import time from time import sleep - +from decimal import Decimal import taos from taos.tmq import * @@ -15,7 +15,7 @@ def run(self): conn = taos.connect() conn.select_db("tmq_test") for i in range(50): - sql = f"insert into tb1 values (now + {1+i}s, true,1,{i},1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')" + sql = f"insert into tb1 values (now + {1+i}s, true,1,{i},1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')" conn.execute(sql) print(sql) sleep(0.02) @@ -101,7 +101,8 @@ def pre_test_tmq(precision: str): c15 varbinary(50), c16 geometry(512), c17 decimal(10,6), - c18 decimal(24,10) + c18 decimal(24,10), + c19 blob ) tags ( t1 bool, t2 tinyint unsigned, @@ -125,7 +126,7 @@ def pre_test_tmq(precision: str): conn.execute("create table if not exists tb1 using stb1 tags (true, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1', '1')") print("======== start create topic") conn.execute( - "create topic if not exists topic1 as select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18 from stb1" + "create topic if not exists topic1 as select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19 from stb1" ) @@ -182,18 +183,27 @@ def test_tmq_assignment(): conn = taos.connect() conn.select_db("tmq_test") conn.execute( - "insert into t1 using stb1 tags(true, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1', '1') values (now-4s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')" + "insert into t1 using stb1 tags(true, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1', '1') values (now-4s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','axxxxxxxxxxxxxxxxxxxa')" ) conn.execute( - "insert into t2 using stb1 tags(false, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1', '1') values (now-3s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','-9876.123456','-123456789012.0987654321')" + "insert into t2 using stb1 tags(false, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1', '1') values (now-3s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','-9876.123456','-123456789012.0987654321','bxxxxxxxxxxxxxxxxxxxb')" ) conn.execute( - "insert into t3 using stb1 tags(true, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, '2', '2') values (now-2s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','5676.123','567890121234.5432109876')" + "insert into t3 using stb1 tags(true, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, '2', '2') values (now-2s, true,3,3,3,3,3,3,3,3,3,3,3,'3','3','binary value_1','POINT (3.0 5.0)','5676.123','567890121234.5432109876','cxxxxxxxxxxxxxxxxxxxc')" ) consumer = Consumer({"group.id": "1", "auto.offset.reset": "earliest"}) consumer.subscribe(["topic1"]) + expected_full_data = [ + [None, True, 1, 1, 1, 1, 1, 1, 1, 1, 1.0, 1.0, None, '1', '1', + b'binary value_1', b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08@\x00\x00\x00\x00\x00\x00\x14@', Decimal('9876.123456'), Decimal('123456789012.0987654321'), b'axxxxxxxxxxxxxxxxxxxa'], + [None, True, 2, 2, 2, 2, 2, 2, 2, 2, 2.0, 2.0, None, '2', '2', + b'binary value_1', b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08@\x00\x00\x00\x00\x00\x00\x14@', Decimal('-9876.123456'), Decimal('-123456789012.0987654321'), b'bxxxxxxxxxxxxxxxxxxxb'], + [None, True, 3, 3, 3, 3, 3, 3, 3, 3, 3.0, 3.0, None, '3', '3', + b'binary value_1', b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08@\x00\x00\x00\x00\x00\x00\x14@', Decimal('5676.123'), Decimal('567890121234.5432109876'), b'cxxxxxxxxxxxxxxxxxxxc'] + ] + try: assignment = consumer.assignment() assert assignment[0].offset == 0 @@ -206,26 +216,35 @@ def test_tmq_assignment(): database = message.database() print(f"topic: {topic}, database: {database}") - for block in message: + + + for i, block in enumerate(message): nrows = block.nrows() ncols = block.ncols() - for row in block: - print(row) values = block.fetchall() + print(f"nrows: {nrows}, ncols: {ncols}, values: {values}, {i}, {expected_full_data[i]}") + # 调用检查函数 + check_values(values, nrows, ncols, expected_data=expected_full_data) print(f"nrows: {nrows}, ncols: {ncols}, values: {values}") consumer.commit(message) + print("====== after insert, start check assignment") + table_num = 10 data_num = 10 for i in range(table_num): for j in range(data_num): conn.execute( - f"insert into t{i} using stb1 tags(true, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1', '1') values (now, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')" + f"insert into t{i} using stb1 tags(true, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1', '1') values (now, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')" ) message = consumer.poll(5) - print(f"< after > insert message: {message}") + + expected_full_data = [ + [None, True, 1, 1, 1, 1, 1, 1, 1, 1, 1.0, 1.0, None, '1', '1', + b'binary value_1', b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08@\x00\x00\x00\x00\x00\x00\x14@', Decimal('9876.123456'), Decimal('123456789012.0987654321'), b'xxxxxxxxxxxxxxxxxxx'] + ] if message: topic = message.topic() database = message.database() @@ -234,11 +253,10 @@ def test_tmq_assignment(): for block in message: nrows = block.nrows() ncols = block.ncols() - for row in block: - print(row) values = block.fetchall() + check_values(values, nrows, ncols, expected_full_data) print(f"nrows: {nrows}, ncols: {ncols}, values: {values}") - + consumer.commit(message) consumer.commit(message) @@ -251,7 +269,31 @@ def test_tmq_assignment(): finally: consumer.unsubscribe() consumer.close() - after_ter_tmq() + # after_ter_tmq() + +def check_values(values, nrows, ncols, expected_data=None): + assert values is not None, "values should not be None" + assert isinstance(values, list), "values should be a list" + assert len(values) == nrows, f"values length {len(values)} should equal nrows {nrows}" + + if isinstance(expected_data, list): + print(f"Checking {len(values)} rows against expected data") + + for i, actual_row in enumerate(values): + index = actual_row[2] - 1 + if index < len(expected_data): + expected_row = expected_data[index] + print(f"Checking row {i}: actual length={len(actual_row)}, expected length={len(expected_row)}") + assert len(actual_row) == len(expected_row), f"row {i} length mismatch: got {len(actual_row)}, expected {len(expected_row)}" + + for j in range(1, len(actual_row)): + if j < len(expected_row) and expected_row[j] is not None: + assert actual_row[j] == expected_row[j], \ + f"row {i} col {j} mismatch: expected {expected_row[j]}, got {actual_row[j]}" + else: + print(f"Warning: No expected data for row {i}") + + print(f"✅ Values validation passed: {len(values)} rows") def test_tmq_seek(): @@ -260,10 +302,10 @@ def test_tmq_seek(): pre_test_tmq("") conn = taos.connect() conn.select_db("tmq_test") - conn.execute("insert into tb1 values (now-4s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')") - conn.execute("insert into tb1 values (now-3s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')") - conn.execute("insert into tb1 values (now-2s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')") - conn.execute("insert into tb1 values (now-1s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')") + conn.execute("insert into tb1 values (now-4s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')") + conn.execute("insert into tb1 values (now-3s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')") + conn.execute("insert into tb1 values (now-2s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')") + conn.execute("insert into tb1 values (now-1s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')") conf = { "group.id": "1", @@ -311,17 +353,17 @@ def test_tmq_committed_and_position(): conn = taos.connect() conn.select_db("tmq_test") - conn.execute("insert into tb1 values (now-4s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')") - conn.execute("insert into tb1 values (now-3s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')") - conn.execute("insert into tb1 values (now-2s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')") - conn.execute("insert into tb1 values (now-1s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321')") + conn.execute("insert into tb1 values (now-4s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')") + conn.execute("insert into tb1 values (now-3s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')") + conn.execute("insert into tb1 values (now-2s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')") + conn.execute("insert into tb1 values (now-1s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2','binary value_1','POINT (3.0 5.0)','9876.123456','123456789012.0987654321','xxxxxxxxxxxxxxxxxxx')") consumer = Consumer({"group.id": "1"}) consumer.subscribe(["topic1"]) try: consumer.poll(1) - res = consumer.poll(1) + consumer.poll(1) consumer.commit() topic_partitions = consumer.assignment() committees = consumer.committed(topic_partitions)