Skip to content

feat: native support blob type #357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jul 24, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
id: determine-branch
with:
cond: ${{ github.base_ref == 'main' }}
if_true: '3.3.6'
if_true: "main"
if_false: '3.0'

- name: Checkout tdengine
Expand Down
5 changes: 5 additions & 0 deletions taos/bind2.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def set_value(self, buffer_type, values, precision = PrecisionEnum.Milliseconds)
self.varbinary(values)
elif buffer_type == FieldType.C_GEOMETRY:
self.geometry(values)
elif buffer_type == FieldType.C_BLOB:
self.blob(values)

def numeric_common(self, values, ctypes_type, buffer_null_type, buffer_value_type):
if type(values) is not tuple and type(values) is not list:
Expand Down Expand Up @@ -218,6 +220,9 @@ def geometry(self, values):
self.buffer_type = FieldType.C_GEOMETRY
self._str_to_buffer(values, encode=False)

def blob(self, values):
    """Bind *values* as a BLOB column.

    Mirrors the sibling ``varbinary``/``geometry`` binders: tags the buffer
    with ``FieldType.C_BLOB`` and packs the raw bytes without text encoding
    (``encode=False``), since BLOB payloads are arbitrary binary data.
    """
    self.buffer_type = FieldType.C_BLOB
    self._str_to_buffer(values, encode=False)

class TaosStmt2BindV(ctypes.Structure):
_fields_ = [
Expand Down
4 changes: 3 additions & 1 deletion taos/cinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ def taos_query(connection, sql):
"""
try:
ptr = c_char_p(sql.encode("utf-8"))
# if sql.lower().startswith("insert"):
# raise ProgrammingError(sql, 1)
res = c_void_p(_libtaos.taos_query(connection, ptr))
errno = taos_errno(res)
if errno != 0:
Expand Down Expand Up @@ -561,7 +563,7 @@ def taos_fetch_block_v3(result, fields=None, field_count=None, decode_binary=Tru
raise DatabaseError("Invalid data type returned from database")
offsets = []
is_null = []
if fields[i]["type"] in (FieldType.C_VARCHAR, FieldType.C_NCHAR, FieldType.C_JSON, FieldType.C_VARBINARY, FieldType.C_GEOMETRY):
if fields[i]["type"] in (FieldType.C_VARCHAR, FieldType.C_NCHAR, FieldType.C_JSON, FieldType.C_VARBINARY, FieldType.C_GEOMETRY, FieldType.C_BLOB):
offsets = taos_get_column_data_offset(result, i, num_of_rows)
f = convert_block_func_v3(fields[i]["type"], decode_binary=decode_binary)
blocks[i] = f(data, is_null, num_of_rows, offsets, precision)
Expand Down
1 change: 1 addition & 0 deletions taos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class FieldType(object):
C_JSON = 15
C_VARBINARY = 16
C_DECIMAL = 17
C_BLOB = 18
C_GEOMETRY = 20
C_DECIMAL64 = 21
# NULL value definition
Expand Down
17 changes: 17 additions & 0 deletions taos/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,21 @@ def _crow_varbinary_to_python_block(data, is_null, num_of_rows, nbytes=None, pre
res.append(cast(buffer, c_char_p).value)
return res

def _crow_blob_to_python_block(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Convert a C BLOB column block to a list of Python ``bytes``/``None``.

    Each cell in the block is laid out as a 4-byte unsigned length prefix
    followed by the raw blob payload; ``nbytes`` is the fixed per-row stride.
    Rows flagged in ``is_null`` map to ``None``.
    """
    assert nbytes is not None
    res = []
    for i in range(abs(num_of_rows)):
        if is_null[i]:
            res.append(None)
        else:
            # The BLOB length prefix is 4 bytes (uint32) — see _BLOB_RTYPE in
            # field_v3.py — unlike varbinary's 2-byte prefix.  Reading it as
            # c_uint16 would only work on little-endian hosts for payloads
            # under 64 KB, so read the full 32-bit length here.
            rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_uint32))[:1].pop()
            chars = ctypes.cast(c_char_p(data + nbytes * i + 4), ctypes.POINTER(c_char * rbyte))
            buffer = create_string_buffer(rbyte + 1)
            buffer[:rbyte] = chars[0][:rbyte]
            res.append(cast(buffer, c_char_p).value)
    return res


