26

Flink 源码阅读笔记(5)- 集群启动流程 - JR's Blog

 4 years ago
source link: https://blog.jrwang.me/2019/flink-source-code-bootstarp/?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在 Flink 1.5.0 版本发布的时候,Flink 迎来了一个重要的改进:根据 FLIP-6 重构了 Flink 集群部署和任务处理模型,以便更好地和管理资源和调度任务,更优雅地和 Yarn、 Mesos、Kubernetes 等框架进行集成。

在这篇文章中,我们将对 Flink 集群的启动流程加一分析。本文的分析基于 Flink 1.9-SNAPSHOT 版本的代码。

HA 及 Leader 选举

Flink 内部的组件如 ResourceManager, JobManager 等都可以配置 HA 模式,Flink 集群启动的的时候会大量涉及到 Leader 选举,Leader 地址获取等相关的操作,因而先对 HA 相关的概念进行介绍。

Leader 地址的获取通过 LeaderRetrievalListerLeaderRetriverService 这两个接口来完成。 LeaderRetriverService 可以启动一个对 Leader 地址的监听,在 Leader 选举完成后得到通知。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public interface LeaderRetrievalService {
	void start(LeaderRetrievalListener listener) throws Exception;
	void stop() throws Exception;
}


public interface LeaderRetrievalListener {
	void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
	void handleError(Exception exception);
}

GatewayRetriver 接口用于获取 RpcGateway,抽象类 LeaderGatewayRetriver 则同时继承了 LeaderRetrieverGatewayRetriver,因而1)可以在Leader选举完成后得到 Leader 地址 2)可以获取到 Leader 的 RpcGateway。

RpcGatewayRetrieverLeaderGatewayRetriver 的具体实现,根据 Leader 的地址通过 RpcService.connect() 方法获得对应 Leader 的 RpcGateway。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class RpcGatewayRetriever<F extends Serializable, T extends FencedRpcGateway<F>> extends LeaderGatewayRetriever<T> {
	@Override
	protected CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
		return FutureUtils.retryWithDelay(
			() ->
				leaderFuture.thenCompose(
					(Tuple2<String, UUID> addressLeaderTuple) ->
						rpcService.connect(
							addressLeaderTuple.f0,
							fencingTokenMapper.apply(addressLeaderTuple.f1),
							gatewayType)),
			retries,
			retryDelay,
			rpcService.getScheduledExecutor());
	}
}

Leader 选举是通过 LeaderElectionService(选举服务)和 LeaderContender(参与竞选的对象)共同来完成的,每一次选举成功后都会有唯一的 leaderSessionID,可以用来作为 RpcGateway 通信的 fence token。当一个 LeaderContender 竞选成功了,会通过 LeaderContender#grantLeadership 得到通知。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public interface LeaderElectionService {
	void start(LeaderContender contender) throws Exception;

	void stop() throws Exception;

	void confirmLeaderSessionID(UUID leaderSessionID);

	boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

public interface LeaderContender {
	void grantLeadership(UUID leaderSessionID);

	void revokeLeadership();

	String getAddress();

	void handleError(Exception exception);
}

LeaderElectionService 有多种实现,如无需进行选举过程的 StandaloneLeaderElectionService,以及借助 zookeeper 和 curator 框架实现的 ZooKeeperLeaderElectionService,具体的实现细节可参考对应的源码。

HighAvailabilityServices 接口则提供了获取 HA 相关所有服务的方法,包括:

  • ResourceManager 选举服务及 Leader 获取
  • Dispatcher 选举服务及 Leader 获取
  • 任务状态的注册表
  • checkpoint recovery、blob store 等相关的服务

MiniCluster 的启动流程

我们先从最为简单的 MiniCluster 着手,分析一下 Flink 的启动流程以及内部各组件之间的交互。 MiniCluster 可以看做是一个内嵌的 Flink 运行时环境,所有的组件都在独立的本地线程中运行。MiniCluster 的启动入口在 LocalStreamEnvironment 中。

MiniCluster#start 中,启动流程大致分为三个阶段:

  • 创建一些辅助的服务,如 RpcServiceHighAvailabilityServices, BlobServer
  • 启动 TaskManager
  • 启动 Dispatcher, ResourceManager 等

创建 HighAvailabilityServices

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class MiniCluster {
	public void start() {
		//......
		ioExecutor = Executors.newFixedThreadPool(
					Hardware.getNumberCPUCores(),
					new ExecutorThreadFactory("mini-cluster-io"));
		haServices = createHighAvailabilityServices(configuration, ioExecutor);
		//......
	}
	
	protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
		LOG.info("Starting high-availability services");
		return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
			configuration,
			executor);
	}
}

HighAvailabilityServicesUtils 是创建 HighAvailabilityServices 的工具类,在没有配置 HA 的情况下,会创建 EmbeddedHaServicesEmbeddedHaServices 不具备高可用的特性,适用于 ResourceMangaer, TaksManager,JobManager 等所有组件都运行在同一个进程的情况。EmbeddedHaService 为各组件创建的选举服务为 EmbeddedLeaderElectionService, 一旦有参与选举的 LeaderContender 加入,该 contender 就被选择为 leader。

启动 TaskManager

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class MiniCluster {
	public void start() {
		//......
		startTaskManagers();
		//......
	}

	private void startTaskManagers() throws Exception {
		final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
		for (int i = 0; i < numTaskManagers; i++) {
			startTaskExecutor();
		}
	}

	@VisibleForTesting
	void startTaskExecutor() throws Exception {
		synchronized (lock) {
			final Configuration configuration = miniClusterConfiguration.getConfiguration();

			final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
				configuration,
				new ResourceID(UUID.randomUUID().toString()),
				taskManagerRpcServiceFactory.createRpcService(),
				haServices,
				heartbeatServices,
				metricRegistry,
				blobCacheService,
				useLocalCommunication(),
				taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

			taskExecutor.start();
			taskManagers.add(taskExecutor);
		}
	}
}

在创建 HighAvailabilityServices 之后,就可以依次启动 TaskManager 了。TaskManagerRunner#startTaskManager 会创建一个 TaskExecutor, TaskExecutor 实现了 RpcEndpoint 接口。 TaskExecutor 需要和 ResourceManager 等组件进行通信,可以通过 HighAvailabilityServices 获得对应的服务地址。

TaskExecutor 启动的回调函数中,会启动一系列服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	public void onStart() throws Exception {
		try {
			//启动服务
			startTaskExecutorServices();
		} catch (Exception e) {
			final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
			onFatalError(exception);
			throw exception;
		}
		//超时交由 FatalErrorHandler 进行处理
		startRegistrationTimeout();
	}

	private void startTaskExecutorServices() throws Exception {
		try {
			// start by connecting to the ResourceManager
			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

			// tell the task slot table who's responsible for the task slot actions
			taskSlotTable.start(new SlotActionsImpl());

			// start the job leader service
			jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

			fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
		} catch (Exception e) {
			handleStartTaskExecutorServicesException(e);
		}
	}
	
	/**
	 * The listener for leader changes of the resource manager.
	 */
	private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

		@Override
		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
			//获得 ResourceManager 的地址, 和 ResourceManager 建立连接
			runAsync(
				() -> notifyOfNewResourceManagerLeader(
					leaderAddress,
					ResourceManagerId.fromUuidOrNull(leaderSessionID)));
		}

		@Override
		public void handleError(Exception exception) {
			onFatalError(exception);
		}
	}
	
	private final class JobLeaderListenerImpl implements JobLeaderListener {

		@Override
		public void jobManagerGainedLeadership(
			final JobID jobId,
			final JobMasterGateway jobManagerGateway,
			final JMTMRegistrationSuccess registrationMessage) {
			//和 JobManager 建立连接
			runAsync(
				() ->
					establishJobManagerConnection(
						jobId,
						jobManagerGateway,
						registrationMessage));
		}

		@Override
		public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
			log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);

			runAsync(() ->
				closeJobManagerConnection(
					jobId,
					new Exception("Job leader for job id " + jobId + " lost leadership.")));
		}

		@Override
		public void handleError(Throwable throwable) {
			onFatalError(throwable);
		}
	}
}

