diff --git a/src/tamagotchi-service/src/main/java/com/sap/tamagotchi/service/TamagotchiService.java b/src/tamagotchi-service/src/main/java/com/sap/tamagotchi/service/TamagotchiService.java index e793207..e76e643 100644 --- a/src/tamagotchi-service/src/main/java/com/sap/tamagotchi/service/TamagotchiService.java +++ b/src/tamagotchi-service/src/main/java/com/sap/tamagotchi/service/TamagotchiService.java @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -15,13 +16,19 @@ import com.sap.tamagotchi.publisher.PublisherService; @Service public class TamagotchiService { + private static final long DEVICE_EVENT_PROCESSOR_SCHEDULE = 5_000; + + private final Logger logger; + private final PublisherService publisherService; private final Map deviceRegistry = Collections.synchronizedMap(new HashMap<>()); @Autowired - public TamagotchiService(PublisherService publisherService) { + public TamagotchiService(PublisherService publisherService, Logger logger) { this.publisherService = publisherService; + this.logger = logger; + startDeviceEventProcessor(); } public Device getDevice(String deviceId) { @@ -37,11 +44,37 @@ public class TamagotchiService { } public Device createDevice(Device device) { + device.start(); deviceRegistry.put(device.getDeviceId(), device); return device; } private void processDeviceEvents() { + deviceRegistry + .values() + .parallelStream() + .filter(device -> device.hasMessages()) + .flatMap(device -> device.getMessages().stream()) + .forEach(message -> { + try { + publisherService.publish(message); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }); + } + + public void startDeviceEventProcessor() { + new Thread(() -> { + try { + while (true) { + processDeviceEvents(); + Thread.sleep(DEVICE_EVENT_PROCESSOR_SCHEDULE); + } + } catch (InterruptedException e) { + logger.error("DeviceEventProcessor failed: {}", e.getMessage()); + } + }).start(); } }