def _crow_nchar_to_python_block(data, is_null, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C nchar row to python row."""
Expand Down Expand Up @@ -301,6 +316,7 @@ def convert_block_func(field_type: FieldType, decode_binary=True):
FieldType.C_GEOMETRY: _crow_varbinary_to_python,
FieldType.C_DECIMAL: _crow_decimal_to_python,
FieldType.C_DECIMAL64: _crow_decimal_to_python,
FieldType.C_BLOB: _crow_blob_to_python_block,
}

CONVERT_FUNC_BLOCK = {
Expand All @@ -323,6 +339,7 @@ def convert_block_func(field_type: FieldType, decode_binary=True):
FieldType.C_GEOMETRY: _crow_varbinary_to_python_block,
FieldType.C_DECIMAL: _crow_decimal_to_python,
FieldType.C_DECIMAL64: _crow_decimal_to_python,
FieldType.C_BLOB: _crow_blob_to_python_block,
}


Expand Down
15 changes: 15 additions & 0 deletions taos/field_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
_RTYPE = ctypes.c_uint16
_RTYPE_SIZE = ctypes.sizeof(_RTYPE)

_BLOB_RTYPE = ctypes.c_uint32
_BLOB_RTYPE_SIZE = ctypes.sizeof(_BLOB_RTYPE)

def _crow_binary_to_python_block_v3(data, is_null, num_of_rows, offsets, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C binary row to python row."""
Expand Down Expand Up @@ -48,6 +50,18 @@ def _crow_varbinary_to_python_block_v3(data, is_null, num_of_rows, offsets, prec
res.append(chars)
return res

def _crow_blob_to_python_block_v3(data, is_null, num_of_rows, offsets, precision=FieldType.C_TIMESTAMP_UNKNOWN):
    """Convert a C BLOB column block (v3 layout) to a list of Python ``bytes``.

    ``offsets[i]`` is the byte offset of row *i* inside ``data``; an offset of
    -1 marks a NULL cell, which maps to ``None``.  Each non-NULL cell is a
    4-byte unsigned length prefix (``_BLOB_RTYPE``) followed by the payload.
    """
    assert offsets is not None
    res = []
    for i in range(abs(num_of_rows)):
        if offsets[i] == -1:
            res.append(None)
        else:
            # Read the length with the 4-byte _BLOB_RTYPE (uint32) that the
            # payload offset already assumes; the previous _RTYPE (uint16)
            # read silently truncated lengths >= 64 KB.
            rbyte = _BLOB_RTYPE.from_address(data + offsets[i]).value
            chars = (ctypes.c_char * rbyte).from_address(data + offsets[i] + _BLOB_RTYPE_SIZE).raw
            res.append(chars)
    return res

def convert_block_func_v3(field_type: FieldType, decode_binary=True):
"""Get convert block func."""
Expand All @@ -63,6 +77,7 @@ def convert_block_func_v3(field_type: FieldType, decode_binary=True):
FieldType.C_JSON: _crow_nchar_to_python_block_v3,
FieldType.C_VARBINARY: _crow_varbinary_to_python_block_v3,
FieldType.C_GEOMETRY: _crow_varbinary_to_python_block_v3,
FieldType.C_BLOB: _crow_blob_to_python_block_v3,
}


Expand Down
2 changes: 1 addition & 1 deletion taos/tmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def value(self):

block_data = ctypes.cast(block, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]["type"] in (
FieldType.C_VARCHAR, FieldType.C_NCHAR, FieldType.C_JSON, FieldType.C_VARBINARY, FieldType.C_GEOMETRY):
FieldType.C_VARCHAR, FieldType.C_NCHAR, FieldType.C_JSON, FieldType.C_VARBINARY, FieldType.C_GEOMETRY, FieldType.C_BLOB):
f = convert_block_func_v3(fields[i]["type"], self.decode_binary)
offsets = taos_get_column_data_offset(self.msg, i, num_rows)
blocks[i] = f(block_data, [], num_rows, offsets, precision)
Expand Down
2 changes: 2 additions & 0 deletions taos/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ def checkTypeValid(buffer_type, values):
checkString("varbinary", values, [bytes, bytearray])
elif buffer_type == FieldType.C_GEOMETRY:
checkString("geometry", values, [bytes, bytearray])
elif buffer_type == FieldType.C_BLOB:
checkString("blob", values, [bytes, bytearray])
else:
err = f"invalid datatype type={buffer_type} values= {values}"
raise DataTypeAndRangeError(err)
53 changes: 53 additions & 0 deletions tests/test_query_blob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from datetime import datetime

from utils import tear_down_database
from taos import utils, IS_V3
from taos.error import InterfaceError
import taos


def test_query():
    """End-to-end check of native BLOB column support.

    Inserts a NULL, an ASCII string, and a hex-literal payload into a BLOB
    column of a JSON-tagged super table, then verifies ``fetch_all`` returns
    ``None`` / raw ``bytes`` for each row.
    """
    db_name = "test_query_py"
    conn = taos.connect()
    conn.execute(f"drop database if exists {db_name}")
    conn.execute(f"create database if not exists {db_name}")
    conn.execute(f"use {db_name}")
    conn.execute("create table if not exists tb1 (ts timestamp, v blob) tags(jt json)")

    # Row 1: NULL blob.
    n = conn.execute('insert into tn1 using tb1 tags(\'{"name":"value"}\') values(now, null)')
    print("inserted %d rows" % n)
    result = conn.query("select * from tb1")
    fields = result.fields
    print("fields: ", fields)
    assert fields.count == 3

    results = result.fetch_all()
    print("results: ", results)

    # Row 2: plain ASCII payload.
    n = conn.execute('insert into tn1 using tb1 tags(\'{"name":"value"}\') values(now + 10s, "xxxxxxxxxxxxxxxxxxx")')
    print("inserted %d rows" % n)
    result = conn.query("select * from tb1")
    fields = result.fields
    print("fields: ", fields)
    assert fields.count == 3
    results = result.fetch_all()
    print("results: ", results)

    # Row 3: hex-literal payload (\x7f8290 -> bytes 7f 82 90).
    n = conn.execute('insert into tn1 using tb1 tags(\'{"name":"value"}\') values(now + 20s, "\\x7f8290")')
    print("inserted %d rows" % n)
    result = conn.query("select * from tb1")
    fields = result.fields
    print("fields: ", fields)
    assert fields.count == 3
    results = result.fetch_all()
    print("results: ", results)
    assert results[0][1] is None
    assert results[1][1] == b"xxxxxxxxxxxxxxxxxxx"
    assert results[2][1] == b"\x7f\x82\x90"
    assert len(results) == 3

    result.close()
    tear_down_database(conn, db_name)
    conn.close()

if __name__ == "__main__":
    test_query()
33 changes: 18 additions & 15 deletions tests/test_stmt2.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ def prepare(conn, dbname, stbname, ntb1, ntb2):
conn.execute("create database if not exists %s precision 'ms' " % dbname)
conn.select_db(dbname)
# stable
sql = f"create table if not exists {dbname}.{stbname}(ts timestamp, name binary(32), sex bool, score int) tags(grade nchar(8), class int)"
sql = f"create table if not exists {dbname}.{stbname}(ts timestamp, name binary(32), sex bool, score int, remarks blob) tags(grade nchar(8), class int)"
conn.execute(sql)
# normal table
sql = f"create table if not exists {dbname}.{ntb1} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128))"
sql = f"create table if not exists {dbname}.{ntb1} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128), remarks blob)"
conn.execute(sql)
sql = f"create table if not exists {dbname}.{ntb2} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128))"
sql = f"create table if not exists {dbname}.{ntb2} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128), remarks blob)"
conn.execute(sql)


