10
10
from typing import Union
11
11
12
12
import pandas as pd
13
+ import progressbar
13
14
from fastapi import BackgroundTasks
14
15
from fastapi import UploadFile
15
16
from pandas import ExcelFile
16
17
from sqlmodel import Session
17
18
from sqlmodel import SQLModel
19
+ from sqlmodel .main import SQLModelMetaclass
18
20
from sqlmodel .sql .expression import SelectOfScalar
19
21
22
+ from ecodev_core .db_upsertion import BATCH_SIZE
20
23
from ecodev_core .logger import log_critical
21
24
from ecodev_core .logger import logger_get
22
25
from ecodev_core .pydantic_utils import CustomFrozen
23
26
from ecodev_core .safe_utils import SimpleReturn
24
27
25
-
26
28
log = logger_get (__name__ )
27
29
28
30
@@ -72,7 +74,7 @@ async def insert_file(file: UploadFile, insertor: Insertor, session: Session) ->
72
74
insert_data (df_raw , insertor , session )
73
75
74
76
75
- def insert_data (df : Union [pd .DataFrame , ExcelFile ], insertor : Insertor , session : Session ) -> None :
77
+ def insert_data (df : Union [pd .DataFrame , ExcelFile ], insertor : Insertor , session : Session ) -> None :
76
78
"""
77
79
Inserts a csv/df into a database
78
80
"""
@@ -81,6 +83,24 @@ def insert_data(df: Union[pd.DataFrame, ExcelFile], insertor: Insertor, session
81
83
session .commit ()
82
84
83
85
86
def insert_batch_data(data: list[dict | SQLModel],
                      session: Session,
                      raw_db_schema: SQLModelMetaclass | None = None) -> None:
    """
    Insert the passed rows into the db, committing once per batch of BATCH_SIZE rows.

    Each element of data is either a dict of keyword arguments used to build a db_schema
    instance, or an already built model instance, which is added as is.

    If raw_db_schema is not passed, the class of the first element of data is used as schema.
    This can only work when that first element is a model instance: a schema is mandatory as
    soon as rows are passed as dicts, and a ValueError is raised otherwise.

    An empty data list is a no-op.

    Warning: this only inserts data, without checking for pre-existence.
    Ensure deleting the data before using it to avoid duplicates.
    """
    # Nothing to insert. Also protects the data[0] lookup below from an IndexError.
    if not data:
        return

    db_schema = raw_db_schema or data[0].__class__
    # Without an explicit schema, a leading dict would resolve db_schema to the builtin dict,
    # which the session cannot persist. Fail early with an explicit message instead.
    if db_schema is dict:
        raise ValueError('raw_db_schema must be provided when data contains dict rows')

    batches = [data[i:i + BATCH_SIZE] for i in range(0, len(data), BATCH_SIZE)]

    for batch in progressbar.progressbar(batches, redirect_stdout=False):
        session.add_all([db_schema(**row) if isinstance(row, dict) else row for row in batch])
        # One commit per batch rather than per row or per whole dataset.
        session.commit()


102
+
103
+
84
104
def create_or_update (session : Session , row : Dict , insertor : Insertor ) -> SQLModel :
85
105
"""
86
106
Create a new row in db if the selector insertor does not find existing row in db. Update the row
0 commit comments