| 'use strict'; | |
| 
 | |
| 
 | |
| 
 | |
| function JobAgent(id) { | |
|   // execute super constructor | |
|   eve.Agent.call(this, id); | |
|   this.rpc = this.loadModule('rpc', this.rpcFunctions); | |
|   this.connect(eve.system.transports.getAll()); | |
| 
 | |
|   this.id = id; | |
|   this.type = this.id.replace('job_',''); | |
|   this.globalStats = new DurationStats(); | |
|   this.globalStats.getFakeStats(id); | |
| 
 | |
|   this.agentStats = {}; | |
| 
 | |
|   this.allJobs = {};   // used to quickly look up a job ID | |
|   this.openJobs = {};  // used to check if there is a watcher to | |
|   this.closedJobs = {};// keeps a list of agentIds containing jobs, used for stats collection | |
|   // optimization would be nice with running averages, but N samples are needed, wait for demo data. | |
|  | |
|   this.watchers  = {}; | |
| } | |
| 
 | |
| // extend the eve.Agent prototype | |
| JobAgent.prototype = Object.create(eve.Agent.prototype); | |
| JobAgent.prototype.constructor = JobAgent; | |
| 
 | |
| // define RPC functions, preferably in a separated object to clearly distinct | |
| // exposed functions from local functions. | |
| JobAgent.prototype.rpcFunctions = {}; | |
| 
 | |
| 
 | |
| JobAgent.prototype.expandPrerequisites = function(prerequisites) { | |
|   var expanded = []; | |
|   if (prerequisites !== undefined) { | |
|     for (var i = 0; i < prerequisites.length; i++) { | |
|       var prereq = prerequisites[i]; | |
|       if (typeof prereq == 'string') { | |
|         expanded.push({ | |
|           jobId: prereq, | |
|           uuid: uuid(), | |
|           times : new DurationData(), | |
|           stats: new DurationStats() | |
|         }); | |
|       } | |
|       else if (typeof prereq == 'object' && prereq.type !== undefined) { //&& prereq.agentId !== undefined not needed since the same items will be added when only type exists | |
|         prereq.uuid = uuid(); | |
|         prereq.times = new DurationData(); | |
|         prereq.stats = new DurationStats(); | |
|         expanded.push(prereq); | |
|       } | |
|       else { | |
|         console.log('ERROR: cannot use the prerequisites! Not in array of strings or array of objects with correct fields format.'); | |
|         throw new Error('ERROR: cannot use the prerequisites! Not in array of strings or array of objects with correct fields format.'); | |
|       } | |
|     } | |
|   } | |
|   return expanded; | |
| }; | |
| /** | |
|  * Create new Job for agent | |
|  * @param params | |
|  */ | |
| JobAgent.prototype.rpcFunctions.add = function(params) { | |
|   var agentId = params.agentId; | |
|   var jobId = params.jobId; | |
| 
 | |
|   // create stats if not yet exits | |
|   if (this.agentStats[agentId] === undefined) { | |
|     this.agentStats[agentId] = new DurationStats(); | |
|   } | |
| 
 | |
|   // create open job | |
|   if (this.openJobs[agentId] === undefined) { | |
|     this.openJobs[agentId] = {}; | |
|   } | |
|   if (this.openJobs[agentId][jobId] !== undefined) { | |
|     console.log('cannot start new job, jobId:', jobId, ' already exists!'); | |
|     throw new Error('cannot start new job, jobId:' + jobId + ' already exists!'); | |
|   } | |
|   var prerequisites = this.expandPrerequisites(params.prerequisites); | |
|   this.openJobs[agentId][jobId] = new Job(jobId, this.id, params.time, agentId, prerequisites); | |
|   this.allJobs[jobId] = this.openJobs[agentId][jobId]; | |
|   this.addWatchers(jobId, prerequisites); | |
| 
 | |
| 
 | |
|   // return prediction | |
|   var statsData; | |
|   if (this.agentStats[agentId].duration.mean == 0) { | |
|     statsData = this.globalStats.getData(); | |
|   } | |
|   else { | |
|     statsData = this.agentStats[agentId].getData(); | |
|   } | |
|   return statsData; | |
| }; | |
| 
 | |
| /** | |
|  * finish the job of an agent | |
|  * @param params | |
|  */ | |
| JobAgent.prototype.rpcFunctions.finish = function(params) { | |
|   var agentId = params.agentId; | |
|   var jobId = params.jobId; | |
|   var job = this.openJobs[agentId][jobId]; | |
| 
 | |
|   // finish job | |
|   job.finish(params.time); | |
|   // notify watchers that a job is finished. | |
|   if (this.watchers[jobId] !== undefined) { | |
|     for (var i = 0; i < this.watchers[jobId].length; i++) { | |
|       var val = this.watchers[jobId][i]; | |
|       var address     = val.address; | |
|       var parentJobId = val.parentJobId; | |
|       var uuid        = val.uuid; | |
|       this.rpc.request(address, {method:'watchedJobFinished', params:{ | |
|         uuid:                 uuid, | |
|         parentJobId:          parentJobId, | |
|         duration:             job.duration.getData() | |
|       }}).done(); | |
|     } | |
|   } | |
|   // cleanup watchers | |
|   delete this.watchers[jobId]; | |
| 
 | |
|   // move from open to closed jobs. | |
|   if (this.closedJobs[agentId] === undefined) { | |
|     this.closedJobs[agentId] = {}; | |
|   } | |
|   if (this.closedJobs[agentId][jobId] !== undefined) { | |
|     console.log('cannot close job, jobId:', jobId, ' already exists!'); | |
|     throw new Error('cannot close job, jobId:' + jobId + ' already exists!'); | |
|   } | |
|   this.closedJobs[agentId][jobId] = this.openJobs[agentId][jobId]; | |
| 
 | |
|   delete this.openJobs[agentId][jobId]; | |
| 
 | |
|   this.updateStats(); | |
| 
 | |
|   return { | |
|     elapsedTime: this.closedJobs[agentId][jobId].elapsedTime, | |
|     elapsedTimeWithPause: this.closedJobs[agentId][jobId].elapsedTimeWithPause, | |
|     duration: this.closedJobs[agentId][jobId].duration.getData(), | |
|     prediction: this.globalStats.getData() | |
|   }; | |
| }; | |
| 
 | |
| /** | |
|  * update the job of an agent | |
|  * @param params | |
|  */ | |
| JobAgent.prototype.rpcFunctions.update = function(params) { | |
|   var agentId = params.agentId; | |
|   var jobId = params.jobId; | |
|   var job = this.openJobs[agentId][jobId]; | |
|   var operation = params.operation; | |
| 
 | |
|   switch (operation) { | |
|     case 'pause': | |
|       job.pause(params.time, false); | |
|       break; | |
|     case 'endOfDay': | |
|       job.pause(params.time, true); | |
|       break; | |
|     case 'startOfDay': | |
|       job.resume(params.time, true); | |
|       break; | |
|     case 'resume': | |
|       job.resume(params.time, false); | |
|       break; | |
|   } | |
|   return {jobId: jobId, type: this.type, elapsedTime: job.elapsedTime, elapsedTimeWithPause: job.elapsedTimeWithPause}; | |
| }; | |
| 
 | |
| /** | |
|  * return agent stats | |
|  * @param params | |
|  * @returns {*} | |
|  */ | |
| JobAgent.prototype.rpcFunctions.getAgentStats = function(params) { | |
|   return this.agentStats[params.agentId]; | |
| }; | |
| 
 | |
| 
 | |
| /** | |
|  * return global stats | |
|  * @param params | |
|  * @returns {{mean: number, std: number}|*} | |
|  */ | |
| JobAgent.prototype.rpcFunctions.getGlobalStats = function(params) { | |
|   return this.globalStats; | |
| }; | |
| 
 | |
| JobAgent.prototype.rpcFunctions.watchedJobFinished = function(params) { | |
|   var jobId = params.parentJobId; | |
|   this.allJobs[jobId].prerequisiteFinished(params); | |
| }; | |
| 
 | |
| 
 | |
| /** | |
|  * | |
|  * @param params | |
|  * @param sender | |
|  * @returns {*} | |
|  */ | |
| JobAgent.prototype.rpcFunctions.addWatcherOnJobId = function(params, sender) { | |
|   var jobId = params.jobId; | |
|   var uuid = params.uuid; | |
|   var parentJobId = params.parentJobId; | |
|   var job = this.allJobs[jobId]; | |
| 
 | |
|   // if the job is already finished, call the finished callback | |
|   if (job.finished == true) { | |
|     this.rpc.request(sender, {method:'watchedJobFinished', params:{ | |
|       uuid:                 uuid, | |
|       parentJobId:          parentJobId, | |
|       duration:             job.duration.getData()  // we need the pure json data, not the class | |
|     }}).done(); | |
|   } | |
|   else { | |
|     // we will create a watcher on a job which will alert the watcher when the job is finished with all the times. | |
|     if (this.watchers[jobId] === undefined) { | |
|       this.watchers[jobId] = []; | |
|     } | |
|     this.watchers[jobId].push({address: params.address, uuid: uuid}); | |
|   } | |
| 
 | |
|   // return the best prediction we have | |
|   if (this.agentStats[job.agentId].mean == 0) { | |
|     return this.globalStats.getData(); // we need the pure json data, not the class | |
|   } | |
|   return this.agentStats[job.agentId].getData(); // we need the pure json data, not the class | |
| }; | |
| 
 | |
| 
 | |
| /** | |
|  * | |
|  * @param params | |
|  * @param sender | |
|  * @returns {*} | |
|  */ | |
| JobAgent.prototype.rpcFunctions.addWatcherByAgentID = function(params, sender) { | |
|   var agentId = params.agentId; | |
|   var parentJobId = params.parentJobId; | |
|   var jobId = null; | |
|   var uuid = params.uuid; | |
|   var returnStats; | |
| 
 | |
|   // see which statistics collection we will need to return. | |
|   if (this.agentStats[agentId].duration.mean == 0) { | |
|     returnStats = this.globalStats; | |
|   } | |
|   else { | |
|     returnStats = this.agentStats[agentId]; | |
|   } | |
|   // see if we have an open job with that agent of this type | |
|   if (this.openJobs[agentId] !== undefined) { | |
|     for (var jId in this.openJobs[agentId]) { | |
|       if (this.openJobs[agentId].hasOwnProperty(jId)) { | |
|         jobId = jId; | |
|         break; | |
|       } | |
|     } | |
|   } | |
|   // there is no open job from supplied agent of this type. return the mean of the return stats | |
|   if (jobId === null) { | |
|     this.rpc.request(params.address, {method:'watchedJobFinished', params:{ | |
|       uuid:                 uuid, | |
|       parentJobId:          parentJobId, | |
|       duration:             returnStats.getMeanData(), // we need the pure json data, not the class | |
|       oldData: true | |
|     }}).done(); | |
|   } | |
|   else { | |
|     params.jobId = jobId; | |
|     this.rpcFunctions.addWatcherOnJobId.call(this, params, sender); | |
|   } | |
| 
 | |
|   // return the best prediction we have | |
|   return returnStats.getData();  // we need the pure json data, not the class | |
| }; | |
| 
 | |
| /** | |
|  * | |
|  * @param params | |
|  * @param sender | |
|  * @returns {*} | |
|  */ | |
| JobAgent.prototype.rpcFunctions.addWatcherByType = function(params, sender) { | |
|   // since we cannot watch a global type, we return the global stats at that point. | |
|   this.rpc.request(params.address, {method:'watchedJobFinished', params:{ | |
|     uuid:                 params.uuid, | |
|     parentJobId:          params.parentJobId, | |
|     duration:             this.globalStats.getMeanData(), // we need the pure json data, not the class | |
|     oldData: true | |
|   }}).done(); | |
|   return this.globalStats.getData(); // we need the pure json data, not the class | |
| }; | |
| 
 | |
| 
 | |
| /** | |
|  * | |
|  * @param parentJobId           | ID from the job that wants to WATCH other jobs | |
|  * @param prerequisites | |
|  */ | |
| JobAgent.prototype.addWatchers = function(parentJobId, prerequisites) { | |
|   for (var i = 0; i < prerequisites.length; i++) { | |
|     var prereq = prerequisites[i]; | |
|     var params = { | |
|       uuid: prereq.uuid, | |
|       address: this.id, // this is the callback address | |
|       parentJobId: parentJobId// this is the job that wants to watch the other one. | |
|     }; | |
|     var me = this; | |
|     if (prereq.jobId !== undefined) { | |
|       // we now have a parentJobId to watch | |
|       // we first need to find the type of job this belongs to. | |
|       this.rpc.request('JobAgentGenerator', {method: 'returnJobAddress', params: {jobId: prereq.jobId}}) | |
|         // now that we have an address, set a watcher on the job id | |
|         .done(function (address) { | |
|           if (address != 'doesNotExist') { | |
|             params.jobId = prereq.jobId;  // this is the job we want to watch | |
|             me.rpc.request(address, {method: 'addWatcherOnJobId', params: params}) | |
|               .done(function (preliminaryStats) { | |
|                 me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid); | |
|               }) | |
|           } | |
|           else { | |
|             console.log('ERROR: watch job does not exist.'); | |
|             throw new Error('ERROR: watch job does not exist.'); | |
|           } | |
|         }); | |
|     } | |
|     else if (prereq.agentId !== undefined && prereq.type !== undefined) { | |
|       // we now have an agentId and a jobType to watch. | |
|       params.agentId = prereq.agentId;  // this is the job we want to watch | |
|       this.rpc.request(prereq.type, {method: 'addWatcherByAgentID', params: params}) | |
|         .done(function (preliminaryStats) { | |
|           me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid); | |
|         }) | |
|     } | |
|     else if (prereq.type !== undefined) { | |
|       this.rpc.request(prereq.type, {method: 'addWatcherByType', params: params}) | |
|         .done(function (preliminaryStats) { | |
|           me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid); | |
|         }) | |
|     } | |
|   } | |
| }; | |
| 
 | |
| JobAgent.prototype.updatePredictedStartup = function(jobId, prediction) { | |
|   var jobPrediction = this.allJobs[jobId].predictedStartupTime; | |
|   jobPrediction.mean = Math.max(jobPrediction.mean, prediction.mean); | |
|   jobPrediction.std = Math.sqrt(Math.pow(jobPrediction.std,2) + Math.pow(prediction.std,2)); | |
| 
 | |
|   this.allJobs[jobId].prerequisitesCount += 1; | |
| }; | |
| 
 | |
| 
 | |
| /** | |
|  * Update all statistics | |
|  * | |
|  */ | |
| JobAgent.prototype.updateStats = function() { | |
|   this.globalStats.clearStats(); | |
| 
 | |
|   var count = 0; | |
|   for (var agentId in this.closedJobs) { | |
|     if (this.closedJobs.hasOwnProperty(agentId)) { | |
|       var collection = this.closedJobs[agentId]; | |
|       // could be optimised with rolling average for efficient memory management | |
|       this.agentStats[agentId].setData(this.updateStatsIn(collection)); | |
|       this.globalStats.sumStats(this.agentStats[agentId]); | |
|       count += 1; | |
|     } | |
|   } | |
|   this.globalStats.averageStats(count); | |
| }; | |
| 
 | |
| 
 | |
| /** | |
|  * | |
|  * @param collection | |
|  * @returns {{duration: *, durationWithPause: *, durationWithStartup: *, durationWithBoth: *}} | |
|  */ | |
| JobAgent.prototype.updateStatsIn = function(collection) { | |
|   var stats = {}; | |
|   for (var i = 0; i < this.globalStats.fields.length; i++) { | |
|     var field = this.globalStats.fields[i]; | |
|     stats[field] = this.collectStatsIn(collection, field); | |
|   } | |
|   return stats; | |
| }; | |
| 
 | |
| 
 | |
| JobAgent.prototype.collectStatsIn = function(collection, field) { | |
|   var total  = 0; | |
|   var mean   = 0; | |
|   var std    = 0; | |
|   var minVal = 1e16; | |
|   var maxVal = 0; | |
|   var count  = 0; | |
| 
 | |
|   for (var jobId in collection) { | |
|     if (collection.hasOwnProperty(jobId)) { | |
|       var value = collection[jobId].duration[field]; | |
|       maxVal = value > maxVal ? value : maxVal; | |
|       minVal = value < minVal ? value : minVal; | |
| 
 | |
|       total += collection[jobId].duration[field]; | |
|       count += 1; | |
|     } | |
|   } | |
|   if (count > 0) { | |
|     mean = total / count; | |
|     for (var jobId in collection) { | |
|       if (collection.hasOwnProperty(jobId)) { | |
|         std += Math.pow(collection[jobId].duration[field] - mean,2); | |
|       } | |
|     } | |
| 
 | |
|     std = Math.sqrt(std/count); | |
|     return {mean: mean, std: std, min: minVal, max: maxVal}; | |
|   } | |
|   else { | |
|     return {mean: 0, std: 0, min: 0, max: 0}; | |
|   } | |
| }; | |
| 
 | |
| JobAgent.prototype.hasJob = function(params) { | |
|   return this.allJobs[params.jobId] !== undefined; | |
| };
 |