Expand Down Expand Up @@ -187,7 +187,8 @@ def insert_bind_param_normal_tables(conn, stmt2, dbname, ntb):
[b"Mary", b"tom", b"Jack", b"Jane", None ],
[0, 3.14, True, 0, 1 ],
[98, 99.87, 60, 100, 99 ],
wkbs
wkbs,
[b"binary value_1", b"binary value_2", b"binary value_3", b"binary value_4", b"binary value_5"]
]
]

Expand All @@ -212,7 +213,8 @@ def insert_bind_param_with_table(conn, stmt2, dbname, stbname, ctb):
[1601481600000,1601481600004,"2024-09-19 10:00:00", "2024-09-19 10:00:01.123", datetime(2024,9,20,10,11,12,456)],
["Mary", "Tom", "Jack", "Jane", "alex" ],
[0, 1, 1, 0, 1 ],
[98, 80, 60, 100, 99 ]
[98, 80, 60, 100, 99 ],
[b"binary value_1", b"binary value_2", b"binary value_3", b"binary value_4", b"binary value_5"]
]
]

Expand Down Expand Up @@ -371,7 +373,8 @@ def insert_with_normal_tables(conn, stmt2, dbname, ntb):
[b"Mary", b"tom", b"Jack", b"Jane", None ],
[0, 3.14, True, 0, 1 ],
[98, 99.87, 60, 100, 99 ],
wkbs
wkbs,
[b"binary value_1", b"binary value_2", b"binary value_3", b"binary value_4", b"binary value_5"]
]
]

Expand Down Expand Up @@ -458,7 +461,7 @@ def test_stmt2_insert(conn):
prepare(conn, dbname, stbname, ntb1, ntb2)

ctb = 'ctb' # child table
stmt2 = conn.statement2(f"insert into {dbname}.{ctb} using {dbname}.{stbname} tags (?,?) values(?,?,?,?)")
stmt2 = conn.statement2(f"insert into {dbname}.{ctb} using {dbname}.{stbname} tags (?,?) values(?,?,?,?,?)")
insert_bind_param_with_table(conn, stmt2, dbname, stbname, ctb)
print("insert child table ........................... ok\n")
stmt2.close()
Expand All @@ -480,13 +483,13 @@ def test_stmt2_insert(conn):
# stmt2.close()

# ntb1
stmt2 = conn.statement2(f"insert into {dbname}.{ntb1} values(?,?,?,?,?)")
stmt2 = conn.statement2(f"insert into {dbname}.{ntb1} values(?,?,?,?,?,?)")
insert_with_normal_tables(conn, stmt2, dbname, ntb1)
print("insert normal tables .......................... ok\n")
stmt2.close()

# ntb2
stmt2 = conn.statement2(f"insert into {dbname}.{ntb2} values(?,?,?,?,?)")
stmt2 = conn.statement2(f"insert into {dbname}.{ntb2} values(?,?,?,?,?,?)")
insert_bind_param_normal_tables(conn, stmt2, dbname, ntb2)
print("insert normal tables (bind param) ............. ok\n")
stmt2.close()
Expand Down Expand Up @@ -582,12 +585,12 @@ def test_stmt2_query(conn):
# stmt2.close()
# print("insert bind & execute ......................... ok\n")

conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.000', 'Mary2', false, 298)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.001', 'Tom2', true, 280)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.002', 'Jack2', true, 260)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.003', 'Jane2', false, 2100)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.004', 'alex2', true, 299)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.005', NULL, false, NULL)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.000', 'Mary2', false, 298, 'XXX')")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.001', 'Tom2', true, 280, 'YYY')")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.002', 'Jack2', true, 260, 'ZZZ')")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.003', 'Jane2', false, 2100, 'WWW')")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.004', 'alex2', true, 299, 'ZZZ')")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.005', NULL, false, NULL, 'WWW')")


# statement2
Expand Down
Loading
Loading