ResourceManagerLeaderListener 的监听被回调时,TaskExecutor 会试图建立和 ResourceManager 的连接,连接被封装为 TaskExecutorToResourceManagerConnection。一旦获取 ResourceManager 的 leader 被确定后,就可以获取到 ResourceManager 对应的 RpcGateway, 接下来就可以通过 RPC 调用发起 ResourceManager#registerTaskExecutor 注册流程。注册成功后,TaskExecutorResourceManager 报告其资源(主要是 slots)情况。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class TaskExecutor {
	private void establishResourceManagerConnection(
			ResourceManagerGateway resourceManagerGateway,
			ResourceID resourceManagerResourceId,
			InstanceID taskExecutorRegistrationId,
			ClusterInformation clusterInformation) {
        //发送SlotReport
		final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
			getResourceID(),
			taskExecutorRegistrationId,
			taskSlotTable.createSlotReport(getResourceID()),
			taskManagerConfiguration.getTimeout());
		//......
		//连接建立
		establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
			resourceManagerGateway,
			resourceManagerResourceId,
			taskExecutorRegistrationId);

		stopRegistrationTimeout();
	}
}

启动 DispatcherResourceManagerComponent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class MiniCluster {
	public void start() {
		//......
		dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
					configuration,
					dispatcherResourceManagreComponentRpcServiceFactory,
					haServices,
					blobServer,
					heartbeatServices,
					metricRegistry,
					metricQueryServiceRetriever,
					new ShutDownFatalErrorHandler()
				));
		//......
	}

	protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
			Configuration configuration,
			RpcServiceFactory rpcServiceFactory,
			HighAvailabilityServices haServices,
			BlobServer blobServer,
			HeartbeatServices heartbeatServices,
			MetricRegistry metricRegistry,
			MetricQueryServiceRetriever metricQueryServiceRetriever,
			FatalErrorHandler fatalErrorHandler) throws Exception {
	   //Session dispatcher, standalone resource manager
		SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory();
		return Collections.singleton(
			dispatcherResourceManagerComponentFactory.create(
				configuration,
				rpcServiceFactory.createRpcService(),
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				new MemoryArchivedExecutionGraphStore(),
				metricQueryServiceRetriever,
				fatalErrorHandler));
	}

	@Nonnull
	private SessionDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
		return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
	}
}

在 MiniCluster 模式下,会创建一个 SessionDispatcherResourceManagerComponent 对象。SessionDispatcherResourceManagerComponent 继承自 DispatcherResourceManagerComponent,用来启动 Dispatcher, ResourceManager,和 WebMonitorEndpoint, 这些组件都在同一个进程中运行。MiniCluster 模式下启动的是 StandaloneDispatcherStandaloneResourceManager

在工厂类创建 DispatcherResourceManagerComponent, 会启动 Dispatcher, ResourceManager 等组件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> {
	@Override
	public DispatcherResourceManagerComponent<T> create(
			Configuration configuration,
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			BlobServer blobServer,
			HeartbeatServices heartbeatServices,
			MetricRegistry metricRegistry,
			ArchivedExecutionGraphStore archivedExecutionGraphStore,
			MetricQueryServiceRetriever metricQueryServiceRetriever,
			FatalErrorHandler fatalErrorHandler) throws Exception {
		//.......

		webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
				configuration,
				dispatcherGatewayRetriever,
				resourceManagerGatewayRetriever,
				blobServer,
				executor,
				metricFetcher,
				highAvailabilityServices.getWebMonitorLeaderElectionService(),
				fatalErrorHandler);

		log.debug("Starting Dispatcher REST endpoint.");
		webMonitorEndpoint.start();
	
		resourceManager = resourceManagerFactory.createResourceManager(
				configuration,
				ResourceID.generate(),
				rpcService,
				highAvailabilityServices,
				heartbeatServices,
				metricRegistry,
				fatalErrorHandler,
				new ClusterInformation(hostname, blobServer.getPort()),
				webMonitorEndpoint.getRestBaseUrl(),
				jobManagerMetricGroup);

		final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

		dispatcher = dispatcherFactory.createDispatcher(
				configuration,
				rpcService,
				highAvailabilityServices,
				resourceManagerGatewayRetriever,
				blobServer,
				heartbeatServices,
				jobManagerMetricGroup,
				metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
				archivedExecutionGraphStore,
				fatalErrorHandler,
				historyServerArchivist);

		log.debug("Starting ResourceManager.");
		resourceManager.start();
		resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

		log.debug("Starting Dispatcher.");
		dispatcher.start();
		dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

		return createDispatcherResourceManagerComponent(
				dispatcher,
				resourceManager,
				dispatcherLeaderRetrievalService,
				resourceManagerRetrievalService,
				webMonitorEndpoint,
				jobManagerMetricGroup);
	}
}

ResourceManager 启动的回调函数中,会通过 HighAvailabilityServices 获取到选举服务,从而参与到选举之中。并启动 JobLeaderIdService,管理向当前 ResourceManager 注册的作业的 leader id。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
		extends FencedRpcEndpoint<ResourceManagerId>
		implements ResourceManagerGateway, LeaderContender {
		@Override
	public void onStart() throws Exception {
		try {
			startResourceManagerServices();
		} catch (Exception e) {
			final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), e);
			onFatalError(exception);
			throw exception;
		}
	}

	private void startResourceManagerServices() throws Exception {
		try {
			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

			initialize();
			//参与选举
			leaderElectionService.start(this);
			jobLeaderIdService.start(new JobLeaderIdActionsImpl());

			registerSlotAndTaskExecutorMetrics();
		} catch (Exception e) {
			handleStartResourceManagerServicesException(e);
		}
	}
}

Dispatcher 启动的回调函数中,当前 Dispatcher 也会通过 LeaderElectionService 参与选举。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
	DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
	//resource manager 的 gateway retriever,可以和 resource manager 通信
	private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
	
	@Override
	public void onStart() throws Exception {
		try {
			startDispatcherServices();
		} catch (Exception e) {
			final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
			onFatalError(exception);
			throw exception;
		}
	}

	private void startDispatcherServices() throws Exception {
		try {
			submittedJobGraphStore.start(this);
			leaderElectionService.start(this);

			registerDispatcherMetrics(jobManagerMetricGroup);
		} catch (Exception e) {
			handleStartDispatcherServicesException(e);
		}
	}
}

提交 JobGraph

通过 MiniCluster#executeJobBlocking 提交 JobGraph 并等待运行完成,提交JobGraph和请求运行结果的逻辑如下,都是通过 RPC 调用来实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MiniCluster {
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
	//通过 Dispatcher 的 gateway retriever 获取 DispatcherGateway
		final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();

		// we have to allow queued scheduling in Flip-6 mode because we need to request slots
		// from the ResourceManager
		jobGraph.setAllowQueuedScheduling(true);

		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);

		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

		//通过 RPC 调用向 Dispatcher 提交 JobGraph
		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
			.thenCombine(
				dispatcherGatewayFuture,
				(Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
			.thenCompose(Function.identity());

		return acknowledgeCompletableFuture.thenApply(
			(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
	}

	public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
		return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT));
	}
}

Dispatcher 在接收到提交 JobGraph 的请求后,会将提交的 JobGraph 保存在 SubmittedJobGraphStore 中(用于故障恢复),并为提交的 JobGraph 启动 JobManager:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Dispatcher {
    private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
		final RpcService rpcService = getRpcService();

		//创建 JobManagerRunner
		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
			CheckedSupplier.unchecked(() ->
				jobManagerRunnerFactory.createJobManagerRunner(
					jobGraph,
					configuration,
					rpcService,
					highAvailabilityServices,
					heartbeatServices,
					jobManagerSharedServices,
					new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
					fatalErrorHandler)),
			rpcService.getExecutor());

		return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
	}

	private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
		final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
		jobManagerRunner.getResultFuture().whenCompleteAsync(
			(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
				// check if we are still the active JobManagerRunner by checking the identity
				//noinspection ObjectEquality
				if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
					if (archivedExecutionGraph != null) {
						jobReachedGloballyTerminalState(archivedExecutionGraph);
					} else {
						final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

						if (strippedThrowable instanceof JobNotFinishedException) {
							jobNotFinished(jobId);
						} else {
							jobMasterFailed(jobId, strippedThrowable);
						}
					}
				} else {
					log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
				}
			}, getMainThreadExecutor());

		//启动JobManager
		jobManagerRunner.start();

		return jobManagerRunner;
	}
}

启动的 JobManagerRunner 会竞争 leader ,一旦被选举为 leader,就会启动一个 JobMaster

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync {
	public void start() throws Exception {
		try {
			//竞争leader
			leaderElectionService.start(this);
		} catch (Exception e) {
			log.error("Could not start the JobManager because the leader election service did not start.", e);
			throw new Exception("Could not start the leader election service.", e);
		}
	}
	
	//被选举为 leader
	@Override
	public void grantLeadership(final UUID leaderSessionID) {
		synchronized (lock) {
			if (shutdown) {
				log.info("JobManagerRunner already shutdown.");
				return;
			}

			leadershipOperation = leadershipOperation.thenCompose(
				(ignored) -> {
					synchronized (lock) {
						return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
					}
				});

			handleException(leadershipOperation, "Could not start the job manager.");
		}
	}

	private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
		final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

		return jobSchedulingStatusFuture.thenCompose(
			jobSchedulingStatus -> {
				if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
					return jobAlreadyDone();
				} else {
					return startJobMaster(leaderSessionId);
				}
			});
	}

	private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
		log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
			jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

		try {
			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
		} catch (IOException e) {
			return FutureUtils.completedExceptionally(
				new FlinkException(
					String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
					e));
		}

		final CompletableFuture<Acknowledge> startFuture;
		try {
		  //使用特定的 JobMasterId 启动 JobMaster
			startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
		} catch (Exception e) {
			return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
		}

		final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
		return startFuture.thenAcceptAsync(
			(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
			executor);
	}
}

JobMaster 启动后会和 ResourceManager 建立连接,连接被封装为 ResourceManagerConnection。一旦连接建立之后,JobMaster 就可以通过 RPC 调用和 ResourceManager 进行通信了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    private void startJobMasterServices() throws Exception {
		// start the slot pool make sure the slot pool now accepts messages for this leader
		slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
		scheduler.start(getMainThreadExecutor());

		//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
		// try to reconnect to previously known leader
		reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

		// job is ready to go, try to establish connection with resource manager
		//   - activate leader retrieval for the resource manager
		//   - on notification of the leader, the connection will be established and
		//     the slot pool will start requesting slots
		resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
	}
}

在此之后就进入了任务调度执行的流程。

Standalone Cluster 模式的启动流程

在 Standalone 模式下,TaskManager 和 ResourceManager 等都在独立的进程中运行。Standalone Cluster 有两种启动方式, 即 standalonesession 模式和 standalonejob 方式,它们区别在于 Dispatcher 的实现方式不同。

JobManager 的启动

需要注意的一点是,这里我们所说的 JobManager 指的是包含 DispatcherResouceManager 等组件的单一进程,而并非 Dispatcher 为执行 JobGraph 而启动的 JobManagerRunner。在 FLIP-6 的实现中,每个 JobGraph 的调度执行的实际上是由一个独立的 JobMaster 负责的。

standalonesession 方式启动的 JobManager 的入口类是 StandaloneSessionClusterEntrypoint, 继承自 SessionClusterEntrypoint;与此对应的是,以 standalonejob 方式启动 JobManager 的入口类是 StandaloneJobClusterEntryPoint,继承自 JobClusterEntrypoint。它们都由公共父类 ClusterEntrypoint 派生而来,区别在于生成的 DispatcherResourceManagerComponent 不同。

