Pipeline Grid Plugin Implementation Example



JGDIPlugin.java




JGDIPlugin.java

package jgdiplugin;

import com.sun.grid.jgdi.EventClient;
import com.sun.grid.jgdi.JGDI;
import com.sun.grid.jgdi.JGDIException;
import com.sun.grid.jgdi.JGDIFactory;
import com.sun.grid.jgdi.configuration.ComplexEntry;
import com.sun.grid.jgdi.configuration.Job;
import com.sun.grid.jgdi.configuration.JobTask;
import com.sun.grid.jgdi.configuration.Range;
import com.sun.grid.jgdi.event.EventTypeEnum;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TimerTask;
import jgdiplugin.accounting.ARCODatabase;
import jgdiplugin.accounting.DailyUsageThread;
import jgdiplugin.accounting.SGEAccountingThread;
import plgrid.GridJobArgument;
import plgrid.GridJobInfo;
import plgrid.GridJobSubmitInfo;
import plgrid.PipelineGridPlugin;
import plgrid.exception.PLGrid_InvalidMethodException;

/**
 * Pipeline grid plugin backed by Sun Grid Engine (SGE) through the JGDI API.
 *
 * <p>Jobs are submitted with the {@code qsub} command line tool (optionally via {@code sudo}),
 * queried through a JGDI connection, and deleted either with {@code qdel} (when a user name is
 * given) or through JGDI. The JGDI bootstrap URL is built from the {@code SGE_ROOT},
 * {@code SGE_CELL} and {@code SGE_PORT} environment variables.
 *
 * <p>Finished-job information comes from an {@link SGEAccountingThread} by default, or from an
 * {@link ARCODatabase} when the {@code GridFinishedJobRetrievalMethod} preference is {@code arco}.
 */
public class JGDIPlugin extends PipelineGridPlugin {
	/** Set by {@link #main}; when true, event listeners and background threads are not started. */
	private static boolean TEST_MODE;
	private final String SGE_ROOT;
	private final String SGE_CELL;
	private final String SGE_PORT;
	private SGEAccountingThread sgeAccountingThread;
	private DailyUsageThread dailyUsageThread;
	/** Value of the GridFinishedJobRetrievalMethod preference; null/blank selects the accounting thread. */
	private String finishedJobRetrievalMethod;
	private ARCODatabase arcoDatabase;
	public static final String JGDI_PLUGIN_VERSION = "3.4";
	/** Period of the qmaster heartbeat and of the polling in {@link #waitForQmasterAlive()}. */
	private static final int PING_QMASTER_INTERVAL_MS = 15000;
	/** qmaster port used when SGE_PORT is not set. */
	private static final String DEFAULT_SGE_PORT = "6444";
	/** Number of qsub attempts made by {@link #submitJob} before giving up. */
	private static final int MAX_SUBMIT_ATTEMPTS = 5;
	/** Delay before the second qsub attempt; doubled after every further attempt. */
	private static final long INITIAL_SUBMIT_RETRY_DELAY_MS = 2000L;
	/** qsub output prefixes that are followed by the new job id. */
	private static final String JOB_PREFIX = "Your job ";
	private static final String JOB_ARRAY_PREFIX = "Your job-array ";
	private String bootstrapURL;
	/** Replaced by the heartbeat thread when the qmaster is restored, hence volatile. */
	private volatile JGDI jgdi = null;
	private JGDIJobFinishListener jobFinishListener;
	private JGDIJobModListener jobModListener;
	private JGDIQmasterDownListener qmasterDownListener;
	private EventClient modEventClient;
	private EventClient finishEventClient;
	private EventClient downEventClient;
	/** Written by the heartbeat timer thread and read by submitting threads, hence volatile. */
	private volatile boolean isQmasterAlive = true;
	private Timer heartBeatTimer;
	/** Serializes {@link #pingQmaster()} so concurrent pings cannot register listeners twice. */
	private final Object pingLock = new Object();

