1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.gwe.utils.services;
18
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.TreeMap;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26
27
28
29
30
31 public class ProcessingPermitBroker implements Runnable {
32 private static Log log = LogFactory.getLog(ProcessingPermitBroker.class);
33
34 private static ThreadGroup threadGroup = new ThreadGroup("Brokered Services");
35
36 private List<PermitRequest> pendingRequests = new ArrayList<PermitRequest>();
37 private Map<Long, PermitRequest> processingRequests = new TreeMap<Long, PermitRequest>();
38
39
40 private int maxParallel;
41
42 public ProcessingPermitBroker(String name) {
43 this(0, name);
44 }
45
46 public ProcessingPermitBroker(int maxParallelRequest, String name) {
47 this.maxParallel = maxParallelRequest;
48 Thread brokerThread = new Thread(threadGroup, this, name + " Permit's Broker");
49 brokerThread.setDaemon(true);
50 brokerThread.start();
51 }
52
53 public synchronized void run() {
54 while (true) {
55 while (pendingRequests.size() == 0 || (maxParallel > 0 && processingRequests.size() >= maxParallel)) {
56 try {
57 log.debug("Manager thread is going to sleep...");
58 this.wait();
59 log.debug("Manager thread woke up!");
60 } catch (InterruptedException e) {
61
62 }
63 }
64
65 PermitRequest handle = pendingRequests.remove(0);
66 log.debug("Manager is granting permission to request [" + handle + "]");
67 processingRequests.put(handle.getId(), handle);
68 synchronized (handle.getLock()) {
69 handle.getLock().notifyAll();
70 }
71 }
72 }
73
74 public long createPermit() {
75 log.debug("Client is soliciting processing permit");
76 PermitRequest handle = PermitRequest.createDescriptor();
77 synchronized (handle.getLock()) {
78 try {
79 addHandle(handle);
80 handle.getLock().wait();
81 } catch (InterruptedException e) {
82
83 }
84 }
85 log.debug("Client has been granted processing permission for request [" + handle + "]");
86 return handle.getId();
87 }
88
89 private synchronized void addHandle(PermitRequest handle) {
90 log.debug("Adding request [" + handle + "] to waiting queue. Waking up manager thread...");
91 pendingRequests.add(handle);
92 this.notifyAll();
93 }
94
95 public synchronized void destroyPermit(long reqId) {
96 PermitRequest handle = processingRequests.remove(reqId);
97 log.debug("Client is reporting that is done with permit [" + handle + "]. Waking up manager thread...");
98
99 this.notifyAll();
100 }
101 }