先来看下启动过程,实际上和 MiniCluster 模式下启动 DispatcherResourceManagerComponent 的过程类似:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
abstract class ClusterEntrypoint {
    private void runCluster(Configuration configuration) throws Exception {
		synchronized (lock) {
		
			//初始化 RpcService, HighAvailabilityServices  等服务
			initializeServices(configuration);

			// write host information into configuration
			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

			//生成 DispatcherResourceManagerComponentFactory,由具体子类实现
			final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

			//创建 DispatcherResourceManagerComponent, 启动 ResourceManager, Dispatcher
			clusterComponent = dispatcherResourceManagerComponentFactory.create(
				configuration,
				commonRpcService,
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				archivedExecutionGraphStore,
				new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
				this);

			//一旦 DispatcherResourceManagerComponent#getShutDownFuture 完成,则关闭各项服务
			clusterComponent.getShutDownFuture().whenComplete(
				(ApplicationStatus applicationStatus, Throwable throwable) -> {
					if (throwable != null) {
						shutDownAsync(
							ApplicationStatus.UNKNOWN,
							ExceptionUtils.stringifyException(throwable),
							false);
					} else {
						// This is the general shutdown path. If a separate more specific shutdown was
						// already triggered, this will do nothing
						shutDownAsync(
							applicationStatus,
							null,
							true);
					}
				});
		}
	}

}

这里生成的 HighAvailabilityServices 和 MiniCluster 模式下略有区别,由于各组件不在同一个进程中,因而需要从配置中加载配置:1)如果采用基于 Zookeeper 的 HA 模式,则创建 ZooKeeperHaServices,基于 zookeeper 获取 leader 通信地址 2)如果没有配置 HA,则创建 StandaloneHaServices, 并从配置文件中获取各组件的 RPC 地址信息。

StandaloneSessionClusterEntrypoint 中,生成 DispatcherResourceManagerComponent 的工厂类是 SessionDispatcherResourceManagerComponentFactory,该工厂类创建 SessionDispatcherResourceManagerComponent:由 SessionDispatcherFactory 创建 StandaloneDispatcher, 由 StandaloneResourceManagerFactory 创建 StandaloneResourceManager

StandaloneJobClusterEntrypoint 中,生成 DispatcherResourceManagerComponent 的工厂类是 JobDispatcherResourceManagerComponentFactory,该厂类创建 JobDispatcherResourceManagerComponent:由 StandaloneResourceManagerFactory 创建 StandaloneResourceManager,由 JobDispatcherFactory 创建 MiniDispatcher。一个 MiniDispatcher 和一个 JobGraph 相绑定,一旦绑定的 JobGraph 执行结束,则关闭 MiniDispatcher,进而停止 JobManager 进程。

DispatcherResourceManager 服务内部的启动流程则和 MiniCluster 中一致,这里不再赘述。

TaskManager 的启动

TaskManager 的启动入口在 TaskManagerRunner 中,它的启动流程和 MiniCluster 模式下基本一致,区别在于: 1)运行在独立的进程中, 2)HighAvailabilityServices 的创建要依赖配置文件获取。 TaskManagerRunner 会创建 TaskExecutorTaskExecutor 通过 HighAvailabilityServices 获取 ResourceManager 的通信地址,并和 ResourceManager 建立连接。

Yarn Cluster 的启动流程

Yarn Cluster 的启动入口在 FlinkYarnSessionCli 中 :首先根据命令行参数创建 YarnClusterDescriptor,接着调用 YarnClusterDescriptor#deploySessionCluster 触发集群的部署。

实际启动的逻辑在 AbstractYarnClusterDescriptor#deployInternal 中,主要就是通过 YarnClient 向 yarn 集群提交应用,启动 ApplicationMaster:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
abstract class AbstractYarnClusterDescriptor {
    protected ClusterClient<ApplicationId> deployInternal(
			ClusterSpecification clusterSpecification,
			String applicationName,
			String yarnClusterEntrypoint,
			@Nullable JobGraph jobGraph,
			boolean detached) throws Exception {
		//.......
	
		ApplicationReport report = startAppMaster(
			flinkConfiguration,
			applicationName,
			yarnClusterEntrypoint,
			jobGraph,
			yarnClient,
			yarnApplication,
			validClusterSpecification);
			
		//.....

		return createYarnClusterClient(
			this,
			validClusterSpecification.getNumberTaskManagers(),
			validClusterSpecification.getSlotsPerTaskManager(),
			report,
			flinkConfiguration,
			true);
	}
}

