public interface ActorGateway extends Serializable { /** * Sends a message asynchronously and returns its response. The response to the message is * returned as a future. * * @param message Message to be sent * @param timeout Timeout until the Future is completed with an AskTimeoutException * @return Future which contains the response to the sent message */ Future
public class AkkaActorGateway implements ActorGateway, Serializable { private static final long serialVersionUID = 42L; // ActorRef of the remote instance private final ActorRef actor; // Associated leader session ID, which is used for RequiresLeaderSessionID messages private final UUID leaderSessionID; // Decorator for messages private final MessageDecorator decorator; public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) { this.actor = Preconditions.checkNotNull(actor); this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID); // we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage this.decorator = new LeaderSessionMessageDecorator(leaderSessionID); } /** * Sends a message asynchronously and returns its response. The response to the message is * returned as a future. * * @param message Message to be sent * @param timeout Timeout until the Future is completed with an AskTimeoutException * @return Future which contains the response to the sent message */ @Override public Future ask(Object message, FiniteDuration timeout) { Object newMessage = decorator.decorate(message); return Patterns.ask(actor, newMessage, new Timeout(timeout)); } /** * Sends a message asynchronously without a result. * * @param message Message to be sent */ @Override public void tell(Object message) { Object newMessage = decorator.decorate(message); actor.tell(newMessage, ActorRef.noSender()); } /** * Sends a message asynchronously without a result with sender being the sender. * * @param message Message to be sent * @param sender Sender of the message */ @Override public void tell(Object message, ActorGateway sender) { Object newMessage = decorator.decorate(message); actor.tell(newMessage, sender.actor()); } /** * Forwards a message. For the receiver of this message it looks as if sender has sent the * message. * * @param message Message to be sent * @param sender Sender of the forwarded message */ @Override public void forward(Object message, ActorGateway sender) { Object newMessage = decorator.decorate(message); actor.tell(newMessage, sender.actor()); } /** * Retries to send asynchronously a message up to numberRetries times. The response to this * message is returned as a future. The message is re-sent if the number of retries is not yet * exceeded and if an exception occurred while sending it. * * @param message Message to be sent * @param numberRetries Number of times to retry sending the message * @param timeout Timeout for each sending attempt * @param executionContext ExecutionContext which is used to send the message multiple times * @return Future of the response to the sent message */ @Override public Future retry( Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { Object newMessage = decorator.decorate(message); return AkkaUtils.retry( actor, newMessage, numberRetries, executionContext, timeout); } /** * Returns the ActorPath of the remote instance. * * @return ActorPath of the remote instance. */ @Override public String path() { return actor.path().toString(); } /** * Returns {@link ActorRef} of the target actor * * @return ActorRef of the target actor */ @Override public ActorRef actor() { return actor; } @Override public UUID leaderSessionID() { return leaderSessionID; } @Override public String toString() { return String.format("AkkaActorGateway(%s, %s)", actor.path(), leaderSessionID); }}复制代码
public class LeaderSessionMessageDecorator implements MessageDecorator { private static final long serialVersionUID = 5359618147408392706L; /** Leader session ID with which the RequiresLeaderSessionID messages will be decorated */ private final UUID leaderSessionID; /** * Sets the leader session ID with which the messages will be decorated. * * @param leaderSessionID Leader session ID to be used for decoration */ public LeaderSessionMessageDecorator(UUID leaderSessionID) { this.leaderSessionID = leaderSessionID; } @Override public Object decorate(Object message) { if (message instanceof RequiresLeaderSessionID) { return new JobManagerMessages.LeaderSessionMessage(leaderSessionID, message); } else { return message; } }}复制代码
object JobManagerMessages { /** Wrapper class for leader session messages. Leader session messages implement the * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]], * which also contains the current leader session ID. * * @param leaderSessionID Current leader session ID * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]] */ case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) //......}复制代码