vis.js is a dynamic, browser-based visualization library
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

434 lines
13 KiB

10 years ago
  1. 'use strict';
  /**
   * Agent that administers the jobs of one job type.
   *
   * The job type is the agent id with the first 'job_' occurrence removed.
   *
   * @param {string} id   id of this agent
   * @constructor
   */
  function JobAgent(id) {
    // run the eve.Agent constructor on this instance
    eve.Agent.call(this, id);

    // expose the rpcFunctions through the rpc module and connect to every
    // transport known to the eve system
    this.rpc = this.loadModule('rpc', this.rpcFunctions);
    this.connect(eve.system.transports.getAll());

    this.id = id;
    this.type = this.id.replace('job_', '');

    // statistics over all agents; getFakeStats presumably seeds them with
    // placeholder values for this id -- TODO confirm against DurationStats
    var globalStats = new DurationStats();
    this.globalStats = globalStats;
    globalStats.getFakeStats(id);

    // statistics per agent: agentId -> DurationStats
    this.agentStats = {};

    // every job ever added: jobId -> Job, used to quickly look up a job ID
    this.allJobs = {};
    // running jobs: agentId -> jobId -> Job
    this.openJobs = {};
    // finished jobs: agentId -> jobId -> Job, used for stats collection.
    // Optimization would be nice with running averages, but N samples are
    // needed, wait for demo data.
    this.closedJobs = {};

    // watchers per job: jobId -> list of watcher descriptions
    this.watchers = {};
  }
  // Let JobAgent inherit from eve.Agent: build a fresh prototype object on top
  // of eve.Agent.prototype, restore the constructor reference and attach an
  // empty container for the RPC functions. The RPC functions live in a separate
  // object to clearly distinguish exposed functions from local functions.
  JobAgent.prototype = (function (proto) {
    proto.constructor = JobAgent;
    proto.rpcFunctions = {};
    return proto;
  })(Object.create(eve.Agent.prototype));
  /**
   * Expand a list of prerequisites into prerequisite objects that each carry a
   * fresh uuid, a DurationData (`times`) and a DurationStats (`stats`).
   *
   * - a string entry is treated as a job id and becomes a new object
   *   `{jobId, uuid, times, stats}`;
   * - an object entry with a `type` field is extended IN PLACE with `uuid`,
   *   `times` and `stats` and the very same object is put in the result.
   *
   * @param {Array.<string|Object>} [prerequisites]  undefined or null means
   *                                                 "no prerequisites"
   * @returns {Array.<Object>} the expanded prerequisites, possibly empty
   * @throws {Error} when an entry is neither a string nor an object with a
   *                 `type` field
   */
  JobAgent.prototype.expandPrerequisites = function(prerequisites) {
    var expanded = [];

    // `== null` matches both undefined and null
    if (prerequisites == null) {
      return expanded;
    }

    for (var i = 0; i < prerequisites.length; i++) {
      var prereq = prerequisites[i];
      if (typeof prereq === 'string') {
        expanded.push({
          jobId: prereq,
          uuid: uuid(),
          times: new DurationData(),
          stats: new DurationStats()
        });
      }
      // typeof null is 'object' as well, so null has to be excluded explicitly.
      // prereq.agentId is not checked here: the same fields are added whether
      // or not it is present.
      else if (prereq !== null && typeof prereq === 'object' && prereq.type !== undefined) {
        prereq.uuid = uuid();
        prereq.times = new DurationData();
        prereq.stats = new DurationStats();
        expanded.push(prereq);
      }
      else {
        var message = 'ERROR: cannot use the prerequisites! Not in array of strings or array of objects with correct fields format.';
        console.log(message);
        throw new Error(message);
      }
    }
    return expanded;
  };
  /**
   * Register a new open job for an agent.
   *
   * Creates the per-agent statistics and open-job table on first use, expands
   * the prerequisites, stores the new Job under both openJobs and allJobs and
   * sets up the watchers for its prerequisites.
   *
   * @param {Object} params   reads agentId, jobId, time and prerequisites
   * @returns {*} prediction data: the agent's own stats, or the global stats
   *              while the agent's mean duration is still 0
   * @throws {Error} when the agent already has an open job with this jobId
   */
  JobAgent.prototype.rpcFunctions.add = function(params) {
    var agentId = params.agentId;
    var jobId = params.jobId;

    // first job of this agent: start collecting stats for it
    if (this.agentStats[agentId] === undefined) {
      this.agentStats[agentId] = new DurationStats();
    }

    // first open job of this agent: create its table of open jobs
    if (this.openJobs[agentId] === undefined) {
      this.openJobs[agentId] = {};
    }
    var openForAgent = this.openJobs[agentId];

    if (openForAgent[jobId] !== undefined) {
      console.log('cannot start new job, jobId:', jobId, ' already exists!');
      throw new Error('cannot start new job, jobId:' + jobId + ' already exists!');
    }

    var prerequisites = this.expandPrerequisites(params.prerequisites);
    var job = new Job(jobId, this.id, params.time, agentId, prerequisites);
    openForAgent[jobId] = job;
    this.allJobs[jobId] = job;
    this.addWatchers(jobId, prerequisites);

    // return prediction
    var ownStats = this.agentStats[agentId];
    var prediction = ownStats.duration.mean == 0 ? this.globalStats : ownStats;
    return prediction.getData();
  };
  /**
   * Finish an open job of an agent.
   *
   * Finishes the job, notifies every watcher registered on the job with a
   * 'watchedJobFinished' request, drops those watchers, moves the job from the
   * open to the closed jobs and recomputes all statistics.
   *
   * All validation happens before any state is touched, so a rejected call
   * leaves the job, its watchers and the statistics unchanged.
   *
   * @param {Object} params   reads agentId, jobId and time
   * @returns {{elapsedTime: *, elapsedTimeWithPause: *, duration: *, prediction: *}}
   *          timing of the finished job plus the updated global stats data
   * @throws {Error} when the agent has no open job with this jobId, or when a
   *                 closed job with this jobId already exists for the agent
   */
  JobAgent.prototype.rpcFunctions.finish = function(params) {
    var agentId = params.agentId;
    var jobId = params.jobId;

    // look up the job without assuming the agent has any open jobs at all
    var openForAgent = this.openJobs[agentId];
    var job = openForAgent !== undefined ? openForAgent[jobId] : undefined;
    if (job === undefined) {
      console.log('cannot finish job, jobId:', jobId, ' is not open!');
      throw new Error('cannot finish job, jobId:' + jobId + ' is not open!');
    }

    // refuse to overwrite a closed job; checked up front so that nothing has
    // been finished or sent to watchers when we bail out
    if (this.closedJobs[agentId] !== undefined && this.closedJobs[agentId][jobId] !== undefined) {
      console.log('cannot close job, jobId:', jobId, ' already exists!');
      throw new Error('cannot close job, jobId:' + jobId + ' already exists!');
    }

    // finish job
    job.finish(params.time);

    // notify watchers that a job is finished.
    var watchers = this.watchers[jobId];
    if (watchers !== undefined) {
      for (var i = 0; i < watchers.length; i++) {
        var watcher = watchers[i];
        this.rpc.request(watcher.address, {method:'watchedJobFinished', params:{
          uuid: watcher.uuid,
          parentJobId: watcher.parentJobId,
          duration: job.duration.getData()
        }}).done();
      }
    }

    // cleanup watchers
    delete this.watchers[jobId];

    // move from open to closed jobs.
    if (this.closedJobs[agentId] === undefined) {
      this.closedJobs[agentId] = {};
    }
    this.closedJobs[agentId][jobId] = job;
    delete openForAgent[jobId];

    this.updateStats();

    return {
      elapsedTime: job.elapsedTime,
      elapsedTimeWithPause: job.elapsedTimeWithPause,
      duration: job.duration.getData(),
      prediction: this.globalStats.getData()
    };
  };
  /**
   * Pause or resume an open job of an agent.
   *
   * Supported operations: 'pause' and 'endOfDay' pause the job, 'resume' and
   * 'startOfDay' resume it. The day-boundary variants pass `true` as second
   * argument to the job, the plain variants pass `false`. Any other operation
   * leaves the job untouched.
   *
   * @param {Object} params   reads agentId, jobId, operation and time
   * @returns {{jobId: *, type: string, elapsedTime: *, elapsedTimeWithPause: *}}
   */
  JobAgent.prototype.rpcFunctions.update = function(params) {
    var jobId = params.jobId;
    var job = this.openJobs[params.agentId][jobId];
    var operation = params.operation;

    if (operation === 'pause' || operation === 'endOfDay') {
      job.pause(params.time, operation === 'endOfDay');
    }
    else if (operation === 'startOfDay' || operation === 'resume') {
      job.resume(params.time, operation === 'startOfDay');
    }

    return {
      jobId: jobId,
      type: this.type,
      elapsedTime: job.elapsedTime,
      elapsedTimeWithPause: job.elapsedTimeWithPause
    };
  };
  /**
   * Look up the statistics collected for one agent.
   *
   * @param {Object} params   reads agentId
   * @returns {*} the stats object stored for that agent (the object itself, not
   *              its getData() output), or undefined when nothing is stored
   */
  JobAgent.prototype.rpcFunctions.getAgentStats = function(params) {
    var statsOfAgent = this.agentStats[params.agentId];
    return statsOfAgent;
  };
  /**
   * Return the statistics over all agents.
   *
   * @param {Object} params   not used
   * @returns {*} the global stats object itself, not its getData() output
   */
  JobAgent.prototype.rpcFunctions.getGlobalStats = function(params) {
    var overallStats = this.globalStats;
    return overallStats;
  };
  /**
   * Callback for a watched job that finished: hand the complete message over
   * to the job that was watching it.
   *
   * @param {Object} params   reads parentJobId (the watching job); the whole
   *                          object is passed on to that job
   */
  JobAgent.prototype.rpcFunctions.watchedJobFinished = function(params) {
    var watchingJob = this.allJobs[params.parentJobId];
    watchingJob.prerequisiteFinished(params);
  };
  /**
   * Put a watcher on a specific job of this agent.
   *
   * When the job is already finished the sender is notified right away with a
   * 'watchedJobFinished' request. Otherwise the watcher is stored and notified
   * once the job is finished.
   *
   * @param {Object} params   reads jobId (job to watch), uuid, parentJobId (the
   *                          job that wants to watch) and address (callback
   *                          address of the watcher)
   * @param {*} sender        address the request came from
   * @returns {*} the best prediction available: the stats data of the agent
   *              owning the job, or the global stats data while that agent's
   *              mean duration is still 0
   * @throws {Error} when this agent does not know the job
   */
  JobAgent.prototype.rpcFunctions.addWatcherOnJobId = function(params, sender) {
    var jobId = params.jobId;
    var uuid = params.uuid;
    var parentJobId = params.parentJobId;
    var job = this.allJobs[jobId];

    if (job === undefined) {
      console.log('cannot watch job, jobId:', jobId, ' does not exist!');
      throw new Error('cannot watch job, jobId:' + jobId + ' does not exist!');
    }

    // if the job is already finished, call the finished callback
    if (job.finished == true) {
      this.rpc.request(sender, {method:'watchedJobFinished', params:{
        uuid: uuid,
        parentJobId: parentJobId,
        duration: job.duration.getData() // we need the pure json data, not the class
      }}).done();
    }
    else {
      // we will create a watcher on a job which will alert the watcher when the job is finished with all the times.
      if (this.watchers[jobId] === undefined) {
        this.watchers[jobId] = [];
      }
      // parentJobId has to be stored as well: the notification sent when the
      // job finishes echoes it back, and the receiver uses it to find the
      // watching job.
      this.watchers[jobId].push({address: params.address, uuid: uuid, parentJobId: parentJobId});
    }

    // return the best prediction we have. The mean lives under `duration`,
    // as in the other places where the agent stats are inspected.
    var ownStats = this.agentStats[job.agentId];
    if (ownStats.duration.mean == 0) {
      return this.globalStats.getData(); // we need the pure json data, not the class
    }
    return ownStats.getData(); // we need the pure json data, not the class
  };
  /**
   * Put a watcher on whatever job of this type the given agent has open.
   *
   * When the agent has an open job, the first one found is watched through
   * addWatcherOnJobId. When it has none, the watcher is answered immediately
   * with the mean data of the best statistics available, flagged `oldData`.
   *
   * NOTE: when an open job is found, `params.jobId` is set on the passed-in
   * params object before it is forwarded.
   *
   * @param {Object} params   reads agentId, parentJobId, uuid and address
   * @param {*} sender        address the request came from
   * @returns {*} the best prediction available: the agent's stats data, or the
   *              global stats data when the agent has no stats yet or its mean
   *              duration is still 0
   */
  JobAgent.prototype.rpcFunctions.addWatcherByAgentID = function(params, sender) {
    var agentId = params.agentId;
    var parentJobId = params.parentJobId;
    var jobId = null;
    var uuid = params.uuid;

    // see which statistics collection we will need to return. An agent that
    // never had a job of this type has no stats entry at all, so fall back to
    // the global stats for it as well.
    var ownStats = this.agentStats[agentId];
    var returnStats;
    if (ownStats === undefined || ownStats.duration.mean == 0) {
      returnStats = this.globalStats;
    }
    else {
      returnStats = ownStats;
    }

    // see if we have an open job with that agent of this type
    if (this.openJobs[agentId] !== undefined) {
      for (var jId in this.openJobs[agentId]) {
        if (this.openJobs[agentId].hasOwnProperty(jId)) {
          jobId = jId;
          break;
        }
      }
    }

    // there is no open job from supplied agent of this type. return the mean of the return stats
    if (jobId === null) {
      this.rpc.request(params.address, {method:'watchedJobFinished', params:{
        uuid: uuid,
        parentJobId: parentJobId,
        duration: returnStats.getMeanData(), // we need the pure json data, not the class
        oldData: true
      }}).done();
    }
    else {
      params.jobId = jobId;
      this.rpcFunctions.addWatcherOnJobId.call(this, params, sender);
    }

    // return the best prediction we have
    return returnStats.getData(); // we need the pure json data, not the class
  };
  /**
   * Answer a watch request on a whole job type.
   *
   * A global type cannot be watched, so the watcher is answered immediately
   * with the mean data of the global stats, flagged `oldData`.
   *
   * @param {Object} params   reads address, uuid and parentJobId
   * @param {*} sender        not used
   * @returns {*} the global stats data
   */
  JobAgent.prototype.rpcFunctions.addWatcherByType = function(params, sender) {
    var stats = this.globalStats;
    var reply = {
      method: 'watchedJobFinished',
      params: {
        uuid: params.uuid,
        parentJobId: params.parentJobId,
        duration: stats.getMeanData(), // pure json data, not the class
        oldData: true
      }
    };
    this.rpc.request(params.address, reply).done();

    return stats.getData(); // pure json data, not the class
  };
  /**
   * Set up a watcher for every prerequisite of a job.
   *
   * Depending on the fields of the prerequisite:
   * - `jobId`: ask 'JobAgentGenerator' for the address of the agent owning that
   *   job and send it an 'addWatcherOnJobId' request;
   * - `agentId` and `type`: send 'addWatcherByAgentID' to the agent named
   *   `type`;
   * - only `type`: send 'addWatcherByType' to the agent named `type`.
   * The preliminary stats in each reply are passed to the watching job through
   * watchingPrerequisite, together with the uuid of the prerequisite.
   *
   * The replies arrive in callbacks, so each prerequisite is handled inside its
   * own function scope. With function-scoped loop variables every callback
   * would end up using the last prerequisite of the list.
   *
   * @param {*} parentJobId             ID from the job that wants to WATCH other jobs
   * @param {Array.<Object>} prerequisites   expanded prerequisites; each entry
   *                                    has a uuid and jobId and/or type/agentId
   */
  JobAgent.prototype.addWatchers = function(parentJobId, prerequisites) {
    var me = this;

    prerequisites.forEach(function (prereq) {
      var params = {
        uuid: prereq.uuid,
        address: me.id,          // this is the callback address
        parentJobId: parentJobId // this is the job that wants to watch the other one.
      };

      // tell the watching job that it is now watching this prerequisite
      var onWatching = function (preliminaryStats) {
        me.allJobs[parentJobId].watchingPrerequisite(preliminaryStats, prereq.uuid);
      };

      if (prereq.jobId !== undefined) {
        // we have a job id to watch; we first need to find the agent that
        // this job belongs to.
        me.rpc.request('JobAgentGenerator', {method: 'returnJobAddress', params: {jobId: prereq.jobId}})
          // now that we have an address, set a watcher on the job id
          .done(function (address) {
            if (address != 'doesNotExist') {
              params.jobId = prereq.jobId; // this is the job we want to watch
              me.rpc.request(address, {method: 'addWatcherOnJobId', params: params})
                .done(onWatching);
            }
            else {
              console.log('ERROR: watch job does not exist.');
              throw new Error('ERROR: watch job does not exist.');
            }
          });
      }
      else if (prereq.agentId !== undefined && prereq.type !== undefined) {
        // we now have an agentId and a jobType to watch.
        params.agentId = prereq.agentId; // this is the agent whose job we want to watch
        me.rpc.request(prereq.type, {method: 'addWatcherByAgentID', params: params})
          .done(onWatching);
      }
      else if (prereq.type !== undefined) {
        me.rpc.request(prereq.type, {method: 'addWatcherByType', params: params})
          .done(onWatching);
      }
    });
  };
  /**
   * Merge a prediction into the predicted startup time of a job.
   *
   * The mean becomes the larger of the current and the new mean, the std is
   * combined as the square root of the sum of squares, and the job's
   * prerequisitesCount is incremented by one.
   *
   * @param {*} jobId           job whose predicted startup time is updated
   * @param {{mean: number, std: number}} prediction
   */
  JobAgent.prototype.updatePredictedStartup = function(jobId, prediction) {
    var job = this.allJobs[jobId];
    var startup = job.predictedStartupTime;

    startup.mean = Math.max(startup.mean, prediction.mean);

    var sumOfSquares = Math.pow(startup.std, 2) + Math.pow(prediction.std, 2);
    startup.std = Math.sqrt(sumOfSquares);

    job.prerequisitesCount += 1;
  };
  /**
   * Recompute all statistics from the closed jobs.
   *
   * The global stats are cleared first. Then, for every agent that has closed
   * jobs, the agent's stats are recomputed from those jobs and summed into the
   * global stats. Finally the global stats are averaged over the number of
   * agents that were visited.
   */
  JobAgent.prototype.updateStats = function() {
    var me = this;
    me.globalStats.clearStats();

    var agentIds = Object.keys(me.closedJobs);
    agentIds.forEach(function (agentId) {
      var statsOfAgent = me.agentStats[agentId];
      // could be optimised with rolling average for efficient memory management
      statsOfAgent.setData(me.updateStatsIn(me.closedJobs[agentId]));
      me.globalStats.sumStats(statsOfAgent);
    });

    me.globalStats.averageStats(agentIds.length);
  };
  /**
   * Compute the statistics of a collection of jobs for every field listed in
   * `this.globalStats.fields`.
   *
   * @param {Object} collection   jobs, keyed by job id
   * @returns {Object} one entry per field, holding the result of
   *                   collectStatsIn for that field
   */
  JobAgent.prototype.updateStatsIn = function(collection) {
    var me = this;
    return me.globalStats.fields.reduce(function (stats, field) {
      stats[field] = me.collectStatsIn(collection, field);
      return stats;
    }, {});
  };
  /**
   * Compute mean, standard deviation, minimum and maximum of one duration
   * field over a collection of jobs.
   *
   * The standard deviation divides by the number of jobs (not by n - 1). The
   * minimum search starts at 1e16 and the maximum search at 0, so values
   * outside that range are not reflected in min/max.
   *
   * @param {Object} collection   jobs keyed by job id; the value is read from
   *                              `job.duration[field]`
   * @param {string} field        name of the duration field to summarise
   * @returns {{mean: number, std: number, min: number, max: number}}
   *          all zeros when the collection has no own entries
   */
  JobAgent.prototype.collectStatsIn = function(collection, field) {
    // gather the value of the field for every job in the collection
    var values = [];
    for (var key in collection) {
      if (collection.hasOwnProperty(key)) {
        values.push(collection[key].duration[field]);
      }
    }

    var count = values.length;
    if (count === 0) {
      return {mean: 0, std: 0, min: 0, max: 0};
    }

    var total = 0;
    var lowest = 1e16;
    var highest = 0;
    values.forEach(function (value) {
      highest = value > highest ? value : highest;
      lowest = value < lowest ? value : lowest;
      total += value;
    });

    var mean = total / count;
    var squaredDeviations = values.reduce(function (sum, value) {
      return sum + Math.pow(value - mean, 2);
    }, 0);

    return {
      mean: mean,
      std: Math.sqrt(squaredDeviations / count),
      min: lowest,
      max: highest
    };
  };
  /**
   * Tell whether this agent has ever registered a job with the given id,
   * whether it is still open or not.
   *
   * @param {Object} params   reads jobId
   * @returns {boolean} true when a job with that id is stored in allJobs
   */
  JobAgent.prototype.hasJob = function(params) {
    var knownJob = this.allJobs[params.jobId];
    return knownJob !== undefined;
  };