根据 sessioncluster 和 jobcluster 者两种启动的区别, 提交到 Yarn 中 ApplicationMatser 的入口类分别为 YarnSessionClusterEntrypointYarnJobClusterEntrypoint, 区别在于 Dispatcher 分别为 StandaloneDispatcherMiniDispatcherResoureManager 的具体实现类为 YarnResourceManager

和前述的 Standalone Cluster 不同, Yarn Cluster 模式下启动的 Flink 集群,其 TaskManager 是由 YarnResourceManager 根据 JobMaster 的请求动态向 Yarn 的 ResourceManager 进行申请的。在 JobMaster 向 ResourceManager 申请资源时,如果当前没有足够的资源分配,则 YarnResourceManager 会向 Yarn 集群的 ResourceManager 申请新的 container,并启动 TaskManager:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class YarnResourceManager {
	//申请container
	private void requestYarnContainer() {
		resourceManagerClient.addContainerRequest(getContainerRequest());

		// make sure we transmit the request fast and receive fast news of granted allocations
		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

		numPendingContainerRequests++;

		log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
			resource,
			numPendingContainerRequests);
	}
	
	//分配container的回调函数
	@Override
	public void onContainersAllocated(List<Container> containers) {
		runAsync(() -> {
			final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
			final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

			for (Container container : containers) {
				log.info(
					"Received new container: {} - Remaining pending container requests: {}",
					container.getId(),
					numPendingContainerRequests);

				if (numPendingContainerRequests > 0) {
					removeContainerRequest(pendingRequestsIterator.next());

					final String containerIdStr = container.getId().toString();
					final ResourceID resourceId = new ResourceID(containerIdStr);

					workerNodeMap.put(resourceId, new YarnWorkerNode(container));

					try {
						// Context information used to start a TaskExecutor Java process
						ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
							container.getResource(),
							containerIdStr,
							container.getNodeId().getHost());
                        //启动 TaskManager
						nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
					} catch (Throwable t) {
						log.error("Could not start TaskManager in container {}.", container.getId(), t);

						// release the failed container
						workerNodeMap.remove(resourceId);
						resourceManagerClient.releaseAssignedContainer(container.getId());
						// and ask for a new one
						requestYarnContainerIfRequired();
					}
				} else {
					// return the excessive containers
					log.info("Returning excess container {}.", container.getId());
					resourceManagerClient.releaseAssignedContainer(container.getId());
				}
			}

			// if we are waiting for no further containers, we can go to the
			// regular heartbeat interval
			if (numPendingContainerRequests <= 0) {
				resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
			}
		});
	}
	
	//创建 TaskManager 的启动上下文
	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
			throws Exception {
		// init the ContainerLaunchContext
		final String currDir = env.get(ApplicationConstants.Environment.PWD.key());

		final ContaineredTaskManagerParameters taskManagerParameters =
				ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);

		log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
				"JVM direct memory limit {} MB",
				containerId,
				taskManagerParameters.taskManagerTotalMemoryMB(),
				taskManagerParameters.taskManagerHeapSizeMB(),
				taskManagerParameters.taskManagerDirectMemoryLimitMB());

		Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);

		log.debug("TaskManager configuration: {}", taskManagerConfig);

		ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
			flinkConfig,
			yarnConfig,
			env,
			taskManagerParameters,
			taskManagerConfig,
			currDir,
			YarnTaskExecutorRunner.class, //入口类
			log);

		// set a special environment variable to uniquely identify this container
		taskExecutorLaunchContext.getEnvironment()
				.put(ENV_FLINK_CONTAINER_ID, containerId);
		taskExecutorLaunchContext.getEnvironment()
				.put(ENV_FLINK_NODE_ID, host);
		return taskExecutorLaunchContext;
	}
}

本文简单分析了 Flink 集群的启动流程,以及 ResourceManagerTaskExecutor DispatcherJobMaster 等不同组件之间的通信过程。

-EOF-


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK