View Javadoc

1   /*
2    * Copyright 2007-2008 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.gwe.utils.services;
18  
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.TreeMap;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  
27  /**
28   * @author Marco Ruiz
29   * @since Aug 1, 2007
30   */
31  public class ProcessingPermitBroker implements Runnable {
32  	private static Log log = LogFactory.getLog(ProcessingPermitBroker.class);
33  
34  	private static ThreadGroup threadGroup = new ThreadGroup("Brokered Services"); 
35  	
36  	private List<PermitRequest> pendingRequests = new ArrayList<PermitRequest>();
37  	private Map<Long, PermitRequest> processingRequests = new TreeMap<Long, PermitRequest>();
38  //	private Map<Long, PermitRequest> completedRequests = new TreeMap<Long, PermitRequest>();
39  
40  	private int maxParallel;
41  
42  	public ProcessingPermitBroker(String name) {
43  		this(0, name);  // 0 means no maximum
44  	}
45  
46  	public ProcessingPermitBroker(int maxParallelRequest, String name) {
47  		this.maxParallel = maxParallelRequest;
48  		Thread brokerThread = new Thread(threadGroup, this, name + " Permit's Broker");
49  		brokerThread.setDaemon(true);
50  		brokerThread.start();
51  	}
52  
53  	public synchronized void run() {
54  		while (true) {
55  			while (pendingRequests.size() == 0 || (maxParallel > 0 && processingRequests.size() >= maxParallel)) {
56  				try {
57  					log.debug("Manager thread is going to sleep...");
58  					this.wait();
59  					log.debug("Manager thread woke up!");
60  				} catch (InterruptedException e) {
61  					// TODO Auto-generated catch block
62  				}
63  			}
64  
65  			PermitRequest handle = pendingRequests.remove(0);
66  			log.debug("Manager is granting permission to request [" + handle + "]");
67  			processingRequests.put(handle.getId(), handle);
68  			synchronized (handle.getLock()) {
69  				handle.getLock().notifyAll();
70  			}
71  		}
72  	}
73  
74  	public long createPermit() {
75  		log.debug("Client is soliciting processing permit");
76  		PermitRequest handle = PermitRequest.createDescriptor();
77  		synchronized (handle.getLock()) {
78  			try {
79  				addHandle(handle);
80  				handle.getLock().wait();
81  			} catch (InterruptedException e) {
82  				// TODO Auto-generated catch block
83  			}
84  		}
85  		log.debug("Client has been granted processing permission for request [" + handle + "]");
86  		return handle.getId();
87  	}
88  
89  	private synchronized void addHandle(PermitRequest handle) {
90  		log.debug("Adding request [" + handle + "] to waiting queue. Waking up manager thread...");
91  		pendingRequests.add(handle);
92  		this.notifyAll();
93  	}
94  
95  	public synchronized void destroyPermit(long reqId) {
96  		PermitRequest handle = processingRequests.remove(reqId);
97  		log.debug("Client is reporting that is done with permit [" + handle + "]. Waking up manager thread...");
98  //		completedRequests.put(handle.getId(), handle);
99  		this.notifyAll();
100 	}
101 }