View Javadoc
1   /*
2    * Copyright 2007-2008 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.gwe.app.agent;
18  
19  import java.io.Serializable;
20  import java.rmi.RemoteException;
21  import java.util.concurrent.Callable;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.gwe.api.ServerAPI4Agent;
26  import org.gwe.api.SystemDaemonRequest;
27  import org.gwe.persistence.model.ComputeResourceInfo;
28  import org.gwe.persistence.model.DaemonConfigDesc;
29  import org.gwe.persistence.model.order.DaemonRequest;
30  import org.gwe.utils.reinvoke.ReinvocationInterceptor;
31  
32  /**
33   * @author Marco Ruiz
34   * @since Sep 5, 2008
35   */
36  public class BaseAgent implements Callable<Void> {
37  
38  	private static Log log = LogFactory.getLog(BaseAgent.class);
39  
40  	protected ServerAPI4Agent agentAPI;
41  	public int allocId;
42  	private String workspace;
43  	public DaemonConfigDesc config;
44  
45  	public BaseAgent(ServerAPI4Agent agentAPI, DaemonConfigDesc user, int allocId) {
46  	    this.agentAPI = ReinvocationInterceptor.createProxy(agentAPI);
47  	    this.allocId = allocId;
48  	    this.config = user;
49  		this.workspace = config.getHeadResource().getInstallation().getAllocsWorkspacePath(allocId);
50      }
51  	
52  	public Void call() throws Exception {
53  		boolean accepted = agentAPI.reserveAllocation(allocId, ComputeResourceInfo.createLocalInfo(allocId));
54  		log.info("Agent registered: '" + accepted + "'");
55  		while (accepted && processRequest());
56  		log.info("Shuting down...");
57  		return null;
58  	}
59  
60  	private boolean processRequest() throws RemoteException {
61  		Serializable result = null;
62  		DaemonRequest<?> req = null;
63  		try {
64  			log.info("Querying daemon for next request...");
65  			req = queryNextRequest();
66  		} catch(Exception e) {
67  			// TODO: Try to recover!!!
68  			log.warn("Request process failed with exception", e);
69  			agentAPI.reportAgentProblem(allocId, e);
70  			return false;
71  		}
72  		
73  		if (req == null) {
74  			log.info("Maximum retransmissions reached while trying to contact daemon...");
75  			return false;
76  		}
77  		
78  		if (req instanceof SystemDaemonRequest) { 
79  			log.info("Daemon sent a " + req + " request...");
80  			return ((SystemDaemonRequest)req).systemProcess();
81  		}
82  
83  		log.info("Invoking 'request.process'...");
84  		try {
85  			result = req.process(workspace);
86  			log.info("Request process completed successfully!");
87  		} catch(Exception e) {
88  			// TODO: Try to recover!!!
89  			result = e;
90  			log.warn("Request process failed with exception", e);
91  		}
92  
93  	    return agentAPI.reportRequestCompletion(allocId, req.getExecId(), result);
94  	}
95  
96  	private DaemonRequest<?> queryNextRequest() throws RemoteException {
97  	    try {
98  	    	return agentAPI.getNextRequest(allocId);
99  	    } catch (RemoteException e) {
100 			log.info("Exception while querying next request...", e);
101     		return agentAPI.getNextRequestAgain(allocId);
102 	    }
103     }
104 }
105