Skip to content

Commit 60e3631

Browse files
committed
changes
1 parent 000c287 commit 60e3631

10 files changed

+331
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
import org.apache.flink.api.connector.source.SourceOutput;
4+
import org.apache.flink.connector.base.source.reader.RecordEmitter;
5+
6+
import java.util.function.Function;
7+
8+
public class RedisRecordEmitter<T> implements RecordEmitter<String, T, RedisSourceSplit> {
9+
10+
final Function<String, T> deserializer;
11+
12+
public RedisRecordEmitter(Function<String, T> deserializer) {
13+
this.deserializer = deserializer;
14+
}
15+
16+
@Override
17+
public void emitRecord(String element, SourceOutput<T> output, RedisSourceSplit testSourceSplit) throws Exception {
18+
output.collect(this.deserializer.apply(element));
19+
// output.collect(this.deserializer.apply(element), element.getTimestamp());
20+
}
21+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
import org.apache.flink.api.connector.source.*;
4+
import org.apache.flink.configuration.Configuration;
5+
import org.apache.flink.core.io.SimpleVersionedSerializer;
6+
7+
import java.util.List;
8+
import java.util.function.Function;
9+
import java.util.function.Supplier;
10+
11+
public class RedisSource<T> implements Source<T, RedisSourceSplit, RedisStateEnumerator> {
12+
13+
private final Configuration configuration;
14+
private final Function<String, T> deserializer;
15+
16+
public RedisSource(Function<String, T> deserializer) {
17+
this.deserializer = deserializer;
18+
this.configuration = new Configuration();
19+
}
20+
21+
@Override
22+
public Boundedness getBoundedness() {
23+
return Boundedness.CONTINUOUS_UNBOUNDED;
24+
}
25+
26+
@Override
27+
public SourceReader<T, RedisSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
28+
final Supplier<RedisSplitReader> splitReaderSupplier = () -> new RedisSplitReader(events);
29+
RedisRecordEmitter<T> recordEmitter = new RedisRecordEmitter<>(this.deserializer);
30+
31+
return new RedisSourceReader<>(splitReaderSupplier, recordEmitter, this.configuration, sourceReaderContext);
32+
}
33+
34+
@Override
35+
public SplitEnumerator<RedisSourceSplit, RedisStateEnumerator> createEnumerator(SplitEnumeratorContext<RedisSourceSplit> splitEnumeratorContext) throws Exception {
36+
return new RedisSplitEnumerator(splitEnumeratorContext);
37+
}
38+
39+
@Override
40+
public SplitEnumerator<RedisSourceSplit, RedisStateEnumerator> restoreEnumerator(SplitEnumeratorContext<RedisSourceSplit> splitEnumeratorContext, RedisStateEnumerator c) throws Exception {
41+
return new RedisSplitEnumerator(splitEnumeratorContext);
42+
}
43+
44+
@Override
45+
public SimpleVersionedSerializer<RedisSourceSplit> getSplitSerializer() {
46+
return new RedisSplitSerializer();
47+
}
48+
49+
@Override
50+
public SimpleVersionedSerializer<RedisStateEnumerator> getEnumeratorCheckpointSerializer() {
51+
return new RedisStateSerializer();
52+
}
53+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
import org.apache.flink.api.connector.source.SourceReaderContext;
4+
import org.apache.flink.configuration.ConfigOptions;
5+
import org.apache.flink.configuration.Configuration;
6+
import org.apache.flink.connector.base.source.reader.RecordEmitter;
7+
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
8+
9+
import java.util.Map;
10+
import java.util.function.Supplier;
11+
12+
public class RedisSourceReader<T> extends SingleThreadMultiplexSourceReaderBase<String, T, RedisSourceSplit, RedisSourceSplit> {
13+
14+
public RedisSourceReader(Supplier<RedisSplitReader> splitReaderSupplier,
15+
RecordEmitter<String, T, RedisSourceSplit> recordEmitter,
16+
Configuration config,
17+
SourceReaderContext context) {
18+
super(splitReaderSupplier::get, recordEmitter, config, context);
19+
}
20+
21+
@Override
22+
public void start() {
23+
// we request a split only if we did not get splits during the checkpoint restore
24+
if (this.getNumberOfCurrentlyAssignedSplits() == 0) {
25+
this.context.sendSplitRequest();
26+
}
27+
}
28+
29+
@Override
30+
protected void onSplitFinished(Map<String, RedisSourceSplit> map) {
31+
boolean finishSplits = map.isEmpty();
32+
if (finishSplits) {
33+
this.notifyNoMoreSplits();
34+
} else {
35+
this.context.sendSplitRequest();
36+
}
37+
}
38+
39+
@Override
40+
protected RedisSourceSplit initializedState(RedisSourceSplit testSourceSplit) {
41+
return testSourceSplit;
42+
}
43+
44+
@Override
45+
protected RedisSourceSplit toSplitType(String s, RedisSourceSplit testSourceSplit) {
46+
return testSourceSplit;
47+
}
48+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
import org.apache.flink.api.connector.source.SourceSplit;
4+
5+
public class RedisSourceSplit implements SourceSplit {
6+
7+
final long id;
8+
9+
public RedisSourceSplit(long id) {
10+
this.id = id;
11+
}
12+
13+
@Override
14+
public String splitId() {
15+
return String.valueOf(this.id);
16+
}
17+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
import org.apache.flink.api.connector.source.SplitEnumerator;
4+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
5+
6+
import javax.annotation.Nullable;
7+
import java.io.IOException;
8+
import java.util.List;
9+
10+
public class RedisSplitEnumerator implements SplitEnumerator<RedisSourceSplit, RedisStateEnumerator> {
11+
12+
private final SplitEnumeratorContext<RedisSourceSplit> context;
13+
private boolean firstEmitted = false;
14+
15+
public RedisSplitEnumerator(SplitEnumeratorContext<RedisSourceSplit> context) {
16+
this.context = context;
17+
}
18+
19+
@Override
20+
public void start() {
21+
}
22+
23+
@Override
24+
public void handleSplitRequest(int subtaskId, @Nullable String s) {
25+
if (!this.firstEmitted) {
26+
this.context.assignSplit(new RedisSourceSplit(subtaskId), subtaskId);
27+
this.firstEmitted = true;
28+
} else {
29+
this.context.assignSplit(new RedisSourceSplit(System.currentTimeMillis()), subtaskId);
30+
}
31+
}
32+
33+
@Override
34+
public void addSplitsBack(List<RedisSourceSplit> list, int i) {
35+
}
36+
37+
@Override
38+
public void addReader(int i) {
39+
}
40+
41+
@Override
42+
public RedisStateEnumerator snapshotState(long l) throws Exception {
43+
return new RedisStateEnumerator();
44+
}
45+
46+
@Override
47+
public void close() throws IOException {
48+
}
49+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
4+
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
5+
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
6+
7+
import java.io.IOException;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
11+
public class RedisSplitReader implements SplitReader<String, RedisSourceSplit> {
12+
13+
private final List<String> events;
14+
private RedisSourceSplit split;
15+
16+
public RedisSplitReader(List<String> events) {
17+
this.events = events;
18+
}
19+
20+
@Override
21+
public RecordsWithSplitIds<String> fetch() throws IOException {
22+
if (this.split == null) {
23+
return null;
24+
}
25+
if (split.splitId().equals("0")) {
26+
return new RedisSplitRecords(this.split.splitId(), events);
27+
} else {
28+
return new RedisSplitRecords(this.split.splitId(), new ArrayList<>());
29+
}
30+
}
31+
32+
@Override
33+
public void handleSplitsChanges(SplitsChange<RedisSourceSplit> splitsChange) {
34+
if (splitsChange.splits().isEmpty()) return;
35+
this.split = splitsChange.splits().get(0);
36+
}
37+
38+
@Override
39+
public void wakeUp() {
40+
}
41+
42+
@Override
43+
public void close() throws Exception {
44+
}
45+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
4+
5+
import javax.annotation.Nullable;
6+
import java.util.*;
7+
8+
public class RedisSplitRecords implements RecordsWithSplitIds<String> {
9+
10+
private final String splitId;
11+
private final Iterator<String> recordIterator;
12+
13+
public RedisSplitRecords(String splitId, List<String> records) {
14+
this.splitId = splitId;
15+
this.recordIterator = records.iterator();
16+
}
17+
18+
@Nullable
19+
@Override
20+
public String nextSplit() {
21+
if (this.recordIterator.hasNext()) {
22+
return this.splitId;
23+
}
24+
return null;
25+
}
26+
27+
@Nullable
28+
@Override
29+
public String nextRecordFromSplit() {
30+
if (this.recordIterator.hasNext()) {
31+
return this.recordIterator.next();
32+
} else {
33+
return null;
34+
}
35+
}
36+
37+
@Override
38+
public Set<String> finishedSplits() {
39+
HashSet<String> ids = new HashSet<>();
40+
ids.add(splitId);
41+
return ids;
42+
}
43+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
import org.apache.flink.core.io.SimpleVersionedSerializer;
4+
5+
import java.io.IOException;
6+
import java.nio.ByteBuffer;
7+
8+
public class RedisSplitSerializer implements SimpleVersionedSerializer<RedisSourceSplit> {
9+
10+
private static final int CURRENT_VERSION = 0;
11+
12+
@Override
13+
public int getVersion() {
14+
return CURRENT_VERSION;
15+
}
16+
17+
@Override
18+
public byte[] serialize(RedisSourceSplit testSplitReader) throws IOException {
19+
final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
20+
buffer.putLong(0, testSplitReader.id);
21+
return buffer.array();
22+
}
23+
24+
@Override
25+
public RedisSourceSplit deserialize(int version, byte[] serialized) throws IOException {
26+
final ByteBuffer buffer = ByteBuffer.wrap(serialized);
27+
return new RedisSourceSplit(buffer.getLong());
28+
}
29+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
public class RedisStateEnumerator {
4+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package org.apache.flink.connector.redis.source;
2+
3+
import org.apache.flink.core.io.SimpleVersionedSerializer;
4+
5+
import java.io.IOException;
6+
7+
public class RedisStateSerializer implements SimpleVersionedSerializer<RedisStateEnumerator> {
8+
@Override
9+
public int getVersion() {
10+
return 0;
11+
}
12+
13+
@Override
14+
public byte[] serialize(RedisStateEnumerator testStateEnumerator) throws IOException {
15+
return new byte[0];
16+
}
17+
18+
@Override
19+
public RedisStateEnumerator deserialize(int i, byte[] bytes) throws IOException {
20+
return new RedisStateEnumerator();
21+
}
22+
}

0 commit comments

Comments
 (0)