Skip to content

Commit c4fe7e1

Browse files
authored
Merge pull request #498 from alex268/query_reader_fixes
Update DataReader implementation & added tests
2 parents 65f7521 + 256dda7 commit c4fe7e1

File tree

2 files changed

+228
-162
lines changed

2 files changed

+228
-162
lines changed

query/src/main/java/tech/ydb/query/tools/QueryReader.java

Lines changed: 57 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import java.util.SortedMap;
99
import java.util.TreeMap;
1010
import java.util.concurrent.CompletableFuture;
11-
import java.util.stream.Collectors;
1211

1312
import tech.ydb.core.Issue;
1413
import tech.ydb.core.Result;
@@ -25,12 +24,12 @@
2524
*/
2625
public class QueryReader implements Iterable<ResultSetReader> {
2726
private final QueryInfo info;
28-
private final List<Issue> isssues;
29-
private final List<ResultSetParts> results;
27+
private final List<Issue> issues;
28+
private final List<ResultSetReader> results;
3029

31-
QueryReader(QueryInfo info, List<Issue> issues, List<ResultSetParts> results) {
30+
private QueryReader(QueryInfo info, List<Issue> issues, List<ResultSetReader> results) {
3231
this.info = info;
33-
this.isssues = issues;
32+
this.issues = issues;
3433
this.results = results;
3534
}
3635

@@ -43,11 +42,11 @@ public int getResultSetCount() {
4342
}
4443

4544
public ResultSetReader getResultSet(int index) {
46-
return new CompositeResultSet(results.get(index).getParts());
45+
return results.get(index);
4746
}
4847

4948
public List<Issue> getIssueList() {
50-
return this.isssues;
49+
return this.issues;
5150
}
5251

5352
public static CompletableFuture<Result<QueryReader>> readFrom(QueryStream stream) {
@@ -57,45 +56,32 @@ public static CompletableFuture<Result<QueryReader>> readFrom(QueryStream stream
5756

5857
@Override
5958
public Iterator<ResultSetReader> iterator() {
60-
return new IteratorImpl(results.iterator());
61-
}
62-
63-
private class IteratorImpl implements Iterator<ResultSetReader> {
64-
private final Iterator<ResultSetParts> iter;
65-
66-
IteratorImpl(Iterator<ResultSetParts> iter) {
67-
this.iter = iter;
68-
}
69-
70-
@Override
71-
public boolean hasNext() {
72-
return iter.hasNext();
73-
}
74-
75-
@Override
76-
public ResultSetReader next() {
77-
return new CompositeResultSet(iter.next().getParts());
78-
}
59+
return results.iterator();
7960
}
8061

8162
private static class PartsCollector implements QueryStream.PartsHandler {
8263
private final List<Issue> issueList = new ArrayList<>();
83-
private final SortedMap<Long, ResultSetParts> results = new TreeMap<>();
64+
private final SortedMap<Long, List<QueryResultPart>> results = new TreeMap<>();
8465

8566
QueryReader toReader(QueryInfo info) {
86-
List<ResultSetParts> ordered = new ArrayList<>();
67+
List<List<QueryResultPart>> ordered = new ArrayList<>();
8768
long lastInserted = 0;
88-
for (Map.Entry<Long, ResultSetParts> entry: results.entrySet()) {
69+
for (Map.Entry<Long, List<QueryResultPart>> entry: results.entrySet()) {
8970
long key = entry.getKey();
9071
while (lastInserted + 1 < key) {
91-
ordered.add(new ResultSetParts(lastInserted));
72+
ordered.add(new ArrayList<>()); // add empty result for skipped indexes
9273
lastInserted++;
9374
}
9475
ordered.add(entry.getValue());
9576
lastInserted = key;
9677
}
9778

98-
return new QueryReader(info, issueList, ordered);
79+
List<ResultSetReader> resultsList = new ArrayList<>(ordered.size());
80+
for (List<QueryResultPart> queryResult: ordered) {
81+
resultsList.add(new CompositeResultSet(queryResult));
82+
}
83+
84+
return new QueryReader(info, issueList, resultsList);
9985
}
10086

10187
@Override
@@ -107,42 +93,27 @@ public void onIssues(Issue[] issues) {
10793
public void onNextPart(QueryResultPart part) {
10894
Long index = part.getResultSetIndex();
10995
if (!results.containsKey(index)) {
110-
results.put(index, new ResultSetParts(index));
96+
results.put(index, new ArrayList<>());
11197
}
112-
results.get(index).addPart(part);
113-
}
114-
}
115-
116-
static class ResultSetParts {
117-
private final long resultSetIndex;
118-
private final List<QueryResultPart> parts = new ArrayList<>();
119-
120-
ResultSetParts(long index) {
121-
this.resultSetIndex = index;
122-
}
123-
124-
public void addPart(QueryResultPart part) {
125-
parts.add(part);
126-
}
127-
128-
public long getIndex() {
129-
return resultSetIndex;
130-
}
131-
132-
public List<QueryResultPart> getParts() {
133-
return parts;
98+
results.get(index).add(part);
13499
}
135100
}
136101

137102
private static class CompositeResultSet implements ResultSetReader {
138-
private final List<ResultSetReader> parts;
103+
private final ResultSetReader[] parts;
139104
private final int rowsCount;
140105
private int partIndex = -1;
141106

142107
CompositeResultSet(List<QueryResultPart> list) {
143-
this.parts = list.stream().map(QueryResultPart::getResultSetReader).collect(Collectors.toList());
144-
this.rowsCount = list.stream().mapToInt(QueryResultPart::getResultSetRowsCount).sum();
145-
this.partIndex = parts.isEmpty() ? -1 : 0;
108+
this.parts = new ResultSetReader[list.size()];
109+
int count = 0;
110+
int idx = 0;
111+
for (QueryResultPart part: list) {
112+
this.parts[idx++] = part.getResultSetReader();
113+
count += part.getResultSetRowsCount();
114+
}
115+
this.rowsCount = count;
116+
this.partIndex = list.isEmpty() ? -1 : 0;
146117
}
147118

148119
@Override
@@ -155,47 +126,47 @@ public int getColumnCount() {
155126
if (partIndex < 0) {
156127
return 0;
157128
}
158-
return parts.get(partIndex).getColumnCount();
129+
return parts[partIndex].getColumnCount();
159130
}
160131

161132
@Override
162133
public String getColumnName(int index) {
163134
if (partIndex < 0) {
164135
return null;
165136
}
166-
return parts.get(partIndex).getColumnName(index);
137+
return parts[partIndex].getColumnName(index);
167138
}
168139

169140
@Override
170141
public int getColumnIndex(String name) {
171142
if (partIndex < 0) {
172143
return -1;
173144
}
174-
return parts.get(partIndex).getColumnIndex(name);
145+
return parts[partIndex].getColumnIndex(name);
175146
}
176147

177148
@Override
178149
public ValueReader getColumn(int index) {
179150
if (partIndex < 0) {
180151
return null;
181152
}
182-
return parts.get(partIndex).getColumn(index);
153+
return parts[partIndex].getColumn(index);
183154
}
184155

185156
@Override
186157
public ValueReader getColumn(String name) {
187158
if (partIndex < 0) {
188159
return null;
189160
}
190-
return parts.get(partIndex).getColumn(name);
161+
return parts[partIndex].getColumn(name);
191162
}
192163

193164
@Override
194165
public Type getColumnType(int index) {
195166
if (partIndex < 0) {
196167
return null;
197168
}
198-
return parts.get(partIndex).getColumnType(index);
169+
return parts[partIndex].getColumnType(index);
199170
}
200171

201172
@Override
@@ -205,20 +176,32 @@ public int getRowCount() {
205176

206177
@Override
207178
public void setRowIndex(int index) {
179+
// TODO: Enable after JDBC fixing
180+
// if (index < 0 || index >= rowsCount) {
181+
// throw new IndexOutOfBoundsException(String.format("Index %s out of bounds for length %s",
182+
// index, rowsCount));
183+
// }
184+
// int currentIdx = index;
185+
int currentIdx = Math.max(0, index);
208186
partIndex = 0;
209-
int currentIdx = index;
210-
while (partIndex < parts.size()) {
211-
int readerRows = parts.get(partIndex).getRowCount();
187+
while (partIndex < parts.length) {
188+
int readerRows = parts[partIndex].getRowCount();
212189
if (currentIdx < readerRows) {
213-
parts.get(partIndex).setRowIndex(currentIdx);
190+
parts[partIndex].setRowIndex(currentIdx);
214191
break;
215192
}
216-
parts.get(partIndex).setRowIndex(readerRows - 1);
193+
parts[partIndex].setRowIndex(readerRows);
217194
currentIdx -= readerRows;
218195
partIndex++;
219196
}
220-
for (int partStep = partIndex + 1; partStep < parts.size(); partStep++) {
221-
parts.get(partStep).setRowIndex(0);
197+
198+
// TODO: remove after JDBC fixing
199+
if (partIndex >= parts.length) {
200+
partIndex = parts.length - 1;
201+
}
202+
203+
for (int partStep = partIndex + 1; partStep < parts.length; partStep++) {
204+
parts[partStep].setRowIndex(0);
222205
}
223206
}
224207

@@ -227,10 +210,10 @@ public boolean next() {
227210
if (partIndex < 0) {
228211
return false;
229212
}
230-
boolean res = parts.get(partIndex).next();
231-
while (!res && partIndex < parts.size() - 1) {
213+
boolean res = parts[partIndex].next();
214+
while (!res && partIndex < parts.length - 1) {
232215
partIndex++;
233-
res = parts.get(partIndex).next();
216+
res = parts[partIndex].next();
234217
}
235218
return res;
236219
}

0 commit comments

Comments
 (0)