1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.gwe.app.agent;
18
19 import java.io.Serializable;
20 import java.rmi.RemoteException;
21 import java.util.concurrent.Callable;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.gwe.api.ServerAPI4Agent;
26 import org.gwe.api.SystemDaemonRequest;
27 import org.gwe.persistence.model.ComputeResourceInfo;
28 import org.gwe.persistence.model.DaemonConfigDesc;
29 import org.gwe.persistence.model.order.DaemonRequest;
30 import org.gwe.utils.reinvoke.ReinvocationInterceptor;
31
32
33
34
35
36 public class BaseAgent implements Callable<Void> {
37
38 private static Log log = LogFactory.getLog(BaseAgent.class);
39
40 protected ServerAPI4Agent agentAPI;
41 public int allocId;
42 private String workspace;
43 public DaemonConfigDesc config;
44
45 public BaseAgent(ServerAPI4Agent agentAPI, DaemonConfigDesc user, int allocId) {
46 this.agentAPI = ReinvocationInterceptor.createProxy(agentAPI);
47 this.allocId = allocId;
48 this.config = user;
49 this.workspace = config.getHeadResource().getInstallation().getAllocsWorkspacePath(allocId);
50 }
51
52 public Void call() throws Exception {
53 boolean accepted = agentAPI.reserveAllocation(allocId, ComputeResourceInfo.createLocalInfo(allocId));
54 log.info("Agent registered: '" + accepted + "'");
55 while (accepted && processRequest());
56 log.info("Shuting down...");
57 return null;
58 }
59
60 private boolean processRequest() throws RemoteException {
61 Serializable result = null;
62 DaemonRequest<?> req = null;
63 try {
64 log.info("Querying daemon for next request...");
65 req = queryNextRequest();
66 } catch(Exception e) {
67
68 log.warn("Request process failed with exception", e);
69 agentAPI.reportAgentProblem(allocId, e);
70 return false;
71 }
72
73 if (req == null) {
74 log.info("Maximum retransmissions reached while trying to contact daemon...");
75 return false;
76 }
77
78 if (req instanceof SystemDaemonRequest) {
79 log.info("Daemon sent a " + req + " request...");
80 return ((SystemDaemonRequest)req).systemProcess();
81 }
82
83 log.info("Invoking 'request.process'...");
84 try {
85 result = req.process(workspace);
86 log.info("Request process completed successfully!");
87 } catch(Exception e) {
88
89 result = e;
90 log.warn("Request process failed with exception", e);
91 }
92
93 return agentAPI.reportRequestCompletion(allocId, req.getExecId(), result);
94 }
95
96 private DaemonRequest<?> queryNextRequest() throws RemoteException {
97 try {
98 return agentAPI.getNextRequest(allocId);
99 } catch (RemoteException e) {
100 log.info("Exception while querying next request...", e);
101 return agentAPI.getNextRequestAgain(allocId);
102 }
103 }
104 }
105