MapReduce Workflow


    How Hadoop MapReduce handles splits

    How splits are written during job submission:

    1. job.waitForCompletion(true) calls job.submit(), which then calls submitter.submitJobInternal(Job.this, cluster).

    2. submitJobInternal() creates the splits for the job via int maps = writeSplits(job, submitJobDir), which in turn calls writeNewSplits().

    3. writeNewSplits() obtains an InputFormat object via reflection and calls its getSplits() method to compute the splits, producing a List<InputSplit>. Taking FileInputFormat as the example: it first fetches the status of every file under the input paths into a List<FileStatus>; a FileStatus carries path, length, isdir, block_replication, blocksize, modification_time, access_time, permission, owner, group and symlink. Each file is then sliced against the block size (128 MB by default), yielding InputSplit objects and ultimately the List<InputSplit> (a simplified sketch of this sizing loop follows this list). The split array is finally written out via JobSplitWriter.createSplitFiles. writeNewSplits() returns the number of splits, which determines how many map tasks will be created.

    4. JobSplitWriter.createSplitFiles opens an output stream out to the file ${jobSubmitDir}/job.split and uses its own writeNewSplits() helper to do the actual writing; this helper also returns a SplitMetaInfo array. A SplitMetaInfo has three fields: long startOffset (the offset of the split inside job.split), long inputDataLength (the length of the split data) and String[] locations (hosts on which this split is local).

    5. Finally, writeJobSplitMetaInfo() writes the SplitMetaInfo array from step 4 to a second file, ${jobSubmitDir}/job.splitmetainfo.

    6. The steps above therefore produce two files, job.split and job.splitmetainfo. job.split contains, per split, the split class name and the serialized split (for FileSplit: file name, offset and length). job.splitmetainfo contains META_SPLIT_FILE_HEADER, the version, the number of splits, and each serialized SplitMetaInfo (the number of locations, then every location in turn, then startOffset and inputDataLength). At this point the split information is completely recorded in the corresponding files on HDFS.

    How splits are read when the job runs:

    1. JobImpl's InitTransition reads the split information and starts the corresponding tasks.

    2. TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId): this calls SplitMetaInfoReader.readSplitMetaInfo to read job.splitmetainfo. It first validates META_SPLIT_VERSION and numSplits, then reads each SplitMetaInfo in turn and builds a TaskSplitIndex that points at the corresponding record in job.split (the path of job.split plus the start offset), wraps it into a TaskSplitMetaInfo, and finally returns the TaskSplitMetaInfo[] array. TaskSplitIndex has two fields: String splitLocation and long startOffset. TaskSplitMetaInfo has three fields: TaskSplitIndex splitIndex, long inputDataLength and String[] locations.

    3. The array is passed in when the map tasks are created: createMapTasks(job, inputLength, taskSplitMetaInfo). For each taskSplitMetaInfo a TaskImpl is created with it: TaskImpl task = new MapTaskImpl(....).

    4. MapTaskImpl creates a MapTaskAttemptImpl, whose createRemoteTask method creates the actual MapTask: MapTask mapTask = new MapTask("", TypeConverter.fromYarn(getID()), partition, splitInfo.getSplitIndex(), 1); here splitInfo.getSplitIndex() returns a TaskSplitIndex.

    5. When MapTask executes runNewMapper, it reads the actual split data via split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()) and finally obtains the required RecordReader through the InputFormat interface's createRecordReader method.
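
    To make the split-sizing step concrete, here is a small self-contained sketch that mirrors the core sizing loop used by FileInputFormat.getSplits (SplitSizingSketch and FileSplitSpec are illustrative stand-ins, not the actual Hadoop types):

    import java.util.ArrayList;
    import java.util.List;

    /** Minimal sketch of FileInputFormat-style split sizing. */
    public class SplitSizingSketch {

      /** Simplified stand-in for Hadoop's FileSplit: just an offset and a length. */
      static class FileSplitSpec {
        final long start;
        final long length;
        FileSplitSpec(long start, long length) {
          this.start = start;
          this.length = length;
        }
        @Override
        public String toString() {
          return "split[start=" + start + ", length=" + length + "]";
        }
      }

      // The last chunk may be up to 10% larger than splitSize before it becomes its own split.
      private static final double SPLIT_SLOP = 1.1;

      /** splitSize = max(minSize, min(maxSize, blockSize)). */
      static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }

      /** Cut one file of the given length into splits of roughly splitSize bytes. */
      static List<FileSplitSpec> getSplitsForFile(long fileLength, long blockSize,
          long minSize, long maxSize) {
        List<FileSplitSpec> splits = new ArrayList<>();
        if (fileLength == 0) {
          return splits;
        }
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          splits.add(new FileSplitSpec(fileLength - bytesRemaining, splitSize));
          bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
          splits.add(new FileSplitSpec(fileLength - bytesRemaining, bytesRemaining));
        }
        return splits;
      }

      public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;   // 128 MB, the usual HDFS block size
        long fileLength = 300L * 1024 * 1024;  // a 300 MB input file
        // Prints three splits of 128 MB, 128 MB and 44 MB, i.e. three map tasks for this file.
        for (FileSplitSpec s : getSplitsForFile(fileLength, blockSize, 1L, Long.MAX_VALUE)) {
          System.out.println(s);
        }
      }
    }

    In the real code each split also carries the file path and the block locations (hosts), which is exactly what ends up serialized into job.split and job.splitmetainfo as described above.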

    The following draws on a reference blog post.

    Two quick points first: 1. ResourceManager has a main() method and is started by the Hadoop startup scripts.

    2. Since rmAppManager lives on the RM side, before reaching rmAppManager.submitApplication we need to know where the submission originally starts, i.e. the client-side call chain for submitting an application:

    1. job.waitForCompletion (the familiar entry point when you submit your own MapReduce job; a minimal driver example follows this list)

    --> 2. job.submit

    --> 3. JobSubmitter.submitJobInternal (computes the input splits via writeSplits)

    --> 4. (ClientProtocol) submitClient.submitJob (the protocol implementation is YARNRunner)

    --> 5. ResourceMgrDelegate.submitApplication

    --> 6. (YarnClient) client.submitApplication (the implementation is YarnClientImpl)

    --> 7. From here on we are on the RM side: (ApplicationClientProtocol) rmClient.submitApplication (the implementation is ClientRMService)

    --> 8. rmAppManager.submitApplication
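
    For reference, step 1 of the chain is just the familiar driver code. Below is a minimal word-count style driver (MyMapper and MyReducer are ordinary user classes, included only so the example compiles) that kicks off the whole chain through job.waitForCompletion(true):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {

      public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
              word.set(token);
              context.write(word, ONE);
            }
          }
        }
      }

      public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable v : values) {
            sum += v.get();
          }
          context.write(key, new IntWritable(sum));
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Step 1 of the chain above: waitForCompletion calls job.submit(), which goes
        // through JobSubmitter.submitJobInternal -> writeSplits -> ... all the way to the RM side.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }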


    A brief summary of what submitApplication does:

    1. Create an RMAppImpl and put it into the ResourceManager context rmContext.

    2. Fire an RMAppEventType.START event.

    The source code is as follows:

    @SuppressWarnings("unchecked")
    protected void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime,
        String user) throws YarnException {
      ApplicationId applicationId = submissionContext.getApplicationId();

      /**
       * Create the RMAppImpl and put it into the ResourceManager context rmContext
       */
      RMAppImpl application = createAndPopulateNewRMApp(
          submissionContext, submitTime, user, false);
      ApplicationId appId = submissionContext.getApplicationId();

      /**
       * If Kerberos security is enabled
       */
      if (UserGroupInformation.isSecurityEnabled()) {
        try {
          this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
              parseCredentials(submissionContext),
              submissionContext.getCancelTokensWhenComplete(),
              application.getUser());
        } catch (Exception e) {
          LOG.warn("Unable to parse credentials.", e);
          assert application.getState() == RMAppState.NEW;
          this.rmContext.getDispatcher().getEventHandler()
              .handle(new RMAppEvent(applicationId,
                  RMAppEventType.APP_REJECTED, e.getMessage()));
          throw RPCUtil.getRemoteException(e);
        }
      } else {
        /**
         * Fire an RMAppEventType.START event
         */
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
      }
    }

    The key point here is that an RMAppEventType.START event is fired, so we continue with two questions in mind:

    1. Who handles the RMAppEventType.START event?

    2. How is the RMAppEventType.START event handled (what is the processing logic)?

    To answer these questions we need to know where the corresponding EventHandler and Dispatcher come from; see the serviceInit method of ResourceManager (ResourceManager has a main method; the ResourceManager process is started by a script once the cluster is set up). For a more thorough treatment of the underlying principles, look up material on the state machine and dispatcher mechanisms.
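
    Before reading the ResourceManager code, a tiny self-contained sketch of the dispatcher pattern may help. This is not the real AsyncDispatcher, just the same idea with made-up names: handlers are registered per event-type enum class, events are queued, and a background thread delivers each event to the matching handler.

    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.LinkedBlockingQueue;

    /** A toy asynchronous dispatcher illustrating the register/dispatch pattern. */
    public class ToyDispatcher {

      /** An event simply carries an enum constant describing its type. */
      public interface Event {
        Enum<?> getType();
      }

      /** Handlers receive events and switch on getType() themselves. */
      public interface EventHandler {
        void handle(Event event);
      }

      // One handler per event-type enum class, e.g. RMAppEventType.class -> ApplicationEventDispatcher
      private final Map<Class<?>, EventHandler> handlers = new ConcurrentHashMap<>();
      private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
      private volatile boolean stopped = false;

      /** Like rmDispatcher.register(RMAppEventType.class, handler). */
      public void register(Class<? extends Enum<?>> eventTypeClass, EventHandler handler) {
        handlers.put(eventTypeClass, handler);
      }

      /** Producer side: just enqueue the event. */
      public void dispatch(Event event) {
        eventQueue.add(event);
      }

      /** Consumer side: one background thread routes events to the registered handler. */
      public void start() {
        Thread t = new Thread(() -> {
          while (!stopped) {
            try {
              Event event = eventQueue.take();
              EventHandler handler = handlers.get(event.getType().getDeclaringClass());
              if (handler != null) {
                handler.handle(event);
              }
            } catch (InterruptedException e) {
              return; // exit quietly on interrupt
            }
          }
        });
        t.setDaemon(true);
        t.start();
      }

      public void stop() {
        stopped = true;
      }
    }

    In the ResourceManager this role is played by the AsyncDispatcher (rmDispatcher): as the listings below show, an ApplicationEventDispatcher is registered for RMAppEventType.class, so RMAppEvent objects eventually reach RMAppImpl.handle.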

    ResourceManager.serviceInit code listing:

    protected void serviceInit(Configuration conf) throws Exception {
      // ... omitted ...
      this.rmContext = new RMContextImpl();
      // ...
      createAndInitActiveServices();
      // ... omitted ...
    }

    createAndInitActiveServices() code listing:

    protected void createAndInitActiveServices() throws Exception {
      activeServices = new RMActiveServices(this);
      activeServices.init(conf);
    }

    activeServices.init is a method of the base class AbstractService; it calls back into the concrete subclass's serviceInit method, so we look at RMActiveServices.serviceInit.
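
    As a quick aside, this init -> serviceInit callback is the classic template-method pattern; a minimal self-contained illustration (ToyService and ToyActiveServices are made-up names, not the Hadoop classes):

    /** Minimal illustration of the AbstractService-style init/serviceInit callback. */
    public class ServiceLifecycleSketch {

      /** Plays the role of AbstractService: init() is the template method. */
      static abstract class ToyService {
        public final void init(String conf) {
          // common bookkeeping would happen here (state checks, storing the config, ...)
          serviceInit(conf); // callback into the concrete subclass
        }
        protected abstract void serviceInit(String conf);
      }

      /** Plays the role of RMActiveServices: only overrides serviceInit. */
      static class ToyActiveServices extends ToyService {
        @Override
        protected void serviceInit(String conf) {
          System.out.println("registering event handlers with conf: " + conf);
        }
      }

      public static void main(String[] args) {
        // Like activeServices.init(conf): the base-class init ends up in the subclass's serviceInit.
        new ToyActiveServices().init("yarn-site.xml");
      }
    }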

    RMActiveServices.serviceInit code listing:

    protected void serviceInit(Configuration configuration) throws Exception {
      // ... omitted ...
      /**
       * Register ApplicationEventDispatcher as the dispatcher for RMAppEventType events;
       * its logic is to delegate the event to RMAppImpl.handle. The rmContext here is
       * also the one instantiated in serviceInit.
       */
      rmDispatcher.register(RMAppEventType.class,
          new ApplicationEventDispatcher(rmContext));
      rmDispatcher.register(RMAppAttemptEventType.class,
          new ApplicationAttemptEventDispatcher(rmContext));
      // ... omitted ...
      /**
       * rmAppManager is also created at this point
       */
      rmAppManager = createRMAppManager();
      rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);

      clientRM = createClientRMService();
      addService(clientRM);
      rmContext.setClientRMService(clientRM);
      // ... omitted ...
    }

    As the code above shows, the event dispatcher registered for the RMAppEventType event type is ApplicationEventDispatcher:

    @Private
    public static final class ApplicationEventDispatcher implements
        EventHandler<RMAppEvent> {

      private final RMContext rmContext;

      public ApplicationEventDispatcher(RMContext rmContext) {
        this.rmContext = rmContext;
      }

      @Override
      public void handle(RMAppEvent event) {
        ApplicationId appID = event.getApplicationId();
        RMApp rmApp = this.rmContext.getRMApps().get(appID);
        if (rmApp != null) {
          try {
            rmApp.handle(event);
          } catch (Throwable t) {
            LOG.error("Error in handling event type " + event.getType()
                + " for application " + appID, t);
          }
        }
      }
    }

    Looking at the implementation of ApplicationEventDispatcher's handle method, we can see that RMAppEventType events are ultimately handled by RMAppImpl, so we look in RMAppImpl for the handling of this event. See the RMAppImpl code listing:

    private static final StateMachineFactory<RMAppImpl, RMAppState,
        RMAppEventType, RMAppEvent> stateMachineFactory =
            new StateMachineFactory<RMAppImpl, RMAppState,
                RMAppEventType, RMAppEvent>(RMAppState.NEW)

      // ... omitted ...
      .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
          RMAppEventType.START, new RMAppNewlySavingTransition())
      // ... omitted ...

    That is, the RMAppEventType.START event is handled by the transition RMAppNewlySavingTransition.
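
    For readers unfamiliar with the state-machine table above, here is a minimal self-contained sketch of the same idea (ToyStateMachine is made up and much simpler than Hadoop's StateMachineFactory): each addTransition entry maps a (current state, event type) pair to a target state plus a hook that runs during the transition.

    import java.util.HashMap;
    import java.util.Map;

    /** A minimal (state, event) -> (next state, hook) table, in the spirit of StateMachineFactory. */
    public class ToyStateMachine<STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>> {

      /** The hook executed while transitioning, playing the role of e.g. RMAppNewlySavingTransition. */
      public interface Transition<E> {
        void transition(E eventType);
      }

      /** One table entry: the target state plus the hook to run. */
      private static final class Arc<S, E> {
        final S target;
        final Transition<E> hook;
        Arc(S target, Transition<E> hook) {
          this.target = target;
          this.hook = hook;
        }
      }

      // currentState -> (eventType -> arc), filled in by addTransition
      private final Map<STATE, Map<EVENTTYPE, Arc<STATE, EVENTTYPE>>> table = new HashMap<>();
      private STATE currentState;

      public ToyStateMachine(STATE initialState) {
        this.currentState = initialState;
      }

      /** Like stateMachineFactory.addTransition(NEW, NEW_SAVING, START, new RMAppNewlySavingTransition()). */
      public ToyStateMachine<STATE, EVENTTYPE> addTransition(STATE from, STATE to,
          EVENTTYPE on, Transition<EVENTTYPE> hook) {
        table.computeIfAbsent(from, s -> new HashMap<>()).put(on, new Arc<>(to, hook));
        return this;
      }

      /** Like the state machine's doTransition: run the hook and move to the target state. */
      public STATE doTransition(EVENTTYPE eventType) {
        Map<EVENTTYPE, Arc<STATE, EVENTTYPE>> arcs = table.get(currentState);
        Arc<STATE, EVENTTYPE> arc = (arcs == null) ? null : arcs.get(eventType);
        if (arc == null) {
          throw new IllegalStateException("No transition from " + currentState + " on " + eventType);
        }
        arc.hook.transition(eventType);
        currentState = arc.target;
        return currentState;
      }

      public STATE getCurrentState() {
        return currentState;
      }
    }

    Read this way, the table in RMAppImpl says: starting from NEW, on a START event run RMAppNewlySavingTransition and move to NEW_SAVING; the APP_NEW_SAVED transition discussed below has the same shape.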

    The RMAppNewlySavingTransition code is as follows:

    private static final class RMAppNewlySavingTransition extends RMAppTransition {
      @Override
      public void transition(RMAppImpl app, RMAppEvent event) {

        // If recovery is enabled then store the application information in a
        // non-blocking call so make sure that RM has stored the information
        // needed to restart the AM after RM restart without further client
        // communication
        LOG.info("Storing application with id " + app.applicationId);
        app.rmContext.getStateStore().storeNewApplication(app);
      }
    }

    // storeNewApplication code
    public void storeNewApplication(RMApp app) {
      ApplicationSubmissionContext context = app
          .getApplicationSubmissionContext();
      assert context instanceof ApplicationSubmissionContextPBImpl;
      ApplicationStateData appState =
          ApplicationStateData.newInstance(app.getSubmitTime(),
              app.getStartTime(), context, app.getUser(), app.getCallerContext());
      dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
    }

    // RMStateStoreAppEvent class
    public class RMStateStoreAppEvent extends RMStateStoreEvent {

      private final ApplicationStateData appState;

      public RMStateStoreAppEvent(ApplicationStateData appState) {
        super(RMStateStoreEventType.STORE_APP);
        this.appState = appState;
      }

      public ApplicationStateData getAppState() {
        return appState;
      }
    }

    After RMAppNewlySavingTransition runs, the RMApp state goes from NEW to NEW_SAVING. An application state data structure is built from the submission context (mainly the submit time, start time, submission context and submitting user), and then an RMStateStoreEventType.STORE_APP event is fired. The RMStateStore handles it with StoreAppTransition, staying ACTIVE on success and only moving to FENCED if the store operation fails.

    The StoreAppTransition code is shown below:

    private static class StoreAppTransition
        implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
            RMStateStoreState> {
      @Override
      public RMStateStoreState transition(RMStateStore store,
          RMStateStoreEvent event) {
        if (!(event instanceof RMStateStoreAppEvent)) {
          // should never happen
          LOG.error("Illegal event type: " + event.getClass());
          return RMStateStoreState.ACTIVE;
        }
        boolean isFenced = false;
        ApplicationStateData appState =
            ((RMStateStoreAppEvent) event).getAppState();
        ApplicationId appId =
            appState.getApplicationSubmissionContext().getApplicationId();
        LOG.info("Storing info for app: " + appId);
        try {
          // Store the application state data, e.g. on ZooKeeper under
          // /rmstore/ZKRMStateRoot/RMAppRoot/$appId when the ZK state store is used
          store.storeApplicationStateInternal(appId, appState);
          // Use the global rmDispatcher to fire RMAppEventType.APP_NEW_SAVED
          store.notifyApplication(new RMAppEvent(appId,
              RMAppEventType.APP_NEW_SAVED));
        } catch (Exception e) {
          LOG.error("Error storing app: " + appId, e);
          isFenced = store.notifyStoreOperationFailedInternal(e);
        }
        return finalState(isFenced);
      }
    }

    The key call is the store.notifyApplication method, whose event argument is an RMAppEventType.APP_NEW_SAVED event; its code is shown below (its role becomes clear in the next paragraph):

    private void notifyApplication(RMAppEvent event) {
      rmDispatcher.getEventHandler().handle(event);
    }

    Since rmDispatcher is the ResourceManager's global dispatcher, and as explained above RMAppEventType events end up being handled by RMAppImpl, we go back to RMAppImpl and look for RMAppEventType.APP_NEW_SAVED. This event moves the RMAppState from NEW_SAVING to SUBMITTED, and the corresponding transition is AddApplicationToSchedulerTransition. The code is as follows:

    private static final class AddApplicationToSchedulerTransition extends
        RMAppTransition {
      @Override
      public void transition(RMAppImpl app, RMAppEvent event) {
        app.handler.handle(new AppAddedSchedulerEvent(app.user,
            app.submissionContext, false));
        // send the ATS create Event
        app.sendATSCreateEvent();
      }
    }

    // AppAddedSchedulerEvent constructors
    public AppAddedSchedulerEvent(String user,
        ApplicationSubmissionContext submissionContext, boolean isAppRecovering) {
      this(submissionContext.getApplicationId(), submissionContext.getQueue(),
          user, isAppRecovering, submissionContext.getReservationID(),
          submissionContext.getPriority());
    }

    public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
        String user, boolean isAppRecovering, ReservationId reservationID,
        Priority appPriority) {
      super(SchedulerEventType.APP_ADDED);
      this.applicationId = applicationId;
      this.queue = queue;
      this.user = user;
      this.reservationID = reservationID;
      this.isAppRecovering = isAppRecovering;
      this.appPriority = appPriority;
    }

    The next question is which handler processes the AppAddedSchedulerEvent (SchedulerEventType.APP_ADDED). From the transition method above, app is an RMAppImpl; looking at RMAppImpl's constructor, its dispatcher and handler both come from the ResourceManager context rmContext: the dispatcher is the AsyncDispatcher, and the handler is selected by event type. So we go back to ResourceManager to find the handler registered for APP_ADDED events. See the serviceInit method of RMActiveServices in ResourceManager:

    /** RMActiveServices is initialized together with ResourceManager */
    @Private
    public class RMActiveServices extends CompositeService {
      // ... omitted ...
      schedulerDispatcher = createSchedulerEventDispatcher();
      addIfService(schedulerDispatcher);
      rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
      // ... omitted ...
    }

    SchedulerEventType events are dispatched by SchedulerEventDispatcher. Looking at the SchedulerEventDispatcher implementation, its handle method:

    // Puts the resource-request event onto the eventQueue; the run method consumes it
    // and performs the actual scheduling work.
    @Override
    public void handle(SchedulerEvent event) {
      try {
        int qSize = eventQueue.size();
        if (qSize != 0 && qSize % 1000 == 0
            && lastEventQueueSizeLogged != qSize) {
          lastEventQueueSizeLogged = qSize;
          LOG.info("Size of scheduler event-queue is " + qSize);
        }
        int remCapacity = eventQueue.remainingCapacity();
        if (remCapacity < 1000) {
          LOG.info("Very low remaining capacity on scheduler event queue: "
              + remCapacity);
        }
        this.eventQueue.put(event);
      } catch (InterruptedException e) {
        LOG.info("Interrupted. Trying to exit gracefully.");
      }
    }

    The handle method mainly puts the event onto the eventQueue, and the run method consumes it. This is clearly a producer-consumer pattern. The consumer is shown in the code listing below:

    private final class EventProcessor implements Runnable {
      @Override
      public void run() {
        SchedulerEvent event;

        while (!stopped && !Thread.currentThread().isInterrupted()) {
          try {
            event = eventQueue.take();
          } catch (InterruptedException e) {
            LOG.error("Returning, interrupted : " + e);
            return; // TODO: Kill RM.
          }

          try {
            scheduler.handle(event);
          } catch (Throwable t) {
            // An error occurred, but we are shutting down anyway.
            // If it was an InterruptedException, the very act of
            // shutdown could have caused it and is probably harmless.
            if (stopped) {
              LOG.warn("Exception during shutdown: ", t);
              break;
            }
            LOG.fatal("Error in handling event type " + event.getType()
                + " to the scheduler", t);
            if (shouldExitOnError
                && !ShutdownHookManager.get().isShutdownInProgress()) {
              LOG.info("Exiting, bbye..");
              System.exit(-1);
            }
          }
        }
      }
    }

    Consumption is done by the inner class EventProcessor. The EventProcessor thread is started by SchedulerEventDispatcher.serviceStart, and the actual consumption logic is delegated to scheduler.handle(event). By default the scheduler is the CapacityScheduler: YarnConfiguration.DEFAULT_RM_SCHEDULER is "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler", and the scheduler is instantiated during ResourceManager initialization:

    // Excerpt from RMActiveServices.serviceInit
    @Override
    protected void serviceInit(Configuration configuration) throws Exception {
      // ... omitted ...
      schedulerDispatcher = createSchedulerEventDispatcher();
      addIfService(schedulerDispatcher);
      rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
      // ... omitted ...
    }

    // createSchedulerEventDispatcher
    protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
      return new SchedulerEventDispatcher(this.scheduler);
    }

    // this.scheduler is instantiated via: scheduler = createScheduler();
    protected ResourceScheduler createScheduler() {
      String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
          YarnConfiguration.DEFAULT_RM_SCHEDULER);
      LOG.info("Using Scheduler: " + schedulerClassName);
      try {
        Class<?> schedulerClazz = Class.forName(schedulerClassName);
        if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
          return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
              this.conf);
        } else {
          throw new YarnRuntimeException("Class: " + schedulerClassName
              + " not instance of " + ResourceScheduler.class.getCanonicalName());
        }
      } catch (ClassNotFoundException e) {
        throw new YarnRuntimeException("Could not instantiate Scheduler: "
            + schedulerClassName, e);
      }
    }
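
    Since createScheduler resolves the class name from the configuration, switching schedulers is just a matter of setting yarn.resourcemanager.scheduler.class (YarnConfiguration.RM_SCHEDULER), normally in yarn-site.xml. Below is a minimal sketch of the same lookup with an overridden value; the FairScheduler class name is the one shipped with YARN, and the snippet only prints the resolved class rather than starting a scheduler:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SchedulerConfigSketch {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Equivalent to setting yarn.resourcemanager.scheduler.class in yarn-site.xml.
        conf.set(YarnConfiguration.RM_SCHEDULER,
            "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
        // This is the same lookup createScheduler performs before reflective instantiation.
        System.out.println("RM scheduler class: "
            + conf.get(YarnConfiguration.RM_SCHEDULER,
                YarnConfiguration.DEFAULT_RM_SCHEDULER));
      }
    }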

    At this point we know where to look next: the handling logic for the SchedulerEventType.APP_ADDED event lives inside CapacityScheduler.

