博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊flink的ActorGateway
阅读量:7040 次
发布时间:2019-06-28

本文共 9071 字,大约阅读时间需要 30 分钟。

本文主要研究一下flink的ActorGateway

ActorGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java

public interface ActorGateway extends Serializable {	/**	 * Sends a message asynchronously and returns its response. The response to the message is	 * returned as a future.	 *	 * @param message Message to be sent	 * @param timeout Timeout until the Future is completed with an AskTimeoutException	 * @return Future which contains the response to the sent message	 */	Future ask(Object message, FiniteDuration timeout);	/**	 * Sends a message asynchronously without a result.	 *	 * @param message Message to be sent	 */	void tell(Object message);	/**	 * Sends a message asynchronously without a result with sender being the sender.	 *	 * @param message Message to be sent	 * @param sender Sender of the message	 */	void tell(Object message, ActorGateway sender);	/**	 * Forwards a message. For the receiver of this message it looks as if sender has sent the	 * message.	 *	 * @param message Message to be sent	 * @param sender Sender of the forwarded message	 */	void forward(Object message, ActorGateway sender);	/**	 * Retries to send asynchronously a message up to numberRetries times. The response to this	 * message is returned as a future. The message is re-sent if the number of retries is not yet	 * exceeded and if an exception occurred while sending it.	 *	 * @param message Message to be sent	 * @param numberRetries Number of times to retry sending the message	 * @param timeout Timeout for each sending attempt	 * @param executionContext ExecutionContext which is used to send the message multiple times	 * @return Future of the response to the sent message	 */	Future retry(			Object message,			int numberRetries,			FiniteDuration timeout,			ExecutionContext executionContext);	/**	 * Returns the path of the remote instance.	 *	 * @return Path of the remote instance.	 */	String path();	/**	 * Returns the underlying actor with which is communicated	 *	 * @return ActorRef of the target actor	 */	ActorRef actor();	/**	 * Returns the leaderSessionID associated with the remote actor or null.	 *	 * @return Leader session ID if its associated with this gateway, otherwise null	 */	UUID leaderSessionID();}复制代码
  • ActorGateway接口定义了ask、tell、forward、retry、path、actor、leaderSessionID方法;它有一个实现类为AkkaActorGateway

AkkaActorGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java

public class AkkaActorGateway implements ActorGateway, Serializable {	private static final long serialVersionUID = 42L;	// ActorRef of the remote instance	private final ActorRef actor;	// Associated leader session ID, which is used for RequiresLeaderSessionID messages	private final UUID leaderSessionID;	// Decorator for messages	private final MessageDecorator decorator;	public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {		this.actor = Preconditions.checkNotNull(actor);		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID);		// we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage		this.decorator = new LeaderSessionMessageDecorator(leaderSessionID);	}	/**	 * Sends a message asynchronously and returns its response. The response to the message is	 * returned as a future.	 *	 * @param message Message to be sent	 * @param timeout Timeout until the Future is completed with an AskTimeoutException	 * @return Future which contains the response to the sent message	 */	@Override	public Future ask(Object message, FiniteDuration timeout) {		Object newMessage = decorator.decorate(message);		return Patterns.ask(actor, newMessage, new Timeout(timeout));	}	/**	 * Sends a message asynchronously without a result.	 *	 * @param message Message to be sent	 */	@Override	public void tell(Object message) {		Object newMessage = decorator.decorate(message);		actor.tell(newMessage, ActorRef.noSender());	}	/**	 * Sends a message asynchronously without a result with sender being the sender.	 *	 * @param message Message to be sent	 * @param sender Sender of the message	 */	@Override	public void tell(Object message, ActorGateway sender) {		Object newMessage = decorator.decorate(message);		actor.tell(newMessage, sender.actor());	}	/**	 * Forwards a message. For the receiver of this message it looks as if sender has sent the	 * message.	 *	 * @param message Message to be sent	 * @param sender Sender of the forwarded message	 */	@Override	public void forward(Object message, ActorGateway sender) {		Object newMessage = decorator.decorate(message);		actor.tell(newMessage, sender.actor());	}	/**	 * Retries to send asynchronously a message up to numberRetries times. The response to this	 * message is returned as a future. The message is re-sent if the number of retries is not yet	 * exceeded and if an exception occurred while sending it.	 *	 * @param message Message to be sent	 * @param numberRetries Number of times to retry sending the message	 * @param timeout Timeout for each sending attempt	 * @param executionContext ExecutionContext which is used to send the message multiple times	 * @return Future of the response to the sent message	 */	@Override	public Future retry(			Object message,			int numberRetries,			FiniteDuration timeout,			ExecutionContext executionContext) {		Object newMessage = decorator.decorate(message);		return AkkaUtils.retry(			actor,			newMessage,			numberRetries,			executionContext,			timeout);	}	/**	 * Returns the ActorPath of the remote instance.	 *	 * @return ActorPath of the remote instance.	 */	@Override	public String path() {		return actor.path().toString();	}	/**	 * Returns {@link ActorRef} of the target actor	 *	 * @return ActorRef of the target actor	 */	@Override	public ActorRef actor() {		return actor;	}	@Override	public UUID leaderSessionID() {		return leaderSessionID;	}	@Override	public String toString() {		return String.format("AkkaActorGateway(%s, %s)", actor.path(), leaderSessionID);	}}复制代码
  • AkkaActorGateway实现了ActorGateway接口,它的构造器要求输入ActorRef及leaderSessionID,同时基于leaderSessionID创建了LeaderSessionMessageDecorator;ask、tell、forward、retry方法均首先调用LeaderSessionMessageDecorator.decorate方法包装message参数,然后再去调用ActorRef的相应方法

MessageDecorator

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java

public interface MessageDecorator extends java.io.Serializable {	/**	 * Decorates a message	 *	 * @param message Message to decorate	 * @return Decorated message	 */	Object decorate(Object message);}复制代码
  • MessageDecorator接口定义了decorate方法用于包装message,它有一个实现类为LeaderSessionMessageDecorator

LeaderSessionMessageDecorator

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java

public class LeaderSessionMessageDecorator implements MessageDecorator {	private static final long serialVersionUID = 5359618147408392706L;		/** Leader session ID with which the RequiresLeaderSessionID messages will be decorated */	private final UUID leaderSessionID;	/**	 * Sets the leader session ID with which the messages will be decorated.	 *	 * @param leaderSessionID Leader session ID to be used for decoration	 */	public LeaderSessionMessageDecorator(UUID leaderSessionID) {		this.leaderSessionID = leaderSessionID;	}	@Override	public Object decorate(Object message) {		if (message instanceof RequiresLeaderSessionID) {			return new JobManagerMessages.LeaderSessionMessage(leaderSessionID, message);		} else {			return message;		}	}}复制代码
  • LeaderSessionMessageDecorator实现了MessageDecorator接口,其decorate方法判断message是RequiresLeaderSessionID类型的话,则返回JobManagerMessages.LeaderSessionMessage,否则返回原始的message

JobManagerMessages.LeaderSessionMessage

flink-1.7.2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala

object JobManagerMessages {  /** Wrapper class for leader session messages. Leader session messages implement the    * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]],    * which also contains the current leader session ID.    *    * @param leaderSessionID Current leader session ID    * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]]    */  case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)  //......}复制代码
  • JobManagerMessages.LeaderSessionMessage是一个case class,它有两个属性,分别是leaderSessionID及message

小结