	/**
	 * Reads the SGE environment, opens the JGDI connection and (outside test mode) registers
	 * event listeners and starts the accounting thread. Also starts the qmaster heartbeat timer.
	 * If the JGDI connection cannot be created, the error is printed and neither the accounting
	 * thread nor the heartbeat is started.
	 */
	public JGDIPlugin() {
		this.SGE_ROOT = System.getenv("SGE_ROOT");
		this.SGE_CELL = System.getenv("SGE_CELL");

		String sgePort = System.getenv("SGE_PORT");
		this.SGE_PORT = (sgePort == null) ? DEFAULT_SGE_PORT : sgePort;

		System.out.println("SGE Root: " + this.SGE_ROOT);
		System.out.println("SGE Cell: " + this.SGE_CELL);
		System.out.println("SGE Port: " + this.SGE_PORT);

		this.bootstrapURL = "bootstrap://" + this.SGE_ROOT + "@" + this.SGE_CELL + ":" + this.SGE_PORT;
		try {
			this.jgdi = JGDIFactory.newSynchronizedInstance(this.bootstrapURL);
			if (!TEST_MODE) {
				registerListeners();
			}
		} catch (Exception ex) {
			ex.printStackTrace();
			return;
		}

		if (!TEST_MODE) {
			this.sgeAccountingThread = new SGEAccountingThread(this.SGE_ROOT, this.SGE_CELL);
			this.sgeAccountingThread.setName("SGEAccountingThread");
			this.sgeAccountingThread.start();
		}

		this.heartBeatTimer = new Timer();
		this.heartBeatTimer.scheduleAtFixedRate(new HeartBeatTimerTask(), PING_QMASTER_INTERVAL_MS,
				PING_QMASTER_INTERVAL_MS);

		System.out.println("JGDIPlugin (version: " + JGDI_PLUGIN_VERSION + ") started.");
	}

	/**
	 * Applies plugin preferences.
	 *
	 * <p>{@code GridFinishedJobRetrievalMethod}: null/blank selects the SGE accounting thread (any
	 * open ARCO database is shut down); {@code arco} (case-insensitive) shuts the accounting thread
	 * down and uses an {@link ARCODatabase}. {@code GridUsageDBURL}/{@code GridUsageDBUser}/
	 * {@code GridUsageDBPass} configure a {@link DailyUsageThread}, started at most once and never
	 * in test mode.
	 *
	 * @param prefs preference map; expected to be non-null
	 */
	public void setPreferences(Map<String, String> prefs) {
		super.setPreferences(prefs);

		this.finishedJobRetrievalMethod = prefs.get("GridFinishedJobRetrievalMethod");

		if (this.finishedJobRetrievalMethod == null || this.finishedJobRetrievalMethod.trim().length() == 0) {
			// Release the ARCO connection that is no longer used (previously a fresh instance was
			// created and shut down instead, leaking the live one).
			if (this.arcoDatabase != null) {
				this.arcoDatabase.shutdown();
				this.arcoDatabase = null;
			}

			if (this.sgeAccountingThread == null) {
				this.sgeAccountingThread = new SGEAccountingThread(this.SGE_ROOT, this.SGE_CELL);
				this.sgeAccountingThread.setName("SGEAccountingThread");
				this.sgeAccountingThread.start();
			}
		} else if (this.finishedJobRetrievalMethod.equalsIgnoreCase("arco")) {
			if (this.sgeAccountingThread != null) {
				this.sgeAccountingThread.shutdown();
			}

			if (this.arcoDatabase == null) {
				this.arcoDatabase = new ARCODatabase(this);
			}
		}

		String usageDBUrl = prefs.get("GridUsageDBURL");
		String usageDBUser = prefs.get("GridUsageDBUser");
		if (usageDBUrl == null || usageDBUrl.trim().isEmpty() || usageDBUser == null
				|| usageDBUser.trim().isEmpty()) {
			return;
		}
		String usageDBPass = prefs.get("GridUsageDBPass");
		if (TEST_MODE || this.dailyUsageThread != null) {
			return;
		}
		try {
			this.dailyUsageThread = new DailyUsageThread(this.sgeAccountingThread, usageDBUrl, usageDBUser,
					usageDBPass);
			this.dailyUsageThread.setPriority(Thread.MIN_PRIORITY);
			this.dailyUsageThread.start();
		} catch (Exception ex) {
			// Usage tracking is optional; report the failure instead of swallowing it.
			ex.printStackTrace();
		}
	}

