|
[Akka] родитель не получает сообщение о том, что его child помер
#39627651
Ссылка:
Ссылка на сообщение:
Ссылка с названием темы:
|
|
|
|
Делаю оф. туториал. Сейчас на https://doc.akka.io/docs/akka/current/guide/tutorial_4.html
Собсно мой код:
**Deviсe group:**
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. 36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53. 54. 55. 56. 57. 58. 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71. 72. 73. 74. 75. 76. 77. 78. 79. 80. 81. 82. 83. 84. 85. 86. 87. 88. 89. 90. 91. 92. 93. 94. 95. 96. 97.
package com.lightbend.akka.sample.iot;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class DeviceGroup extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
public DeviceGroup(String groupId) {
this.groupId = groupId;
}
public static Props props(String groupId) {
return Props.create(DeviceGroup.class, groupId);
}
@Override
public void preStart() {
log.info("DeviceGroup {} started", groupId);
}
@Override
public void postStop() {
log.info("DeviceGroup {} stopped", groupId);
}
private void onTrackDevice(Device.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
if (deviceActor != null) {
deviceActor.forward(trackMsg, getContext());
} else {
log.info("Creating device actor for {}", trackMsg.deviceId);
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
deviceActor.forward(trackMsg, getContext());
}
} else {
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
);
}
}
private void onTerminated(Terminated t) {
ActorRef deviceActor = t.getActor();
String deviceId = actorToDeviceId.get(deviceActor);
log.info("Device actor for {} has been terminated", deviceId);
actorToDeviceId.remove(deviceActor);
deviceIdToActor.remove(deviceId);
}
private void onDeviceList(RequestDeviceList r) {
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Device.RequestTrackDevice.class, this::onTrackDevice)
.match(Terminated.class, this::onTerminated)
.match(RequestDeviceList.class, this::onDeviceList)
.build();
}
public static final class RequestDeviceList {
final long requestId;
public RequestDeviceList(long requestId) {
this.requestId = requestId;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
}
**Device:**
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. 36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53. 54. 55. 56. 57. 58. 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71. 72. 73. 74. 75. 76. 77. 78. 79. 80. 81. 82. 83. 84. 85. 86. 87. 88. 89. 90. 91. 92. 93. 94. 95. 96. 97. 98. 99. 100. 101. 102. 103. 104. 105. 106. 107. 108. 109. 110. 111. 112. 113. 114. 115.
package com.lightbend.akka.sample.iot;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.Optional;
public class Device extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final String deviceId;
public Device(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
public static Props props(String groupId, String deviceId) {
return Props.create(Device.class, groupId, deviceId);
}
Optional<Double> lastTemperatureReading = Optional.empty();
@Override
public void preStart() {
log.info("Device actor {}-{} started", groupId, deviceId);
}
@Override
public void postStop() {
log.info("Device actor {}-{} stopped", groupId, deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(RequestTrackDevice.class, r -> {
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
getSender().tell(new DeviceRegistered(), getSelf());
} else {
log.warning(
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
r.groupId, r.deviceId, this.groupId, this.deviceId
);
}
})
.match(ReadTemperature.class, r -> {
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
})
.match(RecordTemperature.class, r -> {
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
})
.build();
}
//temperature request
public static final class ReadTemperature {
long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
//temperature response
public static final class RespondTemperature {
long requestId;
Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
//==============
public static final class RecordTemperature {
final long requestId;
final double value;
public RecordTemperature(long requestId, double value) {
this.requestId = requestId;
this.value = value;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
//===================
public static final class RequestTrackDevice {
public final String groupId;
public final String deviceId;
public RequestTrackDevice(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
}
public static final class DeviceRegistered {
}
}
и тест:
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32.
@Test
public void testListActiveDevicesAfterOneShutsDown() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new Device.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(Device.DeviceRegistered.class);
ActorRef toShutDown = probe.getLastSender();
groupActor.tell(new Device.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(Device.DeviceRegistered.class);
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(0L, reply.requestId);
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
probe.watch(toShutDown);
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
probe.expectTerminated(toShutDown);
// using awaitAssert to retry because it might take longer for the groupActor
// to see the Terminated, that order is undefined
probe.awaitAssert(Duration.fromNanos(10_000_000_000L),() -> {
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
DeviceGroup.ReplyDeviceList r =
probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(1L, r.requestId);
assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
return null;
});
}
Вот такие логи пишутся:
1. 2. 3. 4. 5. 6. 7. 8. 9. 10.
[INFO] [04/09/2018 17:56:31.976] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] DeviceGroup group started
[INFO] [04/09/2018 17:56:31.977] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] Creating device actor for device1
[INFO] [04/09/2018 17:56:31.979] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a/device-device1] Device actor group-device1 started
[INFO] [04/09/2018 17:56:31.983] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a] Creating device actor for device2
[INFO] [04/09/2018 17:56:31.983] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a/device-device2] Device actor group-device2 started
[INFO] [04/09/2018 17:56:31.992] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a/device-device1] Device actor group-device1 stopped
java.lang.AssertionError:
Expected :[device2]
Actual :[device1, device2]
Метод `com.lightbend.akka.sample.iot.DeviceGroup#onTerminated` не вызывается.
Что я делаю не так?
|
|
|