  • ActorGateway接口定义了ask、tell、forward、retry、path、actor、leaderSessionID方法;它有一个实现类为AkkaActorGateway
  • AkkaActorGateway实现了ActorGateway接口,它的构造器要求输入ActorRef及leaderSessionID,同时基于leaderSessionID创建了LeaderSessionMessageDecorator;ask、tell、forward、retry方法均首先调用LeaderSessionMessageDecorator.decorate方法包装message参数,然后再去调用ActorRef的相应方法
  • MessageDecorator接口定义了decorate方法用于包装message,它有一个实现类为LeaderSessionMessageDecorator;LeaderSessionMessageDecorator实现了MessageDecorator接口,其decorate方法判断message是RequiresLeaderSessionID类型的话,则返回JobManagerMessages.LeaderSessionMessage,否则返回原始的message;JobManagerMessages.LeaderSessionMessage是一个case class,它有两个属性,分别是leaderSessionID及message

doc

转载于:https://juejin.im/post/5c8c6f13f265da2dca38ac5a

你可能感兴趣的文章
NG-ZORRO 7.3.1 发布,Ant Design 的 Angular 实现
查看>>
20180417PLSQL中sql语句格式化与注解问题
查看>>
Java 基础 之 while 循环
查看>>
顺丰今天上市了,王卫又成了民营快递业的首富
查看>>
前端和云端性能分析工具分析报告
查看>>
Jim Zemlin:加速开源创新,Linux 基金会超越 Linux
查看>>
.NET零基础入门05:委托与事件
查看>>
【阿里云MVP公益共创项目】服务数万爱心教师支教,推动中国渔业生态保护
查看>>
Linux命令复习和练习_03
查看>>
使用 github pages, 快速部署你的静态网页
查看>>
react 之 state 对象
查看>>
Java中的锁原理、锁优化、CAS、AQS
查看>>
“智能厨电+渠道精耕”,华帝迈出“关键一步”
查看>>
Scrapy爬虫(2)爬取新浪旅游图片
查看>>
Nginx反向代理以及负载均衡配置
查看>>
巨头抢滩视频云 金山云稳坐头把交椅
查看>>
索尼富士康领投,AR显示技术厂商Digilens获得2200万美元融资
查看>>
Qt5 GUI 开发的应用易受远程代码执行漏洞的影响
查看>>
搞懂Java动态代理
查看>>
「镁客·请讲」NXROBO林天麟:我们分三步走,首先要做的就是打通机器人行业的产业链...
查看>>