JGDIPlugin.java |
package jgdiplugin;

import com.sun.grid.jgdi.EventClient;
import com.sun.grid.jgdi.JGDI;
import com.sun.grid.jgdi.JGDIException;
import com.sun.grid.jgdi.JGDIFactory;
import com.sun.grid.jgdi.configuration.ComplexEntry;
import com.sun.grid.jgdi.configuration.Job;
import com.sun.grid.jgdi.configuration.JobTask;
import com.sun.grid.jgdi.configuration.Range;
import com.sun.grid.jgdi.event.EventTypeEnum;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TimerTask;
import jgdiplugin.accounting.ARCODatabase;
import jgdiplugin.accounting.DailyUsageThread;
import jgdiplugin.accounting.SGEAccountingThread;
import plgrid.GridJobArgument;
import plgrid.GridJobInfo;
import plgrid.GridJobSubmitInfo;
import plgrid.PipelineGridPlugin;
import plgrid.exception.PLGrid_InvalidMethodException;

/**
 * Pipeline grid plugin that talks to a Grid Engine qmaster through JGDI.
 *
 * <p>Jobs are submitted with the {@code qsub} command line tool, listed and deleted through a
 * {@link JGDI} connection (or {@code qdel} when a user name is given), and information about
 * finished jobs is taken either from an {@link SGEAccountingThread} or from an
 * {@link ARCODatabase}, depending on the {@code GridFinishedJobRetrievalMethod} preference.
 *
 * <p>A timer pings the qmaster every {@link #PING_QMASTER_INTERVAL_MS} milliseconds; the fields
 * it writes ({@code isQmasterAlive}, {@code jgdi}) are {@code volatile} so that threads calling
 * the public methods observe the updates.
 */
public class JGDIPlugin extends PipelineGridPlugin {

    private static boolean TEST_MODE;

    public static final String JGDI_PLUGIN_VERSION = "3.4";

    private static final int PING_QMASTER_INTERVAL_MS = 15000;

    /** Port used when the SGE_PORT environment variable is not set. */
    private static final String DEFAULT_SGE_PORT = "6444";

    /** Number of times {@link #submitJob} runs qsub before giving up. */
    private static final int MAX_SUBMIT_ATTEMPTS = 5;

    /** Delay before the second submit attempt; doubled before each later attempt. */
    private static final long INITIAL_RETRY_DELAY_MS = 2000L;

    /** Number of characters skipped in a "Your job ..." line before the job id starts. */
    private static final int JOB_ID_OFFSET = 9;

    /** Extra characters skipped for array submissions (presumably the "-array" suffix). */
    private static final int ARRAY_JOB_ID_EXTRA_OFFSET = 6;

    private final String SGE_ROOT;
    private final String SGE_CELL;
    private final String SGE_PORT;

    private SGEAccountingThread sgeAccountingThread;
    private DailyUsageThread dailyUsageThread;
    private String finishedJobRetrievalMethod;
    private ARCODatabase arcoDatabase;

    private String bootstrapURL;
    // Replaced by the heartbeat timer thread after a qmaster restart, read by caller threads.
    private volatile JGDI jgdi = null;

    private JGDIJobFinishListener jobFinishListener;
    private JGDIJobModListener jobModListener;
    private JGDIQmasterDownListener qmasterDownListener;
    private EventClient modEventClient;
    private EventClient finishEventClient;
    private EventClient downEventClient;

    // Written by the heartbeat timer thread and by getJobInfo, polled in waitForQmasterAlive.
    private volatile boolean isQmasterAlive = true;
    private Timer heartBeatTimer;

