Update TamagotchiService.java
This commit is contained in:
parent
cd930c6b99
commit
5cd8e408ef
1 changed files with 34 additions and 1 deletions
|
@ -6,6 +6,7 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@ -15,13 +16,19 @@ import com.sap.tamagotchi.publisher.PublisherService;
|
||||||
@Service
|
@Service
|
||||||
public class TamagotchiService {
|
public class TamagotchiService {
|
||||||
|
|
||||||
|
private static final long DEVICE_EVENT_PROCESSOR_SCHEDULE = 5_000;
|
||||||
|
|
||||||
|
private final Logger logger;
|
||||||
|
|
||||||
private final PublisherService publisherService;
|
private final PublisherService publisherService;
|
||||||
|
|
||||||
private final Map<String, Device> deviceRegistry = Collections.synchronizedMap(new HashMap<>());
|
private final Map<String, Device> deviceRegistry = Collections.synchronizedMap(new HashMap<>());
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public TamagotchiService(PublisherService publisherService) {
|
public TamagotchiService(PublisherService publisherService, Logger logger) {
|
||||||
this.publisherService = publisherService;
|
this.publisherService = publisherService;
|
||||||
|
this.logger = logger;
|
||||||
|
startDeviceEventProcessor();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Device getDevice(String deviceId) {
|
public Device getDevice(String deviceId) {
|
||||||
|
@ -37,11 +44,37 @@ public class TamagotchiService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Device createDevice(Device device) {
|
public Device createDevice(Device device) {
|
||||||
|
device.start();
|
||||||
deviceRegistry.put(device.getDeviceId(), device);
|
deviceRegistry.put(device.getDeviceId(), device);
|
||||||
return device;
|
return device;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processDeviceEvents() {
|
private void processDeviceEvents() {
|
||||||
|
|
||||||
|
deviceRegistry
|
||||||
|
.values()
|
||||||
|
.parallelStream()
|
||||||
|
.filter(device -> device.hasMessages())
|
||||||
|
.flatMap(device -> device.getMessages().stream())
|
||||||
|
.forEach(message -> {
|
||||||
|
try {
|
||||||
|
publisherService.publish(message);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void startDeviceEventProcessor() {
|
||||||
|
new Thread(() -> {
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
processDeviceEvents();
|
||||||
|
Thread.sleep(DEVICE_EVENT_PROCESSOR_SCHEDULE);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
logger.error("DeviceEventProcessor failed: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue