enable device data generation
This commit is contained in:
parent
98d4dc0cc6
commit
8cb4183720
7 changed files with 131 additions and 113 deletions
|
@ -5,8 +5,6 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Queue;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
@ -15,7 +13,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||
public class Device {
|
||||
|
||||
@JsonProperty("id")
|
||||
private final String getDeviceId = UUID.randomUUID().toString();
|
||||
private final String id = UUID.randomUUID().toString();
|
||||
@JsonProperty("owner")
|
||||
private final String owner;
|
||||
@JsonProperty("color")
|
||||
|
@ -32,8 +30,8 @@ public class Device {
|
|||
}
|
||||
|
||||
@JsonProperty("id")
|
||||
public String getDeviceId() {
|
||||
return getDeviceId;
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@JsonProperty("owner")
|
||||
|
@ -63,7 +61,12 @@ public class Device {
|
|||
|
||||
@JsonIgnore
|
||||
public void changeHealthScore(int delta) {
|
||||
this.healthScore += delta;
|
||||
if (healthScore >= 1) {
|
||||
healthScore += delta;
|
||||
if (healthScore > 150)
|
||||
healthScore /= 10;
|
||||
}
|
||||
messages.add(new DeviceEvent(id, owner, color, born, healthScore, Instant.now()));
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
|
@ -72,11 +75,7 @@ public class Device {
|
|||
}
|
||||
|
||||
@JsonIgnore
|
||||
public Collection<IoTMessage> getMessages() {
|
||||
ArrayList<IoTMessage> m = new ArrayList<>();
|
||||
while (!messages.isEmpty()) {
|
||||
m.add(messages.poll());
|
||||
}
|
||||
return m;
|
||||
public Queue<IoTMessage> getMessages() {
|
||||
return messages;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,84 @@
|
|||
package com.sap.tamagotchi.model;
|
||||
|
||||
public class DeviceEvent {
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public class DeviceEvent implements IoTMessage {
|
||||
|
||||
@JsonProperty("id")
|
||||
private final String id;
|
||||
@JsonProperty("owner")
|
||||
private final String owner;
|
||||
@JsonProperty("color")
|
||||
private final Color color;
|
||||
@JsonProperty("born")
|
||||
private final Instant born;
|
||||
@JsonProperty("healthScore")
|
||||
private final Integer healthScore;
|
||||
@JsonProperty("eventTime")
|
||||
private final Instant eventTime;
|
||||
|
||||
public DeviceEvent(String id, String owner, Color color, Instant born, Integer healthScore, Instant eventTime) {
|
||||
this.id = id;
|
||||
this.owner = owner;
|
||||
this.color = color;
|
||||
this.born = born;
|
||||
this.healthScore = healthScore;
|
||||
this.eventTime = eventTime;
|
||||
}
|
||||
|
||||
@JsonProperty("id")
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@JsonProperty("owner")
|
||||
public String getOwner() {
|
||||
return owner;
|
||||
}
|
||||
|
||||
@JsonProperty("color")
|
||||
public Color getColor() {
|
||||
return color;
|
||||
}
|
||||
|
||||
@JsonProperty("born")
|
||||
public Instant getBorn() {
|
||||
return born;
|
||||
}
|
||||
|
||||
@JsonProperty("healthScore")
|
||||
public Integer getHealthScore() {
|
||||
return healthScore;
|
||||
}
|
||||
|
||||
@JsonProperty("eventTime")
|
||||
public Instant getEventTime() {
|
||||
return eventTime;
|
||||
}
|
||||
|
||||
@JsonProperty("isAlive")
|
||||
public boolean isAlive() {
|
||||
return healthScore > 1;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return "tamagotchi-events";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DeviceEvent{" +
|
||||
"id='" + id + '\'' +
|
||||
", owner='" + owner + '\'' +
|
||||
", color=" + color +
|
||||
", born=" + born +
|
||||
", healthScore=" + healthScore +
|
||||
", eventTime=" + eventTime +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
package com.sap.tamagotchi.model;
|
||||
|
||||
public class DummyMessage {
|
||||
private final String message;
|
||||
|
||||
public DummyMessage(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
}
|
|
@ -1,7 +1,5 @@
|
|||
package com.sap.tamagotchi.model;
|
||||
|
||||
public interface IoTMessage {
|
||||
String toMessage();
|
||||
|
||||
String getTopic();
|
||||
}
|
||||
|
|
|
@ -3,95 +3,33 @@
|
|||
*/
|
||||
package com.sap.tamagotchi.model;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import com.sap.tamagotchi.service.TamagotchiService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import com.sap.tamagotchi.service.TamagotchiService;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class Owner {
|
||||
|
||||
private TamagotchiService tamagotchiService;
|
||||
|
||||
private Collection<Device> devices;
|
||||
|
||||
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||
|
||||
@Autowired
|
||||
public Owner(TamagotchiService tamagotchiService) {
|
||||
this.tamagotchiService = tamagotchiService;
|
||||
// start();
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 5000)
|
||||
public void setData() {
|
||||
for (Device d : tamagotchiService.getDevices()) {
|
||||
double random = Math.random();
|
||||
Care care = new Care();
|
||||
|
||||
if (random <= 0.5) {
|
||||
Care care = new Care();
|
||||
care.setFeed(-(int) (random * 10));
|
||||
tamagotchiService.takeCare(d.getDeviceId(), new Care());
|
||||
} else {
|
||||
Care care = new Care();
|
||||
care.setFeed((int) (random * 10));
|
||||
tamagotchiService.takeCare(d.getDeviceId(), new Care());
|
||||
}
|
||||
|
||||
tamagotchiService.takeCare(d.getId(), care);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void start() {
|
||||
// Update device map
|
||||
final ScheduledFuture<?> handleGetMap = scheduler.scheduleAtFixedRate(getMapRunner, 0, 1, TimeUnit.MINUTES);
|
||||
scheduler.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
handleGetMap.cancel(true);
|
||||
}
|
||||
}, 60, TimeUnit.MINUTES);
|
||||
|
||||
// Add userType and doAction
|
||||
final ScheduledFuture<?> handleAction = scheduler.scheduleAtFixedRate(getMapRunner, 0, 5, TimeUnit.MINUTES);
|
||||
scheduler.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
handleAction.cancel(true);
|
||||
}
|
||||
}, 60, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
Runnable getMapRunner = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
getMap();
|
||||
}
|
||||
};
|
||||
|
||||
Runnable actionRunner = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
addUserType();
|
||||
addAction();
|
||||
}
|
||||
};
|
||||
|
||||
public void getMap() {
|
||||
Collection<Device> currentDevices = tamagotchiService.getDevices();
|
||||
|
||||
}
|
||||
|
||||
public void addUserType() {
|
||||
|
||||
}
|
||||
|
||||
public void addAction() {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,10 +9,10 @@ import com.google.cloud.pubsub.v1.Publisher;
|
|||
import com.google.protobuf.ByteString;
|
||||
import com.google.pubsub.v1.ProjectTopicName;
|
||||
import com.google.pubsub.v1.PubsubMessage;
|
||||
import com.sap.tamagotchi.model.DummyMessage;
|
||||
import com.sap.tamagotchi.model.IoTMessage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
@ -23,28 +23,39 @@ import java.util.List;
|
|||
public class PublisherService {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
// use the default project id
|
||||
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
@Autowired
|
||||
public PublisherService(ObjectMapper mapper) {
|
||||
this.mapper = mapper;
|
||||
}
|
||||
|
||||
public void publish(IoTMessage message) throws Exception {
|
||||
if (message == null) {
|
||||
LOGGER.info("received null message");
|
||||
return;
|
||||
}
|
||||
String topicId = message.getTopic();
|
||||
ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
|
||||
Publisher publisher = null;
|
||||
List<ApiFuture<String>> futures = new ArrayList<>();
|
||||
|
||||
try {
|
||||
String stringMessage = mapper.writeValueAsString(message);
|
||||
// Create a publisher instance with default settings bound to the topic
|
||||
publisher = Publisher.newBuilder(topicName).build();
|
||||
LOGGER.info("publish to topic" + publisher.getTopicNameString());
|
||||
|
||||
// convert message to bytes
|
||||
ByteString data = ByteString.copyFromUtf8(message.toMessage());
|
||||
ByteString data = ByteString.copyFromUtf8(stringMessage);
|
||||
PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
|
||||
.setData(data)
|
||||
.build();
|
||||
LOGGER.info("publish to message" + message.toMessage());
|
||||
|
||||
LOGGER.info("publish to message" + stringMessage);
|
||||
|
||||
// Schedule a message to be published. Messages are automatically batched.
|
||||
ApiFuture<String> future = publisher.publish(pubsubMessage);
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.sap.tamagotchi.service;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -7,6 +8,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
@ -16,22 +18,23 @@ import com.sap.tamagotchi.model.Care;
|
|||
import com.sap.tamagotchi.model.Device;
|
||||
import com.sap.tamagotchi.publisher.PublisherService;
|
||||
|
||||
import static jdk.nashorn.internal.objects.Global.print;
|
||||
|
||||
@Service
|
||||
@EnableScheduling
|
||||
public class TamagotchiService {
|
||||
|
||||
private static final long DEVICE_EVENT_PROCESSOR_SCHEDULE = 5_000;
|
||||
private static final long DEVICE_EVENT_PROCESSOR_SCHEDULE = 5000;
|
||||
|
||||
private final Logger logger;
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final PublisherService publisherService;
|
||||
|
||||
private final Map<String, Device> deviceRegistry = Collections.synchronizedMap(new HashMap<>());
|
||||
|
||||
@Autowired
|
||||
public TamagotchiService(PublisherService publisherService, Logger logger) {
|
||||
public TamagotchiService(PublisherService publisherService) {
|
||||
this.publisherService = publisherService;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public Device getDevice(String deviceId) {
|
||||
|
@ -47,30 +50,32 @@ public class TamagotchiService {
|
|||
}
|
||||
|
||||
public Device createDevice(Device device) {
|
||||
deviceRegistry.put(device.getDeviceId(), device);
|
||||
deviceRegistry.put(device.getId(), device);
|
||||
return device;
|
||||
}
|
||||
|
||||
public void takeCare(String deviceId, Care care) {
|
||||
Device device = deviceRegistry.get(deviceId);
|
||||
if (device == null)
|
||||
return;
|
||||
device.changeHealthScore(care.getFeed());
|
||||
device.changeHealthScore(care.getPet());
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = DEVICE_EVENT_PROCESSOR_SCHEDULE)
|
||||
private void processDeviceEvents() {
|
||||
// Set<String> deadList = Collections.synchronizedSet(new HashSet());
|
||||
deviceRegistry
|
||||
.values()
|
||||
.parallelStream()
|
||||
.filter(device -> device.hasMessages())
|
||||
.flatMap(device -> device.getMessages().stream())
|
||||
.forEach(message -> {
|
||||
try {
|
||||
publisherService.publish(message);
|
||||
} catch (Exception ex) {
|
||||
logger.error("processing device events failed: {}", ex.getMessage());
|
||||
throw new RuntimeException(ex);
|
||||
.filter(Device::hasMessages)
|
||||
.forEach(device -> {
|
||||
while (device.getMessages().peek() != null) {
|
||||
try {
|
||||
publisherService.publish(device.getMessages().peek());
|
||||
device.getMessages().poll();
|
||||
} catch (Exception ex) {
|
||||
LOGGER.error("processing device events failed: {}", ex.getMessage());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue