|
|
- 'use strict';
-
-
-
- function JobAgent(id) {
- // execute super constructor
- eve.Agent.call(this, id);
- this.rpc = this.loadModule('rpc', this.rpcFunctions);
- this.connect(eve.system.transports.getAll());
-
- this.id = id;
- this.type = this.id.replace('job_','');
- this.globalStats = new DurationStats();
- this.globalStats.getFakeStats(id);
-
- this.agentStats = {};
-
- this.allJobs = {}; // used to quickly look up a job ID
- this.openJobs = {}; // used to check if there is a watcher to
- this.closedJobs = {};// keeps a list of agentIds containing jobs, used for stats collection
- // optimization would be nice with running averages, but N samples are needed, wait for demo data.
-
- this.watchers = {};
- }
-
- // extend the eve.Agent prototype
- JobAgent.prototype = Object.create(eve.Agent.prototype);
- JobAgent.prototype.constructor = JobAgent;
-
- // define RPC functions, preferably in a separated object to clearly distinct
- // exposed functions from local functions.
- JobAgent.prototype.rpcFunctions = {};
-
-
- JobAgent.prototype.expandPrerequisites = function(prerequisites) {
- var expanded = [];
- if (prerequisites !== undefined) {
- for (var i = 0; i < prerequisites.length; i++) {
- var prereq = prerequisites[i];
- if (typeof prereq == 'string') {
- expanded.push({
- jobId: prereq,
- uuid: uuid(),
- times : new DurationData(),
- stats: new DurationStats()
- });
- }
- else if (typeof prereq == 'object' && prereq.type !== undefined) { //&& prereq.agentId !== undefined not needed since the same items will be added when only type exists
- prereq.uuid = uuid();
- prereq.times = new DurationData();
- prereq.stats = new DurationStats();
- expanded.push(prereq);
- }
- else {
- console.log('ERROR: cannot use the prerequisites! Not in array of strings or array of objects with correct fields format.');
- throw new Error('ERROR: cannot use the prerequisites! Not in array of strings or array of objects with correct fields format.');
- }
- }
- }
- return expanded;
- };
- /**
- * Create new Job for agent
- * @param params
- */
- JobAgent.prototype.rpcFunctions.add = function(params) {
- var agentId = params.agentId;
- var jobId = params.jobId;
-
- // create stats if not yet exits
- if (this.agentStats[agentId] === undefined) {
- this.agentStats[agentId] = new DurationStats();
- }
-
- // create open job
- if (this.openJobs[agentId] === undefined) {
- this.openJobs[agentId] = {};
- }
- if (this.openJobs[agentId][jobId] !== undefined) {
- console.log('cannot start new job, jobId:', jobId, ' already exists!');
- throw new Error('cannot start new job, jobId:' + jobId + ' already exists!');
- }
- var prerequisites = this.expandPrerequisites(params.prerequisites);
- this.openJobs[agentId][jobId] = new Job(jobId, this.id, params.time, agentId, prerequisites);
- this.allJobs[jobId] = this.openJobs[agentId][jobId];
- this.addWatchers(jobId, prerequisites);
-
-
- // return prediction
- var statsData;
- if (this.agentStats[agentId].duration.mean == 0) {
- statsData = this.globalStats.getData();
- }
- else {
- statsData = this.agentStats[agentId].getData();
- }
- return statsData;
- };
-
- /**
- * finish the job of an agent
- * @param params
- */
- JobAgent.prototype.rpcFunctions.finish = function(params) {
- var agentId = params.agentId;
- var jobId = params.jobId;
- var job = this.openJobs[agentId][jobId];
-
- // finish job
- job.finish(params.time);
- // notify watchers that a job is finished.
- if (this.watchers[jobId] !== undefined) {
- for (var i = 0; i < this.watchers[jobId].length; i++) {
- var val = this.watchers[jobId][i];
- var address = val.address;
- var parentJobId = val.parentJobId;
- var uuid = val.uuid;
- this.rpc.request(address, {method:'watchedJobFinished', params:{
- uuid: uuid,
- parentJobId: parentJobId,
- duration: job.duration.getData()
- }}).done();
- }
- }
- // cleanup watchers
- delete this.watchers[jobId];
-
- // move from open to closed jobs.
- if (this.closedJobs[agentId] === undefined) {
- this.closedJobs[agentId] = {};
- }
- if (this.closedJobs[agentId][jobId] !== undefined) {
- console.log('cannot close job, jobId:', jobId, ' already exists!');
- throw new Error('cannot close job, jobId:' + jobId + ' already exists!');
- }
- this.closedJobs[agentId][jobId] = this.openJobs[agentId][jobId];
-
- delete this.openJobs[agentId][jobId];
-
- this.updateStats();
-
- return {
- elapsedTime: this.closedJobs[agentId][jobId].elapsedTime,
- elapsedTimeWithPause: this.closedJobs[agentId][jobId].elapsedTimeWithPause,
- duration: this.closedJobs[agentId][jobId].duration.getData(),
- prediction: this.globalStats.getData()
- };
- };
-
- /**
- * update the job of an agent
- * @param params
- */
- JobAgent.prototype.rpcFunctions.update = function(params) {
- var agentId = params.agentId;
- var jobId = params.jobId;
- var job = this.openJobs[agentId][jobId];
- var operation = params.operation;
-
- switch (operation) {
- case 'pause':
- job.pause(params.time, false);
- break;
- case 'endOfDay':
- job.pause(params.time, true);
- break;
- case 'startOfDay':
- job.resume(params.time, true);
- break;
- case 'resume':
- job.resume(params.time, false);
- break;
- }
- return {jobId: jobId, type: this.type, elapsedTime: job.elapsedTime, elapsedTimeWithPause: job.elapsedTimeWithPause};
- };
-
- /**
- * return agent stats
- * @param params
- * @returns {*}
- */
- JobAgent.prototype.rpcFunctions.getAgentStats = function(params) {
- return this.agentStats[params.agentId];
- };
-
-
- /**
- * return global stats
- * @param params
- * @returns {{mean: number, std: number}|*}
- */
- JobAgent.prototype.rpcFunctions.getGlobalStats = function(params) {
- return this.globalStats;
- };
-
- JobAgent.prototype.rpcFunctions.watchedJobFinished = function(params) {
- var jobId = params.parentJobId;
- this.allJobs[jobId].prerequisiteFinished(params);
- };
-
-
- /**
- *
- * @param params
- * @param sender
- * @returns {*}
- */
- JobAgent.prototype.rpcFunctions.addWatcherOnJobId = function(params, sender) {
- var jobId = params.jobId;
- var uuid = params.uuid;
- var parentJobId = params.parentJobId;
- var job = this.allJobs[jobId];
-
- // if the job is already finished, call the finished callback
- if (job.finished == true) {
- this.rpc.request(sender, {method:'watchedJobFinished', params:{
- uuid: uuid,
- parentJobId: parentJobId,
- duration: job.duration.getData() // we need the pure json data, not the class
- }}).done();
- }
- else {
- // we will create a watcher on a job which will alert the watcher when the job is finished with all the times.
- if (this.watchers[jobId] === undefined) {
- this.watchers[jobId] = [];
- }
- this.watchers[jobId].push({address: params.address, uuid: uuid});
- }
-
- // return the best prediction we have
- if (this.agentStats[job.agentId].mean == 0) {
- return this.globalStats.getData(); // we need the pure json data, not the class
- }
- return this.agentStats[job.agentId].getData(); // we need the pure json data, not the class
- };
-
-
- /**
- *
- * @param params
- * @param sender
- * @returns {*}
- */
- JobAgent.prototype.rpcFunctions.addWatcherByAgentID = function(params, sender) {
- var agentId = params.agentId;
- var parentJobId = params.parentJobId;
- var jobId = null;
- var uuid = params.uuid;
- var returnStats;
-
- // see which statistics collection we will need to return.
- if (this.agentStats[agentId].duration.mean == 0) {
- returnStats = this.globalStats;
- }
- else {
- returnStats = this.agentStats[agentId];
- }
- // see if we have an open job with that agent of this type
- if (this.openJobs[agentId] !== undefined) {
- for (var jId in this.openJobs[agentId]) {
- if (this.openJobs[agentId].hasOwnProperty(jId)) {
- jobId = jId;
- break;
- }
- }
- }
- // there is no open job from supplied agent of this type. return the mean of the return stats
- if (jobId === null) {
- this.rpc.request(params.address, {method:'watchedJobFinished', params:{
- uuid: uuid,
- parentJobId: parentJobId,
- duration: returnStats.getMeanData(), // we need the pure json data, not the class
- oldData: true
- }}).done();
- }
- else {
- params.jobId = jobId;
- this.rpcFunctions.addWatcherOnJobId.call(this, params, sender);
- }
-
- // return the best prediction we have
- return returnStats.getData(); // we need the pure json data, not the class
- };
-
- /**
- *
- * @param params
- * @param sender
- * @returns {*}
- */
- JobAgent.prototype.rpcFunctions.addWatcherByType = function(params, sender) {
- // since we cannot watch a global type, we return the global stats at that point.
- this.rpc.request(params.address, {method:'watchedJobFinished', params:{
- uuid: params.uuid,
- parentJobId: params.parentJobId,
- duration: this.globalStats.getMeanData(), // we need the pure json data, not the class
- oldData: true
- }}).done();
- return this.globalStats.getData(); // we need the pure json data, not the class
- };
-
-
- /**
- *
- * @param parentJobId | ID from the job that wants to WATCH other jobs
- * @param prerequisites
- */
- JobAgent.prototype.addWatchers = function(parentJobId, prerequisites) {
- for (var i = 0; i < prerequisites.length; i++) {
- var prereq = prerequisites[i];
- var params = {
- uuid: prereq.uuid,
- address: this.id, // this is the callback address
- parentJobId: parentJobId// this is the job that wants to watch the other one.
- };
- var me = this;
- if (prereq.jobId !== undefined) {
- // we now have a parentJobId to watch
- // we first need to find the type of job this belongs to.
- this.rpc.request('JobAgentGenerator', {method: 'returnJobAddress', params: {jobId: prereq.jobId}})
- // now that we have an address, set a watcher on the job id
- .done(function (address) {
- if (address != 'doesNotExist') {
- params.jobId = prereq.jobId; // this is the job we want to watch
- me.rpc.request(address, {method: 'addWatcherOnJobId', params: params})
- .done(function (preliminaryStats) {
- me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid);
- })
- }
- else {
- console.log('ERROR: watch job does not exist.');
- throw new Error('ERROR: watch job does not exist.');
- }
- });
- }
- else if (prereq.agentId !== undefined && prereq.type !== undefined) {
- // we now have an agentId and a jobType to watch.
- params.agentId = prereq.agentId; // this is the job we want to watch
- this.rpc.request(prereq.type, {method: 'addWatcherByAgentID', params: params})
- .done(function (preliminaryStats) {
- me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid);
- })
- }
- else if (prereq.type !== undefined) {
- this.rpc.request(prereq.type, {method: 'addWatcherByType', params: params})
- .done(function (preliminaryStats) {
- me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid);
- })
- }
- }
- };
-
- JobAgent.prototype.updatePredictedStartup = function(jobId, prediction) {
- var jobPrediction = this.allJobs[jobId].predictedStartupTime;
- jobPrediction.mean = Math.max(jobPrediction.mean, prediction.mean);
- jobPrediction.std = Math.sqrt(Math.pow(jobPrediction.std,2) + Math.pow(prediction.std,2));
-
- this.allJobs[jobId].prerequisitesCount += 1;
- };
-
-
- /**
- * Update all statistics
- *
- */
- JobAgent.prototype.updateStats = function() {
- this.globalStats.clearStats();
-
- var count = 0;
- for (var agentId in this.closedJobs) {
- if (this.closedJobs.hasOwnProperty(agentId)) {
- var collection = this.closedJobs[agentId];
- // could be optimised with rolling average for efficient memory management
- this.agentStats[agentId].setData(this.updateStatsIn(collection));
- this.globalStats.sumStats(this.agentStats[agentId]);
- count += 1;
- }
- }
- this.globalStats.averageStats(count);
- };
-
-
- /**
- *
- * @param collection
- * @returns {{duration: *, durationWithPause: *, durationWithStartup: *, durationWithBoth: *}}
- */
- JobAgent.prototype.updateStatsIn = function(collection) {
- var stats = {};
- for (var i = 0; i < this.globalStats.fields.length; i++) {
- var field = this.globalStats.fields[i];
- stats[field] = this.collectStatsIn(collection, field);
- }
- return stats;
- };
-
-
- JobAgent.prototype.collectStatsIn = function(collection, field) {
- var total = 0;
- var mean = 0;
- var std = 0;
- var minVal = 1e16;
- var maxVal = 0;
- var count = 0;
-
- for (var jobId in collection) {
- if (collection.hasOwnProperty(jobId)) {
- var value = collection[jobId].duration[field];
- maxVal = value > maxVal ? value : maxVal;
- minVal = value < minVal ? value : minVal;
-
- total += collection[jobId].duration[field];
- count += 1;
- }
- }
- if (count > 0) {
- mean = total / count;
- for (var jobId in collection) {
- if (collection.hasOwnProperty(jobId)) {
- std += Math.pow(collection[jobId].duration[field] - mean,2);
- }
- }
-
- std = Math.sqrt(std/count);
- return {mean: mean, std: std, min: minVal, max: maxVal};
- }
- else {
- return {mean: 0, std: 0, min: 0, max: 0};
- }
- };
-
- JobAgent.prototype.hasJob = function(params) {
- return this.allJobs[params.jobId] !== undefined;
- };
|