	/**
	 * Creates three JGDI event clients: job finish/delete events, job task modifications, and
	 * qmaster shutdown, each wired to its listener.
	 *
	 * @throws JGDIException if an event client cannot be created or subscribed
	 */
	private void registerListeners() throws JGDIException {
		this.jobFinishListener = new JGDIJobFinishListener(this);
		this.finishEventClient = JGDIFactory.createEventClient(this.bootstrapURL, 0);
		this.finishEventClient.subscribe(EventTypeEnum.JobFinalUsage);
		this.finishEventClient.subscribe(EventTypeEnum.JobDel);
		this.finishEventClient.subscribe(EventTypeEnum.JobTaskDel);
		this.finishEventClient.commit();
		this.finishEventClient.addEventListener(this.jobFinishListener);

		this.jobModListener = new JGDIJobModListener(this);
		this.modEventClient = JGDIFactory.createEventClient(this.bootstrapURL, 0);
		this.modEventClient.subscribe(EventTypeEnum.JobTaskMod);
		this.modEventClient.commit();
		this.modEventClient.addEventListener(this.jobModListener);

		this.qmasterDownListener = new JGDIQmasterDownListener(this);
		this.downEventClient = JGDIFactory.createEventClient(this.bootstrapURL, 0);
		this.downEventClient.subscribe(EventTypeEnum.QmasterGoesDown);
		this.downEventClient.commit();
		this.downEventClient.addEventListener(this.qmasterDownListener);
	}

	/** @return the last known qmaster state */
	private boolean isQmasterAlive() {
		return this.isQmasterAlive;
	}

	/**
	 * Probes the qmaster by opening a fresh JGDI connection. Only active when SGE_ROOT ends with
	 * {@code 6.2u1}; otherwise a no-op.
	 *
	 * <p>On a failed probe while the qmaster was considered alive, it is marked down and the event
	 * clients are closed. On a successful probe while it was marked down, the probe connection
	 * becomes the plugin's JGDI connection and the listeners are registered again.
	 */
	public void pingQmaster() {
		// Null check: an NPE here would escape the TimerTask and cancel the heartbeat Timer.
		if (this.SGE_ROOT == null || !this.SGE_ROOT.endsWith("6.2u1")) {
			return;
		}
		synchronized (this.pingLock) {
			try {
				JGDI probe = JGDIFactory.newSynchronizedInstance(this.bootstrapURL);

				if (!this.isQmasterAlive) {
					System.err.println(new Date() + ": S U C C E S S: Qmaster restored");
					this.isQmasterAlive = true;
					this.jgdi = probe;
					registerListeners();
				} else {
					probe.close();
				}
			} catch (Exception ex) {
				if (this.isQmasterAlive) {
					System.err.println(new Date() + ": W A R N I N G: Qmaster CRASH detected");
					this.isQmasterAlive = false;
					closeEventClients();
				}
			}
		}
	}

	/** Closes every event client independently, so one failure does not leave the others open. */
	private void closeEventClients() {
		closeEventClient(this.finishEventClient);
		closeEventClient(this.modEventClient);
		closeEventClient(this.downEventClient);
	}

	/** Closes {@code client} if non-null, printing (not propagating) any failure. */
	private static void closeEventClient(EventClient client) {
		if (client == null) {
			return;
		}
		try {
			client.close();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}

	/**
	 * Pings the qmaster once, then blocks until {@code isQmasterAlive} becomes true, polling every
	 * {@value #PING_QMASTER_INTERVAL_MS} ms. The flag is flipped back by {@link #pingQmaster()}
	 * (heartbeat timer). If the calling thread is interrupted, the interrupt is restored and the
	 * method returns early.
	 *
	 * <p>NOTE(review): when SGE_ROOT does not end with 6.2u1, nothing visible here sets the flag
	 * back to true once {@link #getJobInfo} has cleared it, so this can block indefinitely.
	 */
	public synchronized void waitForQmasterAlive() {
		pingQmaster();
		while (!this.isQmasterAlive) {
			try {
				Thread.sleep(PING_QMASTER_INTERVAL_MS);
			} catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

	/**
	 * Submits a job with {@code qsub}, making up to {@value #MAX_SUBMIT_ATTEMPTS} attempts.
	 *
	 * <p>Between attempts the method sleeps 2 s, 4 s, 8 s and then 16 s. When the qmaster is
	 * marked down, or qsub reports a connection problem, it first blocks in
	 * {@link #waitForQmasterAlive()}. If interrupted while sleeping, the interrupt is restored and
	 * the failures collected so far are returned.
	 *
	 * @param gji description of the job to submit
	 * @return the job id parsed from qsub's output (for job arrays the id/range token, e.g.
	 *         {@code 123.1-10:1}), or {@code "ERROR:"} followed by one line per failed attempt
	 */
	public String submitJob(GridJobSubmitInfo gji) {
		StringBuilder failures = new StringBuilder();
		long retryDelay = INITIAL_SUBMIT_RETRY_DELAY_MS;

		for (int attempt = 1; attempt <= MAX_SUBMIT_ATTEMPTS; ++attempt) {
			if (attempt > 1) {
				try {
					Thread.sleep(retryDelay);
				} catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
					break;
				}
				retryDelay *= 2L;
			}

			String err;
			Process process = null;
			try {
				List<String> command = buildQsubCommand(gji);

				if (!isQmasterAlive()) {
					waitForQmasterAlive();
				}

				ProcessBuilder pb = new ProcessBuilder(command);
				Properties props = gji.getEnvironmentProperties();
				if (props != null) {
					Map<String, String> env = pb.environment();
					for (String varName : props.stringPropertyNames()) {
						env.put(varName, props.getProperty(varName));
					}
				}

				process = pb.start();

				// One bounded read per stream; only the bytes actually read are decoded, so no
				// trailing NUL characters end up in the job id or the error text.
				String response = readAvailable(process.getInputStream(), 100);
				err = readAvailable(process.getErrorStream(), 255);
				releaseProcess(process);
				process = null;

				if (err.contains("can't connect to service") || err.contains("got read error")) {
					waitForQmasterAlive();
				}

				String jobId = parseJobId(response);
				if (jobId != null) {
					return jobId;
				}

				if (err.trim().length() > 0) {
					err = err + "\n";
				}
				err = err + "ERROR: " + response;
			} catch (Exception ex) {
				ex.printStackTrace();
				StringBuilder errorMsg = new StringBuilder("Unable to submit job. Internal error occurred\n");
				errorMsg.append("\n     Date: ").append(new Date().toString());
				errorMsg.append("\n   Reason: ").append(ex.getMessage());
				err = errorMsg.toString();
			} finally {
				if (process != null) {
					releaseProcess(process);
				}
			}

			System.err.println(new Date() + ": Attempt " + attempt + ": ERROR While submitting job: " + err);
			failures.append("Attempt ").append(attempt).append(": ").append(err).append('\n');
		}

		return "ERROR:" + failures;
	}

	/**
	 * Validates {@code gji} and builds the argv for qsub (prefixed with {@code sudo -E -u <user>}
	 * when privilege escalation is requested).
	 *
	 * <p>The user name is kept as a single argv entry so it cannot inject extra sudo arguments.
	 * Everything after {@code qsub} is split on whitespace as before, so native specifications,
	 * environment values and arguments that contain spaces are still split (existing limitation).
	 *
	 * @throws Exception with the historical messages when the user name, command or arguments
	 *         are missing
	 */
	private static List<String> buildQsubCommand(GridJobSubmitInfo gji) throws Exception {
		String username = gji.getUsername();
		if (username == null) {
			throw new Exception("Failed to get Username");
		}
		String executableLocation = gji.getCommand();
		if (executableLocation == null) {
			throw new Exception("Failed to get Executable Location");
		}
		List<GridJobArgument> arguments = gji.getArguments();
		if (arguments == null || arguments.contains(null)) {
			throw new Exception("Failed to get command line arguments");
		}

		List<String> command = new LinkedList<>();
		if (gji.getPrivilegeEscalation()) {
			command.add("sudo");
			command.add("-E");
			command.add("-u");
			command.add(username);
		}

		StringBuilder qsub = new StringBuilder("qsub");
		if (gji.getSubmissionType() == GridJobSubmitInfo.SUBMISSION_ARRAY) {
			// For array submissions the command already holds the full qsub argument string.
			qsub.append(' ').append(executableLocation);
		} else {
			// Null values are skipped instead of being passed to qsub as the literal "null".
			appendOption(qsub, " ", gji.getNativeSpecification());
			appendOption(qsub, " -o ", gji.getOutputPath());
			appendOption(qsub, " -e ", gji.getErrorPath());

			Properties envProperties = gji.getEnvironmentProperties();
			if (envProperties != null) {
				StringBuilder vars = new StringBuilder();
				boolean first = true;
				for (String varName : envProperties.stringPropertyNames()) {
					if (!first) {
						vars.append(',');
					}
					vars.append(varName).append('=').append(envProperties.getProperty(varName));
					first = false;
				}
				if (!first) {
					qsub.append(" -v ").append(vars);
				}
			}

			qsub.append(' ').append(executableLocation);
			for (GridJobArgument arg : arguments) {
				String argValue = arg.getValue();
				if (argValue != null) {
					qsub.append(' ').append(argValue);
				}
			}
		}

		StringTokenizer tokens = new StringTokenizer(qsub.toString());
		while (tokens.hasMoreTokens()) {
			command.add(tokens.nextToken());
		}
		return command;
	}

	/** Appends {@code flag} followed by {@code value} when {@code value} is non-null. */
	private static void appendOption(StringBuilder sb, String flag, Object value) {
		if (value != null) {
			sb.append(flag).append(value);
		}
	}

	/**
	 * Performs a single read of at most {@code maxBytes} and decodes only the bytes read
	 * (platform charset).
	 *
	 * @return the decoded text, or {@code ""} at end of stream
	 */
	private static String readAvailable(InputStream in, int maxBytes) throws java.io.IOException {
		byte[] buffer = new byte[maxBytes];
		int count = in.read(buffer);
		return (count > 0) ? new String(buffer, 0, count) : "";
	}

	/**
	 * Extracts the job id from qsub output: the token after {@code "Your job "} or
	 * {@code "Your job-array "} on the first line starting with {@code "Your job"}.
	 *
	 * <p>The prefix is taken from the line itself rather than from the submission type, so the
	 * offset can no longer drift between retries or mismatch the actual output.
	 *
	 * @return the job id, or {@code null} if none could be parsed
	 */
	private static String parseJobId(String response) {
		for (String line : response.split("\n")) {
			if (!line.startsWith("Your job")) {
				continue;
			}
			int offset = line.startsWith(JOB_ARRAY_PREFIX) ? JOB_ARRAY_PREFIX.length() : JOB_PREFIX.length();
			if (line.length() < offset) {
				return null;
			}
			String rest = line.substring(offset);
			int spaceIndex = rest.indexOf(' ');
			return (spaceIndex == -1) ? null : rest.substring(0, spaceIndex);
		}
		return null;
	}

	/**
	 * Lists job tasks known to the qmaster, optionally filtered by complex variables.
	 *
	 * @param complexVariables comma-separated {@code name[=value]} list (a missing or empty value
	 *                         means {@code "true"}). If it contains {@code GridJobNamePrefix}, only
	 *                         the job-name prefix is checked; otherwise every listed variable must
	 *                         match a hard resource of the job (case-insensitive). May be null.
	 * @return one entry per task ({@code jobId.taskId}), or {@code null} if JGDI is not initialized
	 */
	public List<GridJobInfo> getJobList(String complexVariables) {
		JGDI conn = this.jgdi;
		if (conn == null) {
			System.err.println("JGDI is not properly initialized, returning NULL.");
			return null;
		}

		List<GridJobInfo> ret = new LinkedList<>();
		try {
			List<Job> qJobs = conn.getJobList();
			if (TEST_MODE) {
				System.out.println("qJobs.size = " + qJobs.size());
			}

			Map<String, String> complexVars = parseComplexVariables(complexVariables);

			List<Job> plJobs = new LinkedList<>();
			for (Job j : qJobs) {
				if (matchesComplexVariables(j, complexVars)) {
					plJobs.add(j);
				}
			}

			if (TEST_MODE) {
				System.out.println("plJobs.size = " + plJobs.size());
				for (Job j : plJobs) {
					System.out.println(j.getJobNumber());
				}
			}

			for (Job j : plJobs) {
				addJobTasks(ret, j);
			}
		} catch (JGDIException ex) {
			ex.printStackTrace();
		}

		return ret;
	}

	/**
	 * Parses a comma-separated {@code name[=value]} list. Names and values are trimmed; an absent
	 * or empty value becomes {@code "true"}; empty names and tokens with more than one {@code '='}
	 * are ignored.
	 */
	private static Map<String, String> parseComplexVariables(String complexVariables) {
		Map<String, String> vars = new HashMap<>();
		if (complexVariables == null) {
			return vars;
		}
		for (String token : complexVariables.split(",")) {
			String[] pair = token.trim().split("=");
			// "=" alone splits to an empty array; "a=b=c" has too many parts.
			if (pair.length == 0 || pair.length > 2) {
				continue;
			}
			String name = pair[0].trim();
			if (name.isEmpty()) {
				continue;
			}
			String value = (pair.length == 2) ? pair[1].trim() : "";
			vars.put(name, value.isEmpty() ? "true" : value);
		}
		return vars;
	}

	/**
	 * @return true if {@code complexVars} is empty, if the job name starts with the
	 *         {@code GridJobNamePrefix} entry (when present), or otherwise if every entry equals
	 *         (ignoring case) the job's hard resource of the same name
	 */
	private static boolean matchesComplexVariables(Job j, Map<String, String> complexVars) {
		if (complexVars.isEmpty()) {
			return true;
		}
		String prefix = complexVars.get("GridJobNamePrefix");
		if (prefix != null) {
			return j.getJobName().startsWith(prefix);
		}

		Map<String, String> jobComplexVars = new HashMap<>();
		for (int i = 0; i < j.getHardResourceCount(); ++i) {
			ComplexEntry ce = j.getHardResource(i);
			jobComplexVars.put(ce.getName(), ce.getStringval());
		}

		for (Map.Entry<String, String> wanted : complexVars.entrySet()) {
			String jobValue = jobComplexVars.get(wanted.getKey());
			// Missing resource and resource without a string value both count as a mismatch.
			if (jobValue == null || !jobValue.equalsIgnoreCase(wanted.getValue())) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Appends one {@link GridJobInfo} per task of {@code j} to {@code out}. Jobs with an empty task
	 * list are expanded from their first array range; jobs with a null task list are skipped.
	 */
	private void addJobTasks(List<GridJobInfo> out, Job j) {
		String jobId = String.valueOf(j.getJobNumber());
		List<JobTask> taskList = j.getJaTasksList();
		if (taskList == null) {
			return;
		}

		if (taskList.isEmpty()) {
			Range range = j.getJaStructure(0);

			if (TEST_MODE) {
				System.out.println("==================== JOB " + j.getJobNumber() + " =========");
				System.out.println(j.dump());
				System.out.println("============================================");
			}

			if (range == null) {
				return;
			}
			// long counter and step >= 1: the loop always terminates, even near Integer.MAX_VALUE.
			long step = Math.max(1L, range.getStep());
			for (long t = range.getMin(); t <= range.getMax(); t += step) {
				out.add(getJobInfo(jobId + "." + t, j));
			}
		} else {
			for (JobTask task : taskList) {
				GridJobInfo info = getJobInfo(jobId + "." + task.getTaskNumber(), j);
				if (info != null) {
					out.add(info);
				}
			}
		}
	}

	/**
	 * Fills {@code gji} for task {@code taskId} of job {@code j}.
	 *
	 * <p>A task found in the job's task list gets its start time and state 2, unless its
	 * {@code end_time} usage is positive (then null is returned). A task id contained in one of the
	 * job's {@code JaNHIds} ranges gets the submission time as queued time and state 1. The meaning
	 * of states 1/2 is defined by {@link GridJobInfo} (presumably queued/running — confirm).
	 *
	 * @return {@code gji}, or {@code null} if the task is finished or unknown
	 */
	private GridJobInfo getTaskInfo(GridJobInfo gji, Job j, int taskId) {
		List<JobTask> tasks = j.getJaTasksList();
		if (tasks != null) {
			for (JobTask jt : tasks) {
				if (jt.getTaskNumber() != taskId) {
					continue;
				}
				try {
					Double d = Double.valueOf(jt.getUsage("end_time"));
					if (d.longValue() > 0L) {
						return null;
					}
				} catch (Exception ex) {
					// No parseable end_time: keep treating the task as active.
				}
				gji.setStartTime(jt.getStartTime() * 1000L);
				gji.setState(2);
				return gji;
			}
		}

		List<Range> pendingRanges = j.getJaNHIdsList();
		if (pendingRanges != null) {
			for (Range r : pendingRanges) {
				if (rangeContains(r, taskId)) {
					gji.setQueuedTime(j.getSubmissionTime() * 1000L);
					gji.setState(1);
					return gji;
				}
			}
		}

		return null;
	}

	/**
	 * O(1) membership test for {@code min..max:step}; a step below 1 is treated as 1 (the former
	 * element-by-element loop never terminated in that case).
	 */
	private static boolean rangeContains(Range r, int taskId) {
		long min = r.getMin();
		long max = r.getMax();
		if (taskId < min || taskId > max) {
			return false;
		}
		long step = Math.max(1L, r.getStep());
		return (taskId - min) % step == 0;
	}

	/**
	 * Looks up a job task, first among active jobs and then among finished jobs.
	 *
	 * @param jobId {@code jobNumber} or {@code jobNumber.taskId} (task defaults to 1)
	 * @param j     the job if already known, else null to fetch it through JGDI
	 * @return the task info; a bare {@link GridJobInfo} carrying only {@code jobId} when nothing
	 *         was found or an error occurred
	 * @throws NumberFormatException if the task part of {@code jobId} is not an integer
	 */
	private GridJobInfo getJobInfo(String jobId, Job j) {
		GridJobInfo gji = new GridJobInfo(jobId);

		int taskId = 1;
		String jobIdOnly = jobId;

		int dot = jobId.indexOf('.');
		if (dot >= 0) {
			taskId = Integer.parseInt(jobId.substring(dot + 1));
			jobIdOnly = jobId.substring(0, dot);
		}

		try {
			if (j == null) {
				JGDI conn = this.jgdi;
				if (conn != null) {
					j = conn.getJob(Integer.parseInt(jobIdOnly));
				}
			}

			if (j != null) {
				GridJobInfo ret = getTaskInfo(gji, j, taskId);
				if (ret != null) {
					for (int i = 0; i < j.getHardResourceCount(); ++i) {
						ComplexEntry ce = j.getHardResource(i);
						ret.addComplexVariable(ce.getName(), ce.getStringval());
					}
					return ret;
				}
			}

			GridJobInfo fji = getFinishedJobInfo(jobId);
			if (fji != null) {
				return fji;
			}
		} catch (JGDIException ex) {
			ex.printStackTrace();
			String msg = ex.getMessage();
			if (msg != null && msg.contains("unable to contact qmaster using port")) {
				this.isQmasterAlive = false;
				System.err.println("Qmaster is unavailable. ");
			}
		} catch (Exception ex) {
			ex.printStackTrace();
		}

		return gji;
	}

	/**
	 * Public lookup of a job task by id; see {@link #getJobInfo(String, Job)}.
	 *
	 * @param jobId {@code jobNumber} or {@code jobNumber.taskId}
	 */
	public GridJobInfo getJobInfo(String jobId) {
		return getJobInfo(jobId, null);
	}

	/**
	 * Fetches finished-job information using the configured retrieval method.
	 *
	 * @return the info, or {@code null} if unavailable (including when no accounting thread runs)
	 * @throws PLGrid_InvalidMethodException if the configured method is neither blank nor arco
	 */
	private GridJobInfo getFinishedJobInfo(String jobId) throws PLGrid_InvalidMethodException {
		String method = this.finishedJobRetrievalMethod;
		if (method == null || method.trim().length() == 0) {
			SGEAccountingThread accounting = this.sgeAccountingThread;
			return (accounting == null) ? null : accounting.getFinishedJobInfo(jobId);
		}
		if (method.equalsIgnoreCase("arco")) {
			if (this.arcoDatabase == null) {
				this.arcoDatabase = new ARCODatabase(this);
			}
			return this.arcoDatabase.getFinishedJobInfo(jobId);
		}
		throw new PLGrid_InvalidMethodException("Method \"" + method
				+ "\" is not supported by this plugin for obtaining finished job information.");
	}

	/**
	 * Deletes a job or job task.
	 *
	 * <p>With a user name, runs {@code sudo -u <user> qdel [-f] <jobId>} and waits for it; the user
	 * name is passed as a single argv entry. Without one, deletes through JGDI: task ids
	 * ({@code x.y}) honour {@code force}; whole-job deletes go through {@code deleteJob(int)}, which
	 * does not take {@code force}. "does not exist" errors are ignored; other failures are printed.
	 */
	public void killJob(String jobId, String username, boolean force) {
		if (username != null) {
			List<String> cmd = new LinkedList<>();
			cmd.add("sudo");
			cmd.add("-u");
			cmd.add(username);
			cmd.add("qdel");
			if (force) {
				cmd.add("-f");
			}
			// jobId is split on whitespace exactly as Runtime.exec(String) did before.
			StringTokenizer ids = new StringTokenizer(String.valueOf(jobId));
			while (ids.hasMoreTokens()) {
				cmd.add(ids.nextToken());
			}

			Process process = null;
			try {
				process = new ProcessBuilder(cmd).start();
				process.waitFor();
			} catch (Exception ex) {
				if (ex instanceof InterruptedException) {
					Thread.currentThread().interrupt();
				}
				ex.printStackTrace();
				StringBuilder errorMsg = new StringBuilder("Unable to delete job ");
				errorMsg.append(jobId);
				errorMsg.append("\n     Date: ").append(new Date().toString());
				errorMsg.append("\n   Reason: ").append(ex.getMessage());
				System.err.println(errorMsg);
			} finally {
				if (process != null) {
					releaseProcess(process);
				}
			}
		} else {
			JGDI conn = this.jgdi;
			try {
				if (jobId.contains(".")) {
					// Raw list: element type is JGDI's answer type, not imported in this file.
					List answers = new LinkedList();
					String[] tasks = { jobId };
					conn.deleteJobsWithAnswer(tasks, force, null, answers);
				} else {
					conn.deleteJob(Integer.parseInt(jobId));
				}
			} catch (Exception ex) {
				// getMessage() may be null (e.g. an NPE when JGDI is not initialized).
				String msg = ex.getMessage();
				if (msg == null || !msg.contains("does not exist")) {
					ex.printStackTrace();
				}
			}
		}
	}

	/**
	 * Closes the stdin/stdout/stderr streams of {@code p} and destroys it. Every step is attempted;
	 * only the last failure is printed.
	 */
	private void releaseProcess(Process p) {
		Exception ex = null;

		if (p != null) {
			try {
				p.getInputStream().close();
			} catch (Exception iex) {
				ex = iex;
			}
			try {
				p.getOutputStream().close();
			} catch (Exception oex) {
				ex = oex;
			}
			try {
				p.getErrorStream().close();
			} catch (Exception eex) {
				ex = eex;
			}
			try {
				p.destroy();
			} catch (Exception dex) {
				ex = dex;
			}
		}

		if (ex != null) {
			ex.printStackTrace();
		}
	}

	/** Prints the plugin versions and switches on {@code TEST_MODE}. */
	public static void main(String[] args) {
		System.out.println("PipelineGridPlugin version 3.1");
		System.out.println("JGDI Plugin version " + JGDI_PLUGIN_VERSION);

		TEST_MODE = true;
	}

	/** Timer task that runs {@link JGDIPlugin#pingQmaster()}. */
	private class HeartBeatTimerTask extends TimerTask {
		@Override
		public void run() {
			try {
				JGDIPlugin.this.pingQmaster();
			} catch (RuntimeException ex) {
				// An exception escaping run() would terminate the Timer and stop all heartbeats.
				ex.printStackTrace();
			}
		}
	}
}