Skip to content

Commit a0e9354

Browse files
committed
refactor message receiver
1 parent b073e04 commit a0e9354

File tree

1 file changed

+75
-66
lines changed

1 file changed

+75
-66
lines changed

src/main/java/dev/ancaghenade/shipmentlistdemo/controller/MessageReceiver.java

Lines changed: 75 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
import dev.ancaghenade.shipmentlistdemo.service.ShipmentService;
44
import io.awspring.cloud.sqs.annotation.SqsListener;
5+
56
import java.io.IOException;
6-
import java.util.ArrayList;
77
import java.util.List;
88
import java.util.concurrent.CopyOnWriteArrayList;
9+
910
import org.json.JSONObject;
1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
@@ -20,70 +21,78 @@
2021
@RestController
2122
public class MessageReceiver {
2223

23-
private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiver.class);
24-
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
25-
26-
private final ShipmentService shipmentService;
27-
28-
@Autowired
29-
public MessageReceiver(ShipmentService shipmentService) {
30-
this.shipmentService = shipmentService;
31-
}
32-
33-
@SqsListener(value = "update_shipment_picture_queue")
34-
public void loadMessagesFromQueue(String notification) {
35-
LOGGER.info("Message from queue {}", notification);
36-
37-
JSONObject obj = new JSONObject(notification);
38-
String message = obj.getString("Message");
39-
String shipmentId = message.split("/")[0];
40-
41-
shipmentService.updateImageLink(shipmentId, message);
42-
43-
List<SseEmitter> deadEmitters = new ArrayList<>();
44-
emitters.forEach(emitter -> {
45-
try {
46-
emitter.send(shipmentId);
47-
LOGGER.info("SSE event sent to emitter: {}", emitter);
48-
} catch (IOException e) {
49-
LOGGER.error("Error sending SSE event: {}", e.getMessage(), e);
50-
emitter.completeWithError(e);
51-
deadEmitters.add(emitter);
52-
}
53-
});
54-
emitters.removeAll(deadEmitters);
55-
}
56-
57-
@GetMapping("/push-endpoint")
58-
@CrossOrigin(origins = "http://localhost:3000")
59-
public SseEmitter pushData() {
60-
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
61-
62-
emitter.onCompletion(() -> {
63-
emitters.remove(emitter);
64-
});
65-
66-
emitter.onTimeout(() -> {
67-
emitters.remove(emitter);
68-
LOGGER.info("SseEmitter timed out.");
69-
});
70-
71-
emitter.onError(e -> {
72-
emitters.remove(emitter);
73-
LOGGER.error("SseEmitter error: {}", e.getMessage(), e);
74-
});
75-
76-
emitters.add(emitter);
77-
return emitter;
78-
}
79-
80-
81-
private void sleep(SseEmitter sseEmitter) {
82-
try {
83-
Thread.sleep(1000);
84-
} catch (InterruptedException e) {
85-
e.printStackTrace();
86-
sseEmitter.completeWithError(e);
24+
private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiver.class);
25+
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
26+
27+
private final ShipmentService shipmentService;
28+
29+
@Autowired
30+
public MessageReceiver(ShipmentService shipmentService) {
31+
this.shipmentService = shipmentService;
32+
}
33+
34+
@SqsListener(value = "update_shipment_picture_queue")
35+
public void loadMessagesFromQueue(String notification) {
36+
LOGGER.info("Message from queue {} ", notification);
37+
38+
var obj = new JSONObject(notification);
39+
var message = obj.getString("Message");
40+
var shipmentId = message.split("/")[0];
41+
42+
shipmentService.updateImageLink(shipmentId, message);
43+
44+
emitters.forEach(emitter -> {
45+
try {
46+
emitter.send(shipmentId);
47+
LOGGER.info("SSE event sent to emitter: {}", emitter);
48+
sleep(emitter);
49+
} catch (IOException e) {
50+
LOGGER.error("Error sending SSE event: {}", e.getMessage(), e);
51+
emitter.completeWithError(e);
52+
}
53+
emitter.complete();
54+
});
55+
}
56+
57+
@GetMapping(value = "/push-endpoint")
58+
@CrossOrigin(origins = "http://localhost:3000")
59+
public SseEmitter pushData() {
60+
61+
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
62+
63+
emitter.onCompletion(() -> {
64+
synchronized (emitters) {
65+
emitters.remove(emitter);
66+
}
67+
});
68+
69+
emitter.onTimeout(() -> {
70+
synchronized (emitters) {
71+
emitters.remove(emitter);
72+
}
73+
emitter.complete();
74+
LOGGER.info("SseEmitter is timed out");
75+
});
76+
77+
emitter.onError(e -> {
78+
synchronized (emitters) {
79+
emitters.remove(emitter);
80+
}
81+
});
82+
83+
synchronized (emitters) {
84+
emitters.add(emitter);
85+
}
86+
87+
return emitter;
88+
}
89+
90+
private void sleep(SseEmitter sseEmitter) {
91+
try {
92+
Thread.sleep(1000);
93+
} catch (InterruptedException e) {
94+
e.printStackTrace();
95+
sseEmitter.completeWithError(e);
96+
}
8797
}
88-
}
8998
}

0 commit comments

Comments
 (0)