    /**
     * Reads SGE_ROOT, SGE_CELL and SGE_PORT from the environment, opens the JGDI connection,
     * registers the event listeners, starts the accounting thread (both skipped in test mode)
     * and schedules the qmaster heartbeat.
     *
     * <p>If the JGDI connection or the listener registration fails, the stack trace is printed
     * and construction stops early: no accounting thread and no heartbeat timer are started.
     */
    public JGDIPlugin() {
        this.SGE_ROOT = System.getenv("SGE_ROOT");
        this.SGE_CELL = System.getenv("SGE_CELL");
        String sgePort = System.getenv("SGE_PORT");
        this.SGE_PORT = (sgePort == null) ? DEFAULT_SGE_PORT : sgePort;

        System.out.println("SGE Root: " + this.SGE_ROOT);
        System.out.println("SGE Cell: " + this.SGE_CELL);
        System.out.println("SGE Port: " + this.SGE_PORT);

        this.bootstrapURL = "bootstrap://" + this.SGE_ROOT + "@" + this.SGE_CELL + ":" + this.SGE_PORT;

        try {
            this.jgdi = JGDIFactory.newSynchronizedInstance(this.bootstrapURL);
            if (!TEST_MODE) {
                registerListeners();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            return;
        }

        if (!TEST_MODE) {
            this.sgeAccountingThread = new SGEAccountingThread(this.SGE_ROOT, this.SGE_CELL);
            this.sgeAccountingThread.setName("SGEAccountingThread");
            this.sgeAccountingThread.start();
        }

        this.heartBeatTimer = new Timer();
        this.heartBeatTimer.scheduleAtFixedRate(
                new HeartBeatTimerTask(), PING_QMASTER_INTERVAL_MS, PING_QMASTER_INTERVAL_MS);

        System.out.println("JGDIPlugin (version: " + JGDI_PLUGIN_VERSION + ") started.");
    }

    /**
     * Applies plugin preferences.
     *
     * <p>{@code GridFinishedJobRetrievalMethod} selects the source of finished-job information:
     * empty or missing selects the accounting thread (any existing ARCO database is shut down),
     * {@code "arco"} (case-insensitive) selects the ARCO database (the accounting thread is shut
     * down). When {@code GridUsageDBURL} and {@code GridUsageDBUser} are both non-blank, and the
     * plugin is not in test mode, a {@link DailyUsageThread} is started once.
     *
     * @param prefs preference map; must not be null
     */
    @Override
    public void setPreferences(Map<String, String> prefs) {
        super.setPreferences(prefs);

        this.finishedJobRetrievalMethod = prefs.get("GridFinishedJobRetrievalMethod");

        if (isBlank(this.finishedJobRetrievalMethod)) {
            if (this.arcoDatabase != null) {
                // Shut down the instance that is actually in use and forget it, so that a later
                // switch back to "arco" creates a fresh one instead of reusing a closed one.
                this.arcoDatabase.shutdown();
                this.arcoDatabase = null;
            }
            if (this.sgeAccountingThread == null) {
                this.sgeAccountingThread = new SGEAccountingThread(this.SGE_ROOT, this.SGE_CELL);
                this.sgeAccountingThread.setName("SGEAccountingThread");
                this.sgeAccountingThread.start();
            }
        } else if (this.finishedJobRetrievalMethod.equalsIgnoreCase("arco")) {
            if (this.sgeAccountingThread != null) {
                this.sgeAccountingThread.shutdown();
            }
            if (this.arcoDatabase == null) {
                this.arcoDatabase = new ARCODatabase(this);
            }
        }

        String usageDBUrl = prefs.get("GridUsageDBURL");
        String usageDBUser = prefs.get("GridUsageDBUser");
        if (isBlank(usageDBUrl) || isBlank(usageDBUser)) {
            return;
        }
        String usageDBPass = prefs.get("GridUsageDBPass");

        if (TEST_MODE || this.dailyUsageThread != null) {
            return;
        }
        try {
            this.dailyUsageThread =
                    new DailyUsageThread(this.sgeAccountingThread, usageDBUrl, usageDBUser, usageDBPass);
            this.dailyUsageThread.setPriority(Thread.MIN_PRIORITY);
            this.dailyUsageThread.start();
        } catch (Exception ex) {
            // Usage reporting is optional, so the plugin keeps running, but the failure is
            // reported instead of being silently dropped.
            ex.printStackTrace();
        }
    }

    /** Returns true when {@code value} is null or contains only whitespace. */
    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /**
     * Creates the three event clients (job finish, job task modification, qmaster shutdown),
     * subscribes each to its event types and attaches the matching listener.
     *
     * @throws JGDIException if creating a client, subscribing or committing fails
     */
    private void registerListeners() throws JGDIException {
        this.jobFinishListener = new JGDIJobFinishListener(this);
        this.finishEventClient = JGDIFactory.createEventClient(this.bootstrapURL, 0);
        this.finishEventClient.subscribe(EventTypeEnum.JobFinalUsage);
        this.finishEventClient.subscribe(EventTypeEnum.JobDel);
        this.finishEventClient.subscribe(EventTypeEnum.JobTaskDel);
        this.finishEventClient.commit();
        this.finishEventClient.addEventListener(this.jobFinishListener);

        this.jobModListener = new JGDIJobModListener(this);
        this.modEventClient = JGDIFactory.createEventClient(this.bootstrapURL, 0);
        this.modEventClient.subscribe(EventTypeEnum.JobTaskMod);
        this.modEventClient.commit();
        this.modEventClient.addEventListener(this.jobModListener);

        this.qmasterDownListener = new JGDIQmasterDownListener(this);
        this.downEventClient = JGDIFactory.createEventClient(this.bootstrapURL, 0);
        this.downEventClient.subscribe(EventTypeEnum.QmasterGoesDown);
        this.downEventClient.commit();
        this.downEventClient.addEventListener(this.qmasterDownListener);
    }

    private boolean isQmasterAlive() {
        return this.isQmasterAlive;
    }

    /**
     * Probes the qmaster by opening a new JGDI connection and updates the alive flag.
     *
     * <p>Only active when SGE_ROOT ends with {@code "6.2u1"}; otherwise (including when SGE_ROOT
     * is unset) the method does nothing. On a successful probe after a detected crash, the new
     * connection replaces the old one and the listeners are registered again. On a failed probe
     * while the qmaster was considered alive, the flag is cleared and the event clients are
     * closed.
     */
    public void pingQmaster() {
        // The null check matters: this runs inside a TimerTask, and an uncaught exception there
        // would terminate the heartbeat timer.
        if (this.SGE_ROOT == null || !this.SGE_ROOT.endsWith("6.2u1")) {
            return;
        }
        try {
            JGDI probe = JGDIFactory.newSynchronizedInstance(this.bootstrapURL);
            if (this.isQmasterAlive) {
                probe.close();
                return;
            }
            System.err.println(new Date() + ": S U C C E S S: Qmaster restored");
            this.isQmasterAlive = true;
            this.jgdi = probe;
            registerListeners();
        } catch (Exception ex) {
            if (this.isQmasterAlive) {
                System.err.println(new Date() + ": W A R N I N G: Qmaster CRASH detected");
                this.isQmasterAlive = false;
                // Close each client on its own so one failure does not leave the others open.
                closeEventClient(this.finishEventClient);
                closeEventClient(this.modEventClient);
                closeEventClient(this.downEventClient);
            }
        }
    }

    /** Closes an event client if present, printing (not propagating) any failure. */
    private void closeEventClient(EventClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Pings the qmaster once and then blocks, polling every {@link #PING_QMASTER_INTERVAL_MS}
     * milliseconds, until the alive flag is set again (the heartbeat timer does the re-probing).
     *
     * <p>If the waiting thread is interrupted, the interrupt status is restored and the method
     * returns even though the qmaster may still be down.
     */
    public synchronized void waitForQmasterAlive() {
        pingQmaster();
        // NOTE(review): when SGE_ROOT does not end with "6.2u1" pingQmaster() is a no-op, so
        // nothing visible in this class sets the flag back to true; confirm that one of the
        // listeners covers that case.
        while (!this.isQmasterAlive) {
            try {
                Thread.sleep(PING_QMASTER_INTERVAL_MS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Submits a job by running {@code qsub}, retrying up to {@link #MAX_SUBMIT_ATTEMPTS} times
     * with a growing delay between attempts.
     *
     * @param gji description of the job to submit
     * @return the job id parsed from the qsub output, or a string starting with {@code "ERROR:"}
     *         followed by one line per failed attempt
     */
    public String submitJob(GridJobSubmitInfo gji) {
        StringBuilder failures = new StringBuilder();
        long sleepTime = INITIAL_RETRY_DELAY_MS;

        for (int attempt = 1; attempt <= MAX_SUBMIT_ATTEMPTS; ++attempt) {
            if (attempt > 1) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException ex) {
                    // Stop retrying instead of spinning through the remaining attempts.
                    Thread.currentThread().interrupt();
                    failures.append("Interrupted while waiting to retry\n");
                    break;
                }
            }

            String err;
            Process process = null;
            try {
                String commandLine = buildSubmitCommand(gji);

                // Computed per attempt: accumulating it across retries would shift the offset
                // further on every attempt and break job id parsing for array submissions.
                int offset = JOB_ID_OFFSET;
                if (gji.getSubmissionType() == GridJobSubmitInfo.SUBMISSION_ARRAY) {
                    offset += ARRAY_JOB_ID_EXTRA_OFFSET;
                }

                if (!isQmasterAlive()) {
                    waitForQmasterAlive();
                }

                // The command line is split on whitespace; no shell is involved.
                List<String> command = new ArrayList<>();
                StringTokenizer tokens = new StringTokenizer(commandLine);
                while (tokens.hasMoreTokens()) {
                    command.add(tokens.nextToken());
                }

                ProcessBuilder pb = new ProcessBuilder(command);
                Properties props = gji.getEnvironmentProperties();
                if (props != null) {
                    Map<String, String> env = pb.environment();
                    for (String varName : props.stringPropertyNames()) {
                        env.put(varName, props.getProperty(varName));
                    }
                }

                process = pb.start();
                byte[] outBuf = new byte[100];
                byte[] errBuf = new byte[255];
                int outLen = process.getInputStream().read(outBuf);
                int errLen = process.getErrorStream().read(errBuf);
                // Decode only the bytes actually read so the strings carry no NUL padding.
                String response = decode(outBuf, outLen);
                err = decode(errBuf, errLen);
                releaseProcess(process);
                process = null;

                if (err.contains("can't connect to service") || err.contains("got read error")) {
                    waitForQmasterAlive();
                }

                String jobId = extractJobId(response, offset);
                if (jobId != null) {
                    return jobId;
                }

                if (err.trim().length() > 0) {
                    err = err + "\n";
                }
                err = err + "ERROR: " + response;
            } catch (Exception ex) {
                ex.printStackTrace();
                StringBuilder errorMsg = new StringBuilder("Unable to submit job. Internal error occurred\n");
                errorMsg.append("\n Date: ").append(new Date().toString());
                errorMsg.append("\n Reason: ").append(ex.getMessage());
                err = errorMsg.toString();
            } finally {
                if (process != null) {
                    releaseProcess(process);
                }
            }

            System.err.println(new Date() + ": Attempt " + attempt + ": ERROR While submitting job: " + err);
            failures.append("Attempt ").append(attempt).append(": ").append(err).append("\n");

            // Delay stays at the initial value before attempt 2 and doubles before each later one.
            if (attempt >= 2) {
                sleepTime *= 2L;
            }
        }
        return "ERROR:" + failures;
    }

    /**
     * Builds the whitespace-separated qsub command line for a submission.
     *
     * <p>For array submissions the value of {@code getCommand()} is appended to {@code qsub}
     * as-is; otherwise the native specification, output/error paths, environment variables
     * ({@code -v}), the command and its non-null argument values are appended.
     *
     * @throws IllegalStateException if the user name, command or argument list is missing
     */
    private String buildSubmitCommand(GridJobSubmitInfo gji) {
        StringBuilder cmd = new StringBuilder();

        String username = gji.getUsername();
        if (username == null) {
            throw new IllegalStateException("Failed to get Username");
        }
        if (gji.getPrivilegeEscalation()) {
            cmd.append("sudo -E -u ").append(username).append(" ");
        }

        if (gji.getCommand() == null) {
            throw new IllegalStateException("Failed to get Executable Location");
        }

        List<GridJobArgument> arguments = gji.getArguments();
        if (arguments == null || arguments.contains(null)) {
            throw new IllegalStateException("Failed to get command line arguments");
        }

        cmd.append("qsub ");
        if (gji.getSubmissionType() == GridJobSubmitInfo.SUBMISSION_ARRAY) {
            cmd.append(gji.getCommand());
            return cmd.toString();
        }

        cmd.append(gji.getNativeSpecification());
        cmd.append(" -o ").append(gji.getOutputPath());
        cmd.append(" -e ").append(gji.getErrorPath());

        Properties envProperties = gji.getEnvironmentProperties();
        if (envProperties != null && !envProperties.stringPropertyNames().isEmpty()) {
            cmd.append(" -v ");
            boolean first = true;
            for (String varName : envProperties.stringPropertyNames()) {
                if (!first) {
                    cmd.append(",");
                }
                cmd.append(varName).append("=").append(envProperties.getProperty(varName));
                first = false;
            }
            cmd.append(" ");
        }

        cmd.append(" ").append(gji.getCommand());
        for (GridJobArgument arg : arguments) {
            String argValue = arg.getValue();
            if (argValue != null) {
                cmd.append(" ").append(argValue);
            }
        }
        return cmd.toString();
    }

    /** Decodes the first {@code length} bytes of {@code buffer}; a non-positive length yields "". */
    private static String decode(byte[] buffer, int length) {
        return (length > 0) ? new String(buffer, 0, length) : "";
    }

    /**
     * Finds the first line starting with "Your job" and returns the text between
     * {@code offset} and the next space.
     *
     * @return the job id, or null when no such line exists, the line is shorter than the offset,
     *         or no space follows the id
     */
    private static String extractJobId(String response, int offset) {
        for (String line : response.split("\n")) {
            if (!line.startsWith("Your job")) {
                continue;
            }
            if (line.length() < offset) {
                return null;
            }
            String tail = line.substring(offset);
            int spaceIndex = tail.indexOf(" ");
            return (spaceIndex == -1) ? null : tail.substring(0, spaceIndex);
        }
        return null;
    }

    /**
     * Lists the jobs known to the qmaster, optionally filtered.
     *
     * <p>{@code complexVariables} is a comma-separated list of {@code name} or {@code name=value}
     * entries (a missing value means {@code "true"}). If it contains {@code GridJobNamePrefix},
     * only the job name prefix is checked; otherwise every entry must match one of the job's
     * hard resources (value compared case-insensitively).
     *
     * @param complexVariables filter expression, may be null
     * @return one entry per task of every matching job; null if JGDI was never initialized;
     *         a possibly partial list if the qmaster query fails
     */
    public List<GridJobInfo> getJobList(String complexVariables) {
        List<GridJobInfo> ret = new LinkedList<>();

        JGDI connection = this.jgdi;
        if (connection == null) {
            System.err.println("JGDI is not properly initialized, returning NULL.");
            return null;
        }

        try {
            List<Job> qJobs = connection.getJobList();
            if (TEST_MODE) {
                System.out.println("qJobs.size = " + qJobs.size());
            }

            Map<String, String> complexVars = parseComplexVariables(complexVariables);

            List<Job> plJobs = new LinkedList<>();
            for (Job j : qJobs) {
                if (matchesComplexVars(j, complexVars)) {
                    plJobs.add(j);
                }
            }

            if (TEST_MODE) {
                System.out.println("plJobs.size = " + plJobs.size());
                for (Job j : plJobs) {
                    System.out.println(j.getJobNumber());
                }
            }

            for (Job j : plJobs) {
                String jobId = String.valueOf(j.getJobNumber());
                List<JobTask> taskList = j.getJaTasksList();
                if (taskList == null) {
                    continue;
                }
                if (taskList.isEmpty()) {
                    // No task objects yet: enumerate the task ids from the job's first range.
                    Range range = j.getJaStructure(0);
                    if (TEST_MODE) {
                        System.out.println("==================== JOB " + j.getJobNumber() + " =========");
                        System.out.println(j.dump());
                        System.out.println("============================================");
                    }
                    // A step of zero would never advance the loop.
                    int step = Math.max(range.getStep(), 1);
                    for (int taskId = range.getMin(); taskId <= range.getMax(); taskId += step) {
                        ret.add(getJobInfo(jobId + "." + taskId, j));
                    }
                } else {
                    for (JobTask task : taskList) {
                        GridJobInfo gji = getJobInfo(jobId + "." + task.getTaskNumber(), j);
                        if (gji != null) {
                            ret.add(gji);
                        }
                    }
                }
            }
        } catch (JGDIException ex) {
            ex.printStackTrace();
        }
        return ret;
    }

    /**
     * Parses a comma-separated list of {@code name} / {@code name=value} entries into a map.
     * Entries with an empty name or more than one '=' are ignored; a missing or blank value is
     * stored as {@code "true"}. Names and values are trimmed.
     */
    private static Map<String, String> parseComplexVariables(String complexVariables) {
        Map<String, String> complexVars = new HashMap<>();
        if (complexVariables == null) {
            return complexVars;
        }
        for (String token : complexVariables.split(",")) {
            String entry = token.trim();
            if (entry.isEmpty()) {
                continue;
            }
            String[] parts = entry.split("=");
            if (parts.length < 1 || parts.length > 2) {
                continue;
            }
            String name = parts[0].trim();
            if (name.isEmpty()) {
                continue;
            }
            String value = (parts.length == 2) ? parts[1].trim() : "";
            complexVars.put(name, value.isEmpty() ? "true" : value);
        }
        return complexVars;
    }

    /**
     * Tells whether a job passes the filter produced by {@link #parseComplexVariables}.
     * An empty filter matches every job.
     */
    private static boolean matchesComplexVars(Job j, Map<String, String> complexVars) {
        if (complexVars.isEmpty()) {
            return true;
        }

        String prefix = complexVars.get("GridJobNamePrefix");
        if (prefix != null) {
            String jobName = j.getJobName();
            return jobName != null && jobName.startsWith(prefix);
        }

        Map<String, String> jobComplexVars = new HashMap<>();
        for (int i = 0; i < j.getHardResourceCount(); ++i) {
            ComplexEntry ce = j.getHardResource(i);
            jobComplexVars.put(ce.getName(), ce.getStringval());
        }

        for (Map.Entry<String, String> wanted : complexVars.entrySet()) {
            String actual = jobComplexVars.get(wanted.getKey());
            // A missing resource or one without a string value cannot match.
            if (actual == null || !actual.equalsIgnoreCase(wanted.getValue())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Fills {@code gji} with the state of one task of a job still known to the qmaster.
     *
     * <p>A task present in the job's task list gets its start time and state 2 (presumably
     * "running"), unless its {@code end_time} usage is positive, in which case null is returned.
     * A task id contained in one of the ranges from {@code getJaNHIdsList()} gets the job's
     * submission time as queued time and state 1 (presumably "queued").
     *
     * @return {@code gji}, or null when the task is finished or unknown
     */
    private GridJobInfo getTaskInfo(GridJobInfo gji, Job j, int taskId) {
        List<JobTask> tasks = j.getJaTasksList();
        if (tasks != null) {
            for (JobTask jt : tasks) {
                if (jt.getTaskNumber() != taskId) {
                    continue;
                }
                try {
                    Double endTime = Double.valueOf(jt.getUsage("end_time"));
                    if (endTime.longValue() > 0L) {
                        return null;
                    }
                } catch (Exception ignored) {
                    // Best effort: if the end time cannot be read, treat the task as not finished.
                }
                gji.setStartTime(jt.getStartTime() * 1000L);
                gji.setState(2);
                return gji;
            }
        }

        Iterable<? extends Range> pendingRanges = j.getJaNHIdsList();
        if (pendingRanges != null) {
            for (Range r : pendingRanges) {
                // A step of zero would never advance the loop.
                int step = Math.max(r.getStep(), 1);
                for (int id = r.getMin(); id <= r.getMax(); id += step) {
                    if (taskId == id) {
                        gji.setQueuedTime(j.getSubmissionTime() * 1000L);
                        gji.setState(1);
                        return gji;
                    }
                }
            }
        }
        return null;
    }

    /**
     * Looks up one job task, first among the qmaster's jobs and then among finished jobs.
     *
     * @param jobId job id, optionally followed by {@code .taskId} (task 1 when absent)
     * @param j     job object to use, or null to fetch it from the qmaster
     * @return the task information; when nothing is found or the lookup fails, an info object
     *         that carries only the job id
     */
    private GridJobInfo getJobInfo(String jobId, Job j) {
        GridJobInfo gji = new GridJobInfo(jobId);

        try {
            int taskId = 1;
            String jobIdOnly = jobId;
            int dot = jobId.indexOf(".");
            if (dot >= 0) {
                // Parsed inside the try so a malformed task id is handled like a malformed
                // job number (logged, bare info returned) instead of escaping to the caller.
                taskId = Integer.parseInt(jobId.substring(dot + 1));
                jobIdOnly = jobId.substring(0, dot);
            }

            if (j == null) {
                j = this.jgdi.getJob(Integer.parseInt(jobIdOnly));
            }

            if (j != null) {
                GridJobInfo ret = getTaskInfo(gji, j, taskId);
                if (ret != null) {
                    for (int i = 0; i < j.getHardResourceCount(); ++i) {
                        ComplexEntry ce = j.getHardResource(i);
                        ret.addComplexVariable(ce.getName(), ce.getStringval());
                    }
                    return ret;
                }
            }

            GridJobInfo fji = getFinishedJobInfo(jobId);
            if (fji != null) {
                return fji;
            }
        } catch (JGDIException ex) {
            ex.printStackTrace();
            String message = ex.getMessage();
            if (message != null && message.contains("unable to contact qmaster using port")) {
                this.isQmasterAlive = false;
                System.err.println("Qmaster is unavailable. ");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return gji;
    }

    /**
     * Looks up one job task by id; see {@link #getJobInfo(String, Job)}.
     *
     * @param jobId job id, optionally followed by {@code .taskId}
     */
    public GridJobInfo getJobInfo(String jobId) {
        return getJobInfo(jobId, null);
    }

    /**
     * Fetches information about a finished job from the configured source.
     *
     * @return the finished job's information, or null if the source has none (or the accounting
     *         thread was never started)
     * @throws PLGrid_InvalidMethodException if the configured retrieval method is neither empty
     *         nor {@code "arco"}
     */
    private GridJobInfo getFinishedJobInfo(String jobId) throws PLGrid_InvalidMethodException {
        if (isBlank(this.finishedJobRetrievalMethod)) {
            // The accounting thread is absent in test mode or when construction stopped early.
            if (this.sgeAccountingThread == null) {
                return null;
            }
            return this.sgeAccountingThread.getFinishedJobInfo(jobId);
        }
        if (this.finishedJobRetrievalMethod.equalsIgnoreCase("arco")) {
            if (this.arcoDatabase == null) {
                this.arcoDatabase = new ARCODatabase(this);
            }
            return this.arcoDatabase.getFinishedJobInfo(jobId);
        }
        throw new PLGrid_InvalidMethodException("Method \"" + this.finishedJobRetrievalMethod
                + "\" is not supported by this plugin for obtaining finished job information.");
    }

    /**
     * Deletes a job.
     *
     * <p>With a user name, runs {@code sudo -u <username> qdel [-f] <jobId>} and waits for it to
     * finish. Without one, deletes through JGDI; errors whose message contains
     * "does not exist" are ignored, others are printed.
     *
     * @param jobId    job id, optionally with a {@code .taskId} suffix
     * @param username user to run qdel as, or null to delete through JGDI
     * @param force    whether to force the deletion
     */
    public void killJob(String jobId, String username, boolean force) {
        if (username != null) {
            // Arguments are passed as a list so that whitespace inside the user name or job id
            // cannot be interpreted as additional command line arguments.
            List<String> command = new ArrayList<>();
            command.add("sudo");
            command.add("-u");
            command.add(username);
            command.add("qdel");
            if (force) {
                command.add("-f");
            }
            command.add(jobId);

            Process process = null;
            try {
                process = new ProcessBuilder(command).start();
                process.waitFor();
            } catch (Exception ex) {
                if (ex instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                ex.printStackTrace();
                StringBuilder errorMsg = new StringBuilder("Unable to delete job ");
                errorMsg.append(jobId);
                errorMsg.append("\n Date: ").append(new Date().toString());
                errorMsg.append("\n Reason: ").append(ex.getMessage());
                System.err.println(errorMsg);
            } finally {
                if (process != null) {
                    releaseProcess(process);
                }
            }
            return;
        }

        try {
            if (jobId.contains(".")) {
                // Raw list kept on purpose: the element type expected by JGDI is not visible here.
                List answers = new LinkedList();
                String[] tasks = { jobId };
                this.jgdi.deleteJobsWithAnswer(tasks, force, null, answers);
            } else {
                this.jgdi.deleteJob(Integer.valueOf(jobId).intValue());
            }
        } catch (Exception ex) {
            String message = ex.getMessage();
            if (message == null || !message.contains("does not exist")) {
                ex.printStackTrace();
            }
        }
    }

    /**
     * Closes the three streams of a process and destroys it. Each step is attempted even if an
     * earlier one fails; only the last failure is printed.
     */
    private void releaseProcess(Process p) {
        if (p == null) {
            return;
        }
        Exception last = null;
        try {
            p.getInputStream().close();
        } catch (Exception ex) {
            last = ex;
        }
        try {
            p.getOutputStream().close();
        } catch (Exception ex) {
            last = ex;
        }
        try {
            p.getErrorStream().close();
        } catch (Exception ex) {
            last = ex;
        }
        try {
            p.destroy();
        } catch (Exception ex) {
            last = ex;
        }
        if (last != null) {
            last.printStackTrace();
        }
    }

    /** Prints the version banner and switches the class into test mode. */
    public static void main(String[] args) {
        System.out.println("PipelineGridPlugin version 3.1");
        System.out.println("JGDI Plugin version " + JGDI_PLUGIN_VERSION);
        TEST_MODE = true;
    }

    /** Timer task that probes the qmaster on every tick. */
    private class HeartBeatTimerTask extends TimerTask {
        @Override
        public void run() {
            try {
                JGDIPlugin.this.pingQmaster();
            } catch (RuntimeException ex) {
                // An exception escaping run() would cancel the timer and stop all future pings.
                ex.printStackTrace();
            }
        }
    }
}