'use strict';
|
|
|
|
|
|
|
|
function JobAgent(id) {
|
|
// execute super constructor
|
|
eve.Agent.call(this, id);
|
|
this.rpc = this.loadModule('rpc', this.rpcFunctions);
|
|
this.connect(eve.system.transports.getAll());
|
|
|
|
this.id = id;
|
|
this.type = this.id.replace('job_','');
|
|
this.globalStats = new DurationStats();
|
|
this.globalStats.getFakeStats(id);
|
|
|
|
this.agentStats = {};
|
|
|
|
this.allJobs = {}; // used to quickly look up a job ID
|
|
this.openJobs = {}; // used to check if there is a watcher to
|
|
this.closedJobs = {};// keeps a list of agentIds containing jobs, used for stats collection
|
|
// optimization would be nice with running averages, but N samples are needed, wait for demo data.
|
|
|
|
this.watchers = {};
|
|
}
|
|
|
|
// extend the eve.Agent prototype
|
|
JobAgent.prototype = Object.create(eve.Agent.prototype);
|
|
JobAgent.prototype.constructor = JobAgent;
|
|
|
|
// define RPC functions, preferably in a separated object to clearly distinct
|
|
// exposed functions from local functions.
|
|
JobAgent.prototype.rpcFunctions = {};
|
|
|
|
|
|
JobAgent.prototype.expandPrerequisites = function(prerequisites) {
|
|
var expanded = [];
|
|
if (prerequisites !== undefined) {
|
|
for (var i = 0; i < prerequisites.length; i++) {
|
|
var prereq = prerequisites[i];
|
|
if (typeof prereq == 'string') {
|
|
expanded.push({
|
|
jobId: prereq,
|
|
uuid: uuid(),
|
|
times : new DurationData(),
|
|
stats: new DurationStats()
|
|
});
|
|
}
|
|
else if (typeof prereq == 'object' && prereq.type !== undefined) { //&& prereq.agentId !== undefined not needed since the same items will be added when only type exists
|
|
prereq.uuid = uuid();
|
|
prereq.times = new DurationData();
|
|
prereq.stats = new DurationStats();
|
|
expanded.push(prereq);
|
|
}
|
|
else {
|
|
console.log('ERROR: cannot use the prerequisites! Not in array of strings or array of objects with correct fields format.');
|
|
throw new Error('ERROR: cannot use the prerequisites! Not in array of strings or array of objects with correct fields format.');
|
|
}
|
|
}
|
|
}
|
|
return expanded;
|
|
};
|
|
/**
|
|
* Create new Job for agent
|
|
* @param params
|
|
*/
|
|
JobAgent.prototype.rpcFunctions.add = function(params) {
|
|
var agentId = params.agentId;
|
|
var jobId = params.jobId;
|
|
|
|
// create stats if not yet exits
|
|
if (this.agentStats[agentId] === undefined) {
|
|
this.agentStats[agentId] = new DurationStats();
|
|
}
|
|
|
|
// create open job
|
|
if (this.openJobs[agentId] === undefined) {
|
|
this.openJobs[agentId] = {};
|
|
}
|
|
if (this.openJobs[agentId][jobId] !== undefined) {
|
|
console.log('cannot start new job, jobId:', jobId, ' already exists!');
|
|
throw new Error('cannot start new job, jobId:' + jobId + ' already exists!');
|
|
}
|
|
var prerequisites = this.expandPrerequisites(params.prerequisites);
|
|
this.openJobs[agentId][jobId] = new Job(jobId, this.id, params.time, agentId, prerequisites);
|
|
this.allJobs[jobId] = this.openJobs[agentId][jobId];
|
|
this.addWatchers(jobId, prerequisites);
|
|
|
|
|
|
// return prediction
|
|
var statsData;
|
|
if (this.agentStats[agentId].duration.mean == 0) {
|
|
statsData = this.globalStats.getData();
|
|
}
|
|
else {
|
|
statsData = this.agentStats[agentId].getData();
|
|
}
|
|
return statsData;
|
|
};
|
|
|
|
/**
|
|
* finish the job of an agent
|
|
* @param params
|
|
*/
|
|
JobAgent.prototype.rpcFunctions.finish = function(params) {
|
|
var agentId = params.agentId;
|
|
var jobId = params.jobId;
|
|
var job = this.openJobs[agentId][jobId];
|
|
|
|
// finish job
|
|
job.finish(params.time);
|
|
// notify watchers that a job is finished.
|
|
if (this.watchers[jobId] !== undefined) {
|
|
for (var i = 0; i < this.watchers[jobId].length; i++) {
|
|
var val = this.watchers[jobId][i];
|
|
var address = val.address;
|
|
var parentJobId = val.parentJobId;
|
|
var uuid = val.uuid;
|
|
this.rpc.request(address, {method:'watchedJobFinished', params:{
|
|
uuid: uuid,
|
|
parentJobId: parentJobId,
|
|
duration: job.duration.getData()
|
|
}}).done();
|
|
}
|
|
}
|
|
// cleanup watchers
|
|
delete this.watchers[jobId];
|
|
|
|
// move from open to closed jobs.
|
|
if (this.closedJobs[agentId] === undefined) {
|
|
this.closedJobs[agentId] = {};
|
|
}
|
|
if (this.closedJobs[agentId][jobId] !== undefined) {
|
|
console.log('cannot close job, jobId:', jobId, ' already exists!');
|
|
throw new Error('cannot close job, jobId:' + jobId + ' already exists!');
|
|
}
|
|
this.closedJobs[agentId][jobId] = this.openJobs[agentId][jobId];
|
|
|
|
delete this.openJobs[agentId][jobId];
|
|
|
|
this.updateStats();
|
|
|
|
return {
|
|
elapsedTime: this.closedJobs[agentId][jobId].elapsedTime,
|
|
elapsedTimeWithPause: this.closedJobs[agentId][jobId].elapsedTimeWithPause,
|
|
duration: this.closedJobs[agentId][jobId].duration.getData(),
|
|
prediction: this.globalStats.getData()
|
|
};
|
|
};
|
|
|
|
/**
|
|
* update the job of an agent
|
|
* @param params
|
|
*/
|
|
JobAgent.prototype.rpcFunctions.update = function(params) {
|
|
var agentId = params.agentId;
|
|
var jobId = params.jobId;
|
|
var job = this.openJobs[agentId][jobId];
|
|
var operation = params.operation;
|
|
|
|
switch (operation) {
|
|
case 'pause':
|
|
job.pause(params.time, false);
|
|
break;
|
|
case 'endOfDay':
|
|
job.pause(params.time, true);
|
|
break;
|
|
case 'startOfDay':
|
|
job.resume(params.time, true);
|
|
break;
|
|
case 'resume':
|
|
job.resume(params.time, false);
|
|
break;
|
|
}
|
|
return {jobId: jobId, type: this.type, elapsedTime: job.elapsedTime, elapsedTimeWithPause: job.elapsedTimeWithPause};
|
|
};
|
|
|
|
/**
|
|
* return agent stats
|
|
* @param params
|
|
* @returns {*}
|
|
*/
|
|
JobAgent.prototype.rpcFunctions.getAgentStats = function(params) {
|
|
return this.agentStats[params.agentId];
|
|
};
|
|
|
|
|
|
/**
|
|
* return global stats
|
|
* @param params
|
|
* @returns {{mean: number, std: number}|*}
|
|
*/
|
|
JobAgent.prototype.rpcFunctions.getGlobalStats = function(params) {
|
|
return this.globalStats;
|
|
};
|
|
|
|
JobAgent.prototype.rpcFunctions.watchedJobFinished = function(params) {
|
|
var jobId = params.parentJobId;
|
|
this.allJobs[jobId].prerequisiteFinished(params);
|
|
};
|
|
|
|
|
|
/**
|
|
*
|
|
* @param params
|
|
* @param sender
|
|
* @returns {*}
|
|
*/
|
|
JobAgent.prototype.rpcFunctions.addWatcherOnJobId = function(params, sender) {
|
|
var jobId = params.jobId;
|
|
var uuid = params.uuid;
|
|
var parentJobId = params.parentJobId;
|
|
var job = this.allJobs[jobId];
|
|
|
|
// if the job is already finished, call the finished callback
|
|
if (job.finished == true) {
|
|
this.rpc.request(sender, {method:'watchedJobFinished', params:{
|
|
uuid: uuid,
|
|
parentJobId: parentJobId,
|
|
duration: job.duration.getData() // we need the pure json data, not the class
|
|
}}).done();
|
|
}
|
|
else {
|
|
// we will create a watcher on a job which will alert the watcher when the job is finished with all the times.
|
|
if (this.watchers[jobId] === undefined) {
|
|
this.watchers[jobId] = [];
|
|
}
|
|
this.watchers[jobId].push({address: params.address, uuid: uuid});
|
|
}
|
|
|
|
// return the best prediction we have
|
|
if (this.agentStats[job.agentId].mean == 0) {
|
|
return this.globalStats.getData(); // we need the pure json data, not the class
|
|
}
|
|
return this.agentStats[job.agentId].getData(); // we need the pure json data, not the class
|
|
};
|
|
|
|
|
|
/**
|
|
*
|
|
* @param params
|
|
* @param sender
|
|
* @returns {*}
|
|
*/
|
|
JobAgent.prototype.rpcFunctions.addWatcherByAgentID = function(params, sender) {
|
|
var agentId = params.agentId;
|
|
var parentJobId = params.parentJobId;
|
|
var jobId = null;
|
|
var uuid = params.uuid;
|
|
var returnStats;
|
|
|
|
// see which statistics collection we will need to return.
|
|
if (this.agentStats[agentId].duration.mean == 0) {
|
|
returnStats = this.globalStats;
|
|
}
|
|
else {
|
|
returnStats = this.agentStats[agentId];
|
|
}
|
|
// see if we have an open job with that agent of this type
|
|
if (this.openJobs[agentId] !== undefined) {
|
|
for (var jId in this.openJobs[agentId]) {
|
|
if (this.openJobs[agentId].hasOwnProperty(jId)) {
|
|
jobId = jId;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
// there is no open job from supplied agent of this type. return the mean of the return stats
|
|
if (jobId === null) {
|
|
this.rpc.request(params.address, {method:'watchedJobFinished', params:{
|
|
uuid: uuid,
|
|
parentJobId: parentJobId,
|
|
duration: returnStats.getMeanData(), // we need the pure json data, not the class
|
|
oldData: true
|
|
}}).done();
|
|
}
|
|
else {
|
|
params.jobId = jobId;
|
|
this.rpcFunctions.addWatcherOnJobId.call(this, params, sender);
|
|
}
|
|
|
|
// return the best prediction we have
|
|
return returnStats.getData(); // we need the pure json data, not the class
|
|
};
|
|
|
|
/**
|
|
*
|
|
* @param params
|
|
* @param sender
|
|
* @returns {*}
|
|
*/
|
|
JobAgent.prototype.rpcFunctions.addWatcherByType = function(params, sender) {
|
|
// since we cannot watch a global type, we return the global stats at that point.
|
|
this.rpc.request(params.address, {method:'watchedJobFinished', params:{
|
|
uuid: params.uuid,
|
|
parentJobId: params.parentJobId,
|
|
duration: this.globalStats.getMeanData(), // we need the pure json data, not the class
|
|
oldData: true
|
|
}}).done();
|
|
return this.globalStats.getData(); // we need the pure json data, not the class
|
|
};
|
|
|
|
|
|
/**
|
|
*
|
|
* @param parentJobId | ID from the job that wants to WATCH other jobs
|
|
* @param prerequisites
|
|
*/
|
|
JobAgent.prototype.addWatchers = function(parentJobId, prerequisites) {
|
|
for (var i = 0; i < prerequisites.length; i++) {
|
|
var prereq = prerequisites[i];
|
|
var params = {
|
|
uuid: prereq.uuid,
|
|
address: this.id, // this is the callback address
|
|
parentJobId: parentJobId// this is the job that wants to watch the other one.
|
|
};
|
|
var me = this;
|
|
if (prereq.jobId !== undefined) {
|
|
// we now have a parentJobId to watch
|
|
// we first need to find the type of job this belongs to.
|
|
this.rpc.request('JobAgentGenerator', {method: 'returnJobAddress', params: {jobId: prereq.jobId}})
|
|
// now that we have an address, set a watcher on the job id
|
|
.done(function (address) {
|
|
if (address != 'doesNotExist') {
|
|
params.jobId = prereq.jobId; // this is the job we want to watch
|
|
me.rpc.request(address, {method: 'addWatcherOnJobId', params: params})
|
|
.done(function (preliminaryStats) {
|
|
me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid);
|
|
})
|
|
}
|
|
else {
|
|
console.log('ERROR: watch job does not exist.');
|
|
throw new Error('ERROR: watch job does not exist.');
|
|
}
|
|
});
|
|
}
|
|
else if (prereq.agentId !== undefined && prereq.type !== undefined) {
|
|
// we now have an agentId and a jobType to watch.
|
|
params.agentId = prereq.agentId; // this is the job we want to watch
|
|
this.rpc.request(prereq.type, {method: 'addWatcherByAgentID', params: params})
|
|
.done(function (preliminaryStats) {
|
|
me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid);
|
|
})
|
|
}
|
|
else if (prereq.type !== undefined) {
|
|
this.rpc.request(prereq.type, {method: 'addWatcherByType', params: params})
|
|
.done(function (preliminaryStats) {
|
|
me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid);
|
|
})
|
|
}
|
|
}
|
|
};
|
|
|
|
JobAgent.prototype.updatePredictedStartup = function(jobId, prediction) {
|
|
var jobPrediction = this.allJobs[jobId].predictedStartupTime;
|
|
jobPrediction.mean = Math.max(jobPrediction.mean, prediction.mean);
|
|
jobPrediction.std = Math.sqrt(Math.pow(jobPrediction.std,2) + Math.pow(prediction.std,2));
|
|
|
|
this.allJobs[jobId].prerequisitesCount += 1;
|
|
};
|
|
|
|
|
|
/**
|
|
* Update all statistics
|
|
*
|
|
*/
|
|
JobAgent.prototype.updateStats = function() {
|
|
this.globalStats.clearStats();
|
|
|
|
var count = 0;
|
|
for (var agentId in this.closedJobs) {
|
|
if (this.closedJobs.hasOwnProperty(agentId)) {
|
|
var collection = this.closedJobs[agentId];
|
|
// could be optimised with rolling average for efficient memory management
|
|
this.agentStats[agentId].setData(this.updateStatsIn(collection));
|
|
this.globalStats.sumStats(this.agentStats[agentId]);
|
|
count += 1;
|
|
}
|
|
}
|
|
this.globalStats.averageStats(count);
|
|
};
|
|
|
|
|
|
/**
|
|
*
|
|
* @param collection
|
|
* @returns {{duration: *, durationWithPause: *, durationWithStartup: *, durationWithBoth: *}}
|
|
*/
|
|
JobAgent.prototype.updateStatsIn = function(collection) {
|
|
var stats = {};
|
|
for (var i = 0; i < this.globalStats.fields.length; i++) {
|
|
var field = this.globalStats.fields[i];
|
|
stats[field] = this.collectStatsIn(collection, field);
|
|
}
|
|
return stats;
|
|
};
|
|
|
|
|
|
JobAgent.prototype.collectStatsIn = function(collection, field) {
|
|
var total = 0;
|
|
var mean = 0;
|
|
var std = 0;
|
|
var minVal = 1e16;
|
|
var maxVal = 0;
|
|
var count = 0;
|
|
|
|
for (var jobId in collection) {
|
|
if (collection.hasOwnProperty(jobId)) {
|
|
var value = collection[jobId].duration[field];
|
|
maxVal = value > maxVal ? value : maxVal;
|
|
minVal = value < minVal ? value : minVal;
|
|
|
|
total += collection[jobId].duration[field];
|
|
count += 1;
|
|
}
|
|
}
|
|
if (count > 0) {
|
|
mean = total / count;
|
|
for (var jobId in collection) {
|
|
if (collection.hasOwnProperty(jobId)) {
|
|
std += Math.pow(collection[jobId].duration[field] - mean,2);
|
|
}
|
|
}
|
|
|
|
std = Math.sqrt(std/count);
|
|
return {mean: mean, std: std, min: minVal, max: maxVal};
|
|
}
|
|
else {
|
|
return {mean: 0, std: 0, min: 0, max: 0};
|
|
}
|
|
};
|
|
|
|
JobAgent.prototype.hasJob = function(params) {
|
|
return this.allJobs[params.jobId] !== undefined;
|
|
};
|