RESTlet调用MapReduce脚本示例

    xiaoxiao2023-09-28  100

    RESTlet调用MapReduce脚本示例

    开发背景

    bom批量传入修改接口,单个RESTlet超脚本治理用量限制(5000),可以考虑在RESTlet中调用MapReduce脚本提交作业实例,新作业不受用量限制,可以自包容处理超用量问题(白皮书:如果map /reduce作业违反了NetSuite治理的某些方面,则map/reduce框架会自动生成作业并重新安排其工作以供日后使用,而不会中断脚本)。

    RESTlet脚本

    /** * @NApiVersion 2.x * @NScriptType Restlet * @NModuleScope SameAccount * @NAmdConfig /SuiteScripts/intretech/config.json * @author Zhu Yanlong */ /BOM接口 // // PUT // // define([ 'N/record', 'N/search', 'N/runtime', 'N/task', 'dao' ], /** * @param {record} record * @param {search} search * @param {runtime} runtime * @param {task} task * @param {dao} dao */ function(record, search, runtime, task, dao) { /** * Function called upon sending a PUT request to the RESTlet. * * @param {string | Object} requestBody - The HTTP request body; request body will be passed into function as a string when request Content-Type is * 'text/plain' or parsed into an Object when request Content-Type is 'application/json' (in which case the body must be a valid JSON) * @returns {string | Object} HTTP response body; return string when request Content-Type is 'text/plain'; return Object when request Content-Type is * 'application/json' * @since 2015.2 */ function doPut(requestBody) { log.audit("Remaining governance units start: " + runtime.getCurrentScript().getRemainingUsage()); //调用Map/Reduce脚本实例 var mapReduceScriptTask = task.create({ taskType : task.TaskType.MAP_REDUCE }); mapReduceScriptTask.scriptId = 'customscript_if_bom_update_mapreduce'; mapReduceScriptTask.deploymentId = 'customdeploy_if_bom_update_mapreduce'; mapReduceScriptTask.params = { "custscript_requestbody" : requestBody }; var mapReduceScriptTaskId = mapReduceScriptTask.submit(); var taskStatus = task.checkStatus({ taskId : mapReduceScriptTaskId }); log.debug("taskStatus: " + taskStatus.status); //等待Map/Reduce完成后,再响应 while (taskStatus.status === 'PENDING' || taskStatus.status === 'PROCESSING') { sleep(5000); taskStatus = task.checkStatus({ taskId : mapReduceScriptTaskId }); } log.debug("taskStatus: " + taskStatus.status); if (taskStatus.status == 'COMPLETE') log.debug("Complete"); else if (taskStatus.status == 'FAILED') log.debug("Failed"); log.audit("Remaining governance units end: " + runtime.getCurrentScript().getRemainingUsage()); return { 'MapReduceScriptTaskStatus' : taskStatus } //======================辅助函数=======================// function sleep(milliseconds) { var start = new Date().getTime(); for (var i = 0; i < 1e7; i++) { if ((new Date().getTime() - start) > milliseconds) { break; } } } //======================辅助函数 end=======================// } return { put : doPut }; });

    Map/Reduce脚本

    /** * @NApiVersion 2.x * @NScriptType MapReduceScript * @NModuleScope SameAccount * @NAmdConfig /SuiteScripts/intretech/config.json */ define([ 'N/runtime', 'N/record', 'N/error', 'N/email', 'N/log', 'utils', 'dao' ], /** * @param {runtime} runtime * @param {record} record * @param {error} error * @param {email} email * @param {log} log * @param {utils} utils * @param {dao} dao */ function(runtime, record, error, email, log, utils, dao) { function handleErrorAndSendNotification(e, stage) { log.error('Stage: ' + stage + ' failed', e); var author = 27; var recipients = '377132229@qq.com'; var subject = 'Map/Reduce script ' + runtime.getCurrentScript().id + ' failed for stage: ' + stage; var body = 'An error occurred with the following information:\n' + 'Error code: ' + e.name + '\n' + 'Error msg: ' + e.message; email.send({ author : author, recipients : recipients, subject : subject, body : body }); } function handleErrorIfAny(summary) { var inputSummary = summary.inputSummary; var mapSummary = summary.mapSummary; var reduceSummary = summary.reduceSummary; if (inputSummary.error) { var e = error.create({ name : 'INPUT_STAGE_FAILED', message : inputSummary.error }); handleErrorAndSendNotification(e, 'getInputData'); } handleErrorInStage('map', mapSummary); handleErrorInStage('reduce', reduceSummary); } function handleErrorInStage(stage, summary) { var errorMsg = []; summary.errors.iterator().each(function(key, value) { var msg = 'Failure update from BOM data: ' + key + '. Error was: ' + JSON.parse(value).message + '\n'; errorMsg.push(msg); return true; }); if (errorMsg.length > 0) { var e = error.create({ name : 'BOM_UPDATE_FAILED', message : JSON.stringify(errorMsg) }); handleErrorAndSendNotification(e, stage); } } function createSummaryRecord(summary) { try { var seconds = summary.seconds; var usage = summary.usage; var yields = summary.yields; var rec = record.create({ type : 'customrecord_summary', }); rec.setValue({ fieldId : 'name', value : 'Summary for M/R script: ' + runtime.getCurrentScript().id }); rec.setValue({ fieldId : 'custrecord_time', value : seconds }); rec.setValue({ fieldId : 'custrecord_usage', value : usage }); rec.setValue({ fieldId : 'custrecord_yields', value : yields }); rec.save(); } catch (e) { handleErrorAndSendNotification(e, 'summarize'); } } /** * Marks the beginning of the Map/Reduce process and generates input data. * * @typedef {Object} ObjectRef * @property {number} id - Internal ID of the record instance * @property {string} type - Record type id * * @return {Array|Object|Search|RecordRef} inputSummary * @since 2015.1 */ function getInputData(context) { //从当前脚本参数字段取值 log.audit("Remaining governance units start: " + runtime.getCurrentScript().getRemainingUsage()); return utils.string2JSONObject(runtime.getCurrentScript().getParameter('custscript_requestbody')); } /** * Executes when the map entry point is triggered and applies to each key/value pair. * * @param {MapSummary} context - Data collection containing the key/value pairs to process through the map stage * @since 2015.1 */ function map(context) { log.debug({ title : "mapContext 1", details : context }); var i = 0; //一定要用JSON.parse将value的string编码值转换成对象 var bomList = JSON.parse(context.value); bomList.forEach(function(b) { context.write({ key : i, value : b }); i++; }); } /** * Executes when the reduce entry point is triggered and applies to each group. * * @param {ReduceSummary} context - Data collection containing the groups to process through the reduce stage * @since 2015.1 */ function reduce(context) { log.debug({ title : "reduceContext context key", details : context.key }); log.debug({ title : "reduceContext context values[0] before JSON.parse", details : context.values[0] }); log.debug({ title : "reduceContext context values[0] after JSON.parse", details : JSON.parse(context.values[0]) }); //更新BOM数据,一定要用JSON.parse将数组string编码值转换成对象 var result = dao.upsertBomAllRecord('PUT', JSON.parse(context.values[0])); // end 更新BOM数据 context.write({ key : context.key, value : result }); log.debug({ title : "reduceContext result", details : context }); log.debug({ title : "reduceContext context", details : 'key: ' + context.key + ', value: ' + context.value }); } /** * Executes when the summarize entry point is triggered and applies to the result set. * * @param {Summary} summary - Holds statistics regarding the execution of a map/reduce script * @since 2015.1 */ function summarize(summary) { handleErrorIfAny(summary); createSummaryRecord(summary); log.debug({ title : "summary", details : summary }); log.audit("Remaining governance units end: " + runtime.getCurrentScript().getRemainingUsage()); } return { getInputData : getInputData, map : map, reduce : reduce, summarize : summarize }; });

    遇到的问题

    Map/Reduce不能被RESTlet成功调用进入map/reduce/summary阶段

    参考: Answer Id: 80924 - Map/Reduce Script called by RESTlet Fails When calling a Map/Reduce Script from a RESTlet using task.create(), the Map/Reduce Script fails when using a non-admin role. To resolve this, the role used to execute the RESTlet should have the Documents and Files Permission. Here’s how to set that up: 解决:Navigate to (Administrator) Setup > Users/Roles > Manage Roles > Edit the Role used to execute the RESTlet Under the Permissions tab > Lists subtab > Add the Permission: Documents and Files and set the Level: Full Click Save

    最新回复(0)