Skip to content

Commit 6e45552

Browse files
authored
support edge iterator (#521)
1 parent 9c546ca commit 6e45552

File tree

10 files changed

+268
-0
lines changed

10 files changed

+268
-0
lines changed

geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/api/graph/function/vc/base/VertexCentricFunction.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.antgroup.geaflow.api.context.RuntimeContext;
2323
import com.antgroup.geaflow.api.function.Function;
24+
import com.antgroup.geaflow.common.iterator.CloseableIterator;
2425
import com.antgroup.geaflow.model.graph.edge.IEdge;
2526
import com.antgroup.geaflow.model.graph.vertex.IVertex;
2627
import com.antgroup.geaflow.state.pushdown.filter.IFilter;
@@ -93,16 +94,31 @@ interface EdgeQuery<K, EV> {
9394
*/
9495
List<IEdge<K, EV>> getEdges();
9596

97+
/**
98+
* Returns the both edges iterator.
99+
*/
100+
CloseableIterator<IEdge<K, EV>> getEdgesIterator();
101+
96102
/**
97103
* Returns the out edges.
98104
*/
99105
List<IEdge<K, EV>> getOutEdges();
100106

107+
/**
108+
* Returns the out edges iterator.
109+
*/
110+
CloseableIterator<IEdge<K, EV>> getOutEdgesIterator();
111+
101112
/**
102113
* Returns the in edges.
103114
*/
104115
List<IEdge<K, EV>> getInEdges();
105116

117+
/**
118+
* Returns the in edges iterator.
119+
*/
120+
CloseableIterator<IEdge<K, EV>> getInEdgesIterator();
121+
106122
/**
107123
* Get the edges which satisfies filter condition.
108124
*/

geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/com/antgroup/geaflow/operator/impl/graph/algo/vc/context/dynamic/DynamicEdgeQueryImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222

2323
import com.antgroup.geaflow.api.graph.function.vc.base.VertexCentricFunction.EdgeQuery;
24+
import com.antgroup.geaflow.common.iterator.CloseableIterator;
2425
import com.antgroup.geaflow.model.graph.edge.IEdge;
2526
import com.antgroup.geaflow.state.DynamicEdgeState;
2627
import com.antgroup.geaflow.state.GraphState;
@@ -57,18 +58,36 @@ public List<IEdge<K, EV>> getEdges() {
5758
return edgeState.query(versionId, vId).asList();
5859
}
5960

61+
@Override
62+
public CloseableIterator<IEdge<K, EV>> getEdgesIterator() {
63+
DynamicEdgeState<K, VV, EV> edgeState = graphState.dynamicGraph().E();
64+
return edgeState.query(versionId, vId).iterator();
65+
}
66+
6067
@Override
6168
public List<IEdge<K, EV>> getOutEdges() {
6269
DynamicEdgeState<K, VV, EV> edgeState = graphState.dynamicGraph().E();
6370
return edgeState.query(versionId, vId).by(OutEdgeFilter.instance()).asList();
6471
}
6572

73+
@Override
74+
public CloseableIterator<IEdge<K, EV>> getOutEdgesIterator() {
75+
DynamicEdgeState<K, VV, EV> edgeState = graphState.dynamicGraph().E();
76+
return edgeState.query(versionId, vId).by(OutEdgeFilter.instance()).iterator();
77+
}
78+
6679
@Override
6780
public List<IEdge<K, EV>> getInEdges() {
6881
DynamicEdgeState<K, VV, EV> edgeState = graphState.dynamicGraph().E();
6982
return edgeState.query(versionId, vId).by(InEdgeFilter.instance()).asList();
7083
}
7184

85+
@Override
86+
public CloseableIterator<IEdge<K, EV>> getInEdgesIterator() {
87+
DynamicEdgeState<K, VV, EV> edgeState = graphState.dynamicGraph().E();
88+
return edgeState.query(versionId, vId).by(InEdgeFilter.instance()).iterator();
89+
}
90+
7291
@Override
7392
public List<IEdge<K, EV>> getEdges(IFilter edgeFilter) {
7493
DynamicEdgeState<K, VV, EV> edgeState = graphState.dynamicGraph().E();

geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/com/antgroup/geaflow/operator/impl/graph/algo/vc/context/statical/StaticEdgeQueryImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package com.antgroup.geaflow.operator.impl.graph.algo.vc.context.statical;
2121

2222
import com.antgroup.geaflow.api.graph.function.vc.base.VertexCentricFunction.EdgeQuery;
23+
import com.antgroup.geaflow.common.iterator.CloseableIterator;
2324
import com.antgroup.geaflow.model.graph.edge.IEdge;
2425
import com.antgroup.geaflow.state.GraphState;
2526
import com.antgroup.geaflow.state.pushdown.filter.IFilter;
@@ -50,16 +51,31 @@ public List<IEdge<K, EV>> getEdges() {
5051
return graphState.staticGraph().E().query(vId).asList();
5152
}
5253

54+
@Override
55+
public CloseableIterator<IEdge<K, EV>> getEdgesIterator() {
56+
return graphState.staticGraph().E().query(vId).iterator();
57+
}
58+
5359
@Override
5460
public List<IEdge<K, EV>> getOutEdges() {
5561
return graphState.staticGraph().E().query(vId).by(OutEdgeFilter.instance()).asList();
5662
}
5763

64+
@Override
65+
public CloseableIterator<IEdge<K, EV>> getOutEdgesIterator() {
66+
return graphState.staticGraph().E().query(vId).by(OutEdgeFilter.instance()).iterator();
67+
}
68+
5869
@Override
5970
public List<IEdge<K, EV>> getInEdges() {
6071
return graphState.staticGraph().E().query(vId).by(InEdgeFilter.instance()).asList();
6172
}
6273

74+
@Override
75+
public CloseableIterator<IEdge<K, EV>> getInEdgesIterator() {
76+
return graphState.staticGraph().E().query(vId).by(InEdgeFilter.instance()).iterator();
77+
}
78+
6379
@Override
6480
public List<IEdge<K, EV>> getEdges(IFilter edgeFilter) {
6581
return graphState.staticGraph().E().query(vId).by(edgeFilter).asList();

geaflow/geaflow-dsl/geaflow-dsl-common/src/main/java/com/antgroup/geaflow/dsl/common/algo/AlgorithmRuntimeContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package com.antgroup.geaflow.dsl.common.algo;
2121

2222
import com.antgroup.geaflow.common.config.Configuration;
23+
import com.antgroup.geaflow.common.iterator.CloseableIterator;
2324
import com.antgroup.geaflow.dsl.common.data.Row;
2425
import com.antgroup.geaflow.dsl.common.data.RowEdge;
2526
import com.antgroup.geaflow.dsl.common.types.GraphSchema;
@@ -34,6 +35,8 @@ public interface AlgorithmRuntimeContext<K, M> {
3435

3536
List<RowEdge> loadDynamicEdges(EdgeDirection direction);
3637

38+
CloseableIterator<RowEdge> loadStaticEdgesIterator(EdgeDirection direction);
39+
3740
void sendMessage(K vertexId, M message);
3841

3942
void updateVertexValue(Row value);

geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowAlgorithmDynamicRuntimeContext.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,20 @@ public List<RowEdge> loadStaticEdges(EdgeDirection direction) {
169169
return rowEdges;
170170
}
171171

172+
@Override
173+
public CloseableIterator<RowEdge> loadStaticEdgesIterator(EdgeDirection direction) {
174+
switch (direction) {
175+
case OUT:
176+
return (CloseableIterator) edgeQuery.getOutEdgesIterator();
177+
case IN:
178+
return (CloseableIterator) edgeQuery.getInEdgesIterator();
179+
case BOTH:
180+
return (CloseableIterator) edgeQuery.getEdgesIterator();
181+
default:
182+
throw new GeaFlowDSLException("Illegal edge direction: " + direction);
183+
}
184+
}
185+
172186
@Override
173187
public void sendMessage(Object vertexId, Object message) {
174188
incVCTraversalCtx.sendMessage(vertexId, message);

geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowAlgorithmRuntimeContext.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.antgroup.geaflow.api.graph.function.vc.VertexCentricTraversalFunction.TraversalEdgeQuery;
2424
import com.antgroup.geaflow.api.graph.function.vc.VertexCentricTraversalFunction.VertexCentricTraversalFuncContext;
2525
import com.antgroup.geaflow.common.config.Configuration;
26+
import com.antgroup.geaflow.common.iterator.CloseableIterator;
2627
import com.antgroup.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
2728
import com.antgroup.geaflow.dsl.common.data.Row;
2829
import com.antgroup.geaflow.dsl.common.data.RowEdge;
@@ -93,6 +94,20 @@ public List<RowEdge> loadDynamicEdges(EdgeDirection direction) {
9394
throw new RuntimeException("GeaFlowAlgorithmRuntimeContext not support loadDynamicEdges");
9495
}
9596

97+
@Override
98+
public CloseableIterator<RowEdge> loadStaticEdgesIterator(EdgeDirection direction) {
99+
switch (direction) {
100+
case OUT:
101+
return (CloseableIterator) edgeQuery.getOutEdgesIterator();
102+
case IN:
103+
return (CloseableIterator) edgeQuery.getInEdgesIterator();
104+
case BOTH:
105+
return (CloseableIterator) edgeQuery.getEdgesIterator();
106+
default:
107+
throw new GeaFlowDSLException("Illegal edge direction: " + direction);
108+
}
109+
}
110+
96111
@Override
97112
public void sendMessage(Object vertexId, Object message) {
98113
traversalContext.sendMessage(vertexId, message);

geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/com/antgroup/geaflow/dsl/runtime/query/GQLAlgorithmTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,15 @@ public void testAlgorithmCommonNeighbors() throws Exception {
258258
.checkSinkResult();
259259
}
260260

261+
@Test
262+
public void testEdgeIterator() throws Exception {
263+
QueryTester
264+
.build()
265+
.withQueryPath("/query/gql_edge_iterator_test.sql")
266+
.execute()
267+
.checkSinkResult();
268+
}
269+
261270
private void clearGraph() throws IOException {
262271
File file = new File(TEST_GRAPH_PATH);
263272
if (file.exists()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package com.antgroup.geaflow.dsl.runtime.query.udf;
21+
22+
import com.antgroup.geaflow.common.iterator.CloseableIterator;
23+
import com.antgroup.geaflow.common.type.primitive.LongType;
24+
import com.antgroup.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
25+
import com.antgroup.geaflow.dsl.common.algo.AlgorithmUserFunction;
26+
import com.antgroup.geaflow.dsl.common.data.Row;
27+
import com.antgroup.geaflow.dsl.common.data.RowEdge;
28+
import com.antgroup.geaflow.dsl.common.data.RowVertex;
29+
import com.antgroup.geaflow.dsl.common.data.impl.ObjectRow;
30+
import com.antgroup.geaflow.dsl.common.function.Description;
31+
import com.antgroup.geaflow.dsl.common.types.GraphSchema;
32+
import com.antgroup.geaflow.dsl.common.types.StructType;
33+
import com.antgroup.geaflow.dsl.common.types.TableField;
34+
import com.antgroup.geaflow.model.graph.edge.EdgeDirection;
35+
import java.util.Iterator;
36+
import java.util.Optional;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
@Description(name = "test_edge_iterator", description = "built-in udga for WeakConnectedComponents")
41+
public class TestEdgeIteratorUdf implements AlgorithmUserFunction<Object, Long> {
42+
43+
private static final Logger LOGGER = LoggerFactory.getLogger(TestEdgeIteratorUdf.class);
44+
45+
private AlgorithmRuntimeContext<Object, Long> context;
46+
47+
private int iteration = 5;
48+
49+
private int edgeLimit = 100;
50+
51+
@Override
52+
public void init(AlgorithmRuntimeContext<Object, Long> context, Object[] parameters) {
53+
this.context = context;
54+
if (parameters.length > 0) {
55+
iteration = Integer.parseInt(String.valueOf(parameters[0]));
56+
}
57+
if (parameters.length > 1) {
58+
edgeLimit = Integer.parseInt(String.valueOf(parameters[1]));
59+
}
60+
}
61+
62+
@Override
63+
public void process(RowVertex vertex, Optional<Row> updatedValues, Iterator<Long> messages) {
64+
updatedValues.ifPresent(vertex::setValue);
65+
CloseableIterator<RowEdge> edgesIterator = context.loadStaticEdgesIterator(EdgeDirection.BOTH);
66+
if (context.getCurrentIterationId() < iteration) {
67+
int count = 0;
68+
while (edgesIterator.hasNext() && count < edgeLimit) {
69+
RowEdge next = edgesIterator.next();
70+
context.sendMessage(next.getTargetId(), context.getCurrentIterationId());
71+
count++;
72+
}
73+
}
74+
}
75+
76+
@Override
77+
public void finish(RowVertex graphVertex, Optional<Row> updatedValues) {
78+
updatedValues.ifPresent(graphVertex::setValue);
79+
long iteration = (long) graphVertex.getValue().getField(0, LongType.INSTANCE);
80+
context.take(ObjectRow.create(graphVertex.getId(), iteration));
81+
}
82+
83+
@Override
84+
public StructType getOutputType(GraphSchema graphSchema) {
85+
return new StructType(
86+
new TableField("id", graphSchema.getIdType(), false),
87+
new TableField("iteration", LongType.INSTANCE, false)
88+
);
89+
}
90+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
1,1
2+
5,1
3+
3,1
4+
4,1
5+
2,1
6+
6,1
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
set geaflow.dsl.window.size = -1;
21+
set geaflow.dsl.ignore.exception = true;
22+
create function test_edge_iterator as 'com.antgroup.geaflow.dsl.runtime.query.udf.TestEdgeIteratorUdf';
23+
24+
25+
CREATE GRAPH IF NOT EXISTS g4 (
26+
Vertex v4 (
27+
vid varchar ID,
28+
vvalue int
29+
),
30+
Edge e4 (
31+
srcId varchar SOURCE ID,
32+
targetId varchar DESTINATION ID
33+
)
34+
) WITH (
35+
storeType='rocksdb',
36+
shardCount = 1
37+
);
38+
39+
CREATE TABLE IF NOT EXISTS v_source (
40+
v_id varchar,
41+
v_value int,
42+
ts varchar,
43+
type varchar
44+
) WITH (
45+
type='file',
46+
geaflow.dsl.file.path = 'resource:///input/test_vertex'
47+
);
48+
49+
CREATE TABLE IF NOT EXISTS e_source (
50+
src_id varchar,
51+
dst_id varchar
52+
) WITH (
53+
type='file',
54+
geaflow.dsl.file.path = 'resource:///input/test_edge'
55+
);
56+
57+
CREATE TABLE IF NOT EXISTS tbl_result (
58+
v_id varchar,
59+
iteration long
60+
) WITH (
61+
type='file',
62+
geaflow.dsl.file.path = '${target}'
63+
);
64+
65+
USE GRAPH g4;
66+
67+
INSERT INTO g4.v4(vid, vvalue)
68+
SELECT
69+
v_id, v_value
70+
FROM v_source;
71+
72+
INSERT INTO g4.e4(srcId, targetId)
73+
SELECT
74+
src_id, dst_id
75+
FROM e_source;
76+
77+
INSERT INTO tbl_result(v_id, iteration)
78+
CALL test_edge_iterator(3, 2) YIELD (vid, iteration)
79+
RETURN vid, iteration
80+
;

0 commit comments

Comments
 (0)