1 package org.apache.jcs.engine;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.util.ArrayList;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.jcs.engine.behavior.ICacheElement;
29 import org.apache.jcs.engine.behavior.ICacheEventQueue;
30 import org.apache.jcs.engine.behavior.ICacheListener;
31 import org.apache.jcs.engine.stats.StatElement;
32 import org.apache.jcs.engine.stats.Stats;
33 import org.apache.jcs.engine.stats.behavior.IStatElement;
34 import org.apache.jcs.engine.stats.behavior.IStats;
35 import org.apache.jcs.utils.threadpool.ThreadPool;
36 import org.apache.jcs.utils.threadpool.ThreadPoolManager;
37
38 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
39
40 /***
41 * An event queue is used to propagate ordered cache events to one and only one target listener.
42 * <p>
43 * This is a modified version of the experimental version. It uses a PooledExecutor and a
44 * BoundedBuffer to queue up events and execute them as threads become available.
45 * <p>
46 * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing
47 * more than a few threads at them will serve no purpose other than to saturate the IO interface. In
48 * light of this, having one thread per region seems unnecessary. This may prove to be false.
49 */
50 public class PooledCacheEventQueue
51 implements ICacheEventQueue
52 {
53 /*** The type of event queue */
54 private static final String queueType = POOLED_QUEUE_TYPE;
55
56 /*** The logger */
57 private static final Log log = LogFactory.getLog( PooledCacheEventQueue.class );
58
59 /*** time to wait for an event before snuffing the background thread
60 if the queue is empty. make configurable later */
61 private int waitToDieMillis = 10000;
62
63 /*** The listener to process events */
64 private ICacheListener listener;
65
66 /*** The listener id */
67 private long listenerId;
68
69 /*** The name of the cache */
70 private String cacheName;
71
72 /*** Max failures before self destruction */
73 private int maxFailure;
74
75 /*** in milliseconds */
76 private int waitBeforeRetry;
77
78 /*** Has the pool been destroyed */
79 private boolean destroyed = true;
80
81 /*** Is it working */
82 private boolean working = true;
83
84 /*** The Thread Pool to execute events with. */
85 private ThreadPool pool = null;
86
87 /***
88 * Constructor for the CacheEventQueue object
89 * <p>
90 * @param listener
91 * @param listenerId
92 * @param cacheName
93 * @param maxFailure
94 * @param waitBeforeRetry
95 * @param threadPoolName
96 */
97 public PooledCacheEventQueue( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
98 int waitBeforeRetry, String threadPoolName )
99 {
100 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName );
101 }
102
103 /***
104 * Initializes the queue.
105 * <p>
106 * @param listener
107 * @param listenerId
108 * @param cacheName
109 * @param maxFailure
110 * @param waitBeforeRetry
111 * @param threadPoolName
112 */
113 public void initialize( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
114 int waitBeforeRetry, String threadPoolName )
115 {
116 if ( listener == null )
117 {
118 throw new IllegalArgumentException( "listener must not be null" );
119 }
120
121 this.listener = listener;
122 this.listenerId = listenerId;
123 this.cacheName = cacheName;
124 this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
125 this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
126
127
128 if ( threadPoolName == null )
129 {
130 threadPoolName = "cache_event_queue";
131 }
132 pool = ThreadPoolManager.getInstance().getPool( threadPoolName );
133
134 if ( log.isDebugEnabled() )
135 {
136 log.debug( "Initialized: " + this );
137 }
138 }
139
140 /***
141 * @return the queue type
142 */
143 public String getQueueType()
144 {
145 return queueType;
146 }
147
148 /***
149 * Event Q is empty.
150 */
151 public synchronized void stopProcessing()
152 {
153 destroyed = true;
154 }
155
156 /***
157 * Returns the time to wait for events before killing the background thread.
158 * <p>
159 * @return the time to wait before shutting down in ms.
160 */
161 public int getWaitToDieMillis()
162 {
163 return waitToDieMillis;
164 }
165
166 /***
167 * Sets the time to wait for events before killing the background thread.
168 * <p>
169 * @param wtdm
170 */
171 public void setWaitToDieMillis( int wtdm )
172 {
173 waitToDieMillis = wtdm;
174 }
175
176 /***
177 * @return String info.
178 */
179 public String toString()
180 {
181 return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
182 }
183
184 /***
185 * @return true if not destroyed.
186 */
187 public boolean isAlive()
188 {
189 return ( !destroyed );
190 }
191
192 /***
193 * @param aState
194 */
195 public void setAlive( boolean aState )
196 {
197 destroyed = !aState;
198 }
199
200 /***
201 * @return The listenerId value
202 */
203 public long getListenerId()
204 {
205 return listenerId;
206 }
207
208 /***
209 * Destroy the queue. Interrupt all threads.
210 */
211 public synchronized void destroy()
212 {
213 if ( !destroyed )
214 {
215 destroyed = true;
216
217
218 pool.getPool().interruptAll();
219 if ( log.isInfoEnabled() )
220 {
221 log.info( "Cache event queue destroyed: " + this );
222 }
223 }
224 }
225
226 /***
227 * Constructs a PutEvent for the object and passes it to the event queue.
228 * <p>
229 * @param ce The feature to be added to the PutEvent attribute
230 * @exception IOException
231 */
232 public synchronized void addPutEvent( ICacheElement ce )
233 throws IOException
234 {
235 if ( isWorking() )
236 {
237 put( new PutEvent( ce ) );
238 }
239 else
240 {
241 if ( log.isWarnEnabled() )
242 {
243 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
244 }
245 }
246 }
247
248 /***
249 * @param key The feature to be added to the RemoveEvent attribute
250 * @exception IOException
251 */
252 public synchronized void addRemoveEvent( Serializable key )
253 throws IOException
254 {
255 if ( isWorking() )
256 {
257 put( new RemoveEvent( key ) );
258 }
259 else
260 {
261 if ( log.isWarnEnabled() )
262 {
263 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
264 }
265 }
266 }
267
268 /***
269 * @exception IOException
270 */
271 public synchronized void addRemoveAllEvent()
272 throws IOException
273 {
274 if ( isWorking() )
275 {
276 put( new RemoveAllEvent() );
277 }
278 else
279 {
280 if ( log.isWarnEnabled() )
281 {
282 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
283 }
284 }
285 }
286
287 /***
288 * @exception IOException
289 */
290 public synchronized void addDisposeEvent()
291 throws IOException
292 {
293 if ( isWorking() )
294 {
295 put( new DisposeEvent() );
296 }
297 else
298 {
299 if ( log.isWarnEnabled() )
300 {
301 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
302 }
303 }
304 }
305
306 /***
307 * Adds an event to the queue.
308 * <p>
309 * @param event
310 */
311 private void put( AbstractCacheEvent event )
312 {
313 try
314 {
315 pool.execute( event );
316 }
317 catch ( InterruptedException e )
318 {
319 log.error( e );
320 }
321 }
322
323 /***
324 * @return Statistics info
325 */
326 public String getStats()
327 {
328 return getStatistics().toString();
329 }
330
331 /***
332 * @return IStats
333 */
334 public IStats getStatistics()
335 {
336 IStats stats = new Stats();
337 stats.setTypeName( "Pooled Cache Event Queue" );
338
339 ArrayList elems = new ArrayList();
340
341 IStatElement se = null;
342
343 se = new StatElement();
344 se.setName( "Working" );
345 se.setData( "" + this.working );
346 elems.add( se );
347
348 se = new StatElement();
349 se.setName( "Destroyed" );
350 se.setData( "" + this.isAlive() );
351 elems.add( se );
352
353 se = new StatElement();
354 se.setName( "Empty" );
355 se.setData( "" + this.isEmpty() );
356 elems.add( se );
357
358 if ( pool.getQueue() != null )
359 {
360 if ( pool.getQueue() instanceof BoundedBuffer )
361 {
362 BoundedBuffer bb = (BoundedBuffer) pool.getQueue();
363 se = new StatElement();
364 se.setName( "Queue Size" );
365 se.setData( "" + bb.size() );
366 elems.add( se );
367
368 se = new StatElement();
369 se.setName( "Queue Capacity" );
370 se.setData( "" + bb.capacity() );
371 elems.add( se );
372 }
373 }
374
375 se = new StatElement();
376 se.setName( "Pool Size" );
377 se.setData( "" + pool.getPool().getPoolSize() );
378 elems.add( se );
379
380 se = new StatElement();
381 se.setName( "Maximum Pool Size" );
382 se.setData( "" + pool.getPool().getMaximumPoolSize() );
383 elems.add( se );
384
385
386 IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[elems.size()] );
387 stats.setStatElements( ses );
388
389 return stats;
390 }
391
392
393
394 /***
395 * Retries before declaring failure.
396 * <p>
397 * @author asmuts
398 * @created January 15, 2002
399 */
400 private abstract class AbstractCacheEvent
401 implements Runnable
402 {
403 /*** Times failed to process */
404 int failures = 0;
405
406 /*** Has the event been processed */
407 boolean done = false;
408
409 /***
410 * Main processing method for the AbstractCacheEvent object. It calls the abstract doRun
411 * method that all concrete instances must implement.
412 */
413 public void run()
414 {
415 try
416 {
417 doRun();
418 }
419 catch ( IOException e )
420 {
421 if ( log.isWarnEnabled() )
422 {
423 log.warn( e );
424 }
425 if ( ++failures >= maxFailure )
426 {
427 if ( log.isWarnEnabled() )
428 {
429 log.warn( "Error while running event from Queue: " + this
430 + ". Dropping Event and marking Event Queue as non-functional." );
431 }
432 setWorking( false );
433 setAlive( false );
434 return;
435 }
436 if ( log.isInfoEnabled() )
437 {
438 log.info( "Error while running event from Queue: " + this + ". Retrying..." );
439 }
440 try
441 {
442 Thread.sleep( waitBeforeRetry );
443 run();
444 }
445 catch ( InterruptedException ie )
446 {
447 if ( log.isErrorEnabled() )
448 {
449 log.warn( "Interrupted while sleeping for retry on event " + this + "." );
450 }
451 setWorking( false );
452 setAlive( false );
453 }
454 }
455 }
456
457 /***
458 * @exception IOException
459 */
460 protected abstract void doRun()
461 throws IOException;
462 }
463
464 /***
465 * An event that puts an item to a ICacheListener
466 */
467 private class PutEvent
468 extends AbstractCacheEvent
469 {
470 /*** The payload */
471 private ICacheElement ice;
472
473 /***
474 * Constructor for the PutEvent object
475 * @param ice
476 * @exception IOException
477 */
478 PutEvent( ICacheElement ice )
479 throws IOException
480 {
481 this.ice = ice;
482 }
483
484 /***
485 * Tells the ICacheListener to handle the put.
486 * <p>
487 * @exception IOException
488 */
489 protected void doRun()
490 throws IOException
491 {
492 listener.handlePut( ice );
493 }
494
495 /*** @return debugging info */
496 public String toString()
497 {
498 return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
499 .append( ice.getVal() ).toString();
500 }
501 }
502
503 /***
504 * An event that knows how to call remove on an ICacheListener
505 */
506 private class RemoveEvent
507 extends AbstractCacheEvent
508 {
509 /*** The payload, the key to remove */
510 private Serializable key;
511
512 /***
513 * Constructor for the RemoveEvent object
514 * @param key
515 * @exception IOException
516 */
517 RemoveEvent( Serializable key )
518 throws IOException
519 {
520 this.key = key;
521 }
522
523 /***
524 * Calls remove on the listener.
525 * <p>
526 * @exception IOException
527 */
528 protected void doRun()
529 throws IOException
530 {
531 listener.handleRemove( cacheName, key );
532 }
533
534 /*** @return debugging info */
535 public String toString()
536 {
537 return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
538 }
539 }
540
541 /***
542 * An event that knows how to call remove all on an ICacheListener
543 */
544 private class RemoveAllEvent
545 extends AbstractCacheEvent
546 {
547 /***
548 * Call removeAll on the listener.
549 * <p>
550 * @exception IOException
551 */
552 protected void doRun()
553 throws IOException
554 {
555 listener.handleRemoveAll( cacheName );
556 }
557
558 /*** @return debugging info */
559 public String toString()
560 {
561 return "RemoveAllEvent";
562 }
563 }
564
565 /***
566 * The Event put into the queue for dispose requests.
567 */
568 private class DisposeEvent
569 extends AbstractCacheEvent
570 {
571 /***
572 * Called when gets to the end of the queue
573 * <p>
574 * @exception IOException
575 */
576 protected void doRun()
577 throws IOException
578 {
579 listener.handleDispose( cacheName );
580 }
581
582 /*** @return debugging info */
583 public String toString()
584 {
585 return "DisposeEvent";
586 }
587 }
588
589 /***
590 * @return whether or not the queue is functional
591 */
592 public boolean isWorking()
593 {
594 return working;
595 }
596
597 /***
598 * @param isWorkingArg whether the queue is functional
599 */
600 public void setWorking( boolean isWorkingArg )
601 {
602 working = isWorkingArg;
603 }
604
605 /***
606 * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't
607 * determine the size, we return true.
608 * <p>
609 * @return whether or not there are items in the queue
610 */
611 public boolean isEmpty()
612 {
613 if ( pool.getQueue() == null )
614 {
615 return pool.getQueue().peek() == null;
616 }
617 else
618 {
619 if ( pool.getQueue() instanceof BoundedBuffer )
620 {
621 BoundedBuffer bb = (BoundedBuffer) pool.getQueue();
622 return bb.size() == 0;
623 }
624 else
625 {
626 return true;
627 }
628 }
629 }
630
631 /***
632 * Returns the number of elements in the queue. If the queue cannot determine the size
633 * accurately it will return 1.
634 * <p>
635 * @return number of items in the queue.
636 */
637 public int size()
638 {
639 if ( pool.getQueue() == null )
640 {
641 return pool.getQueue().peek() == null ? 0 : 1;
642 }
643 else
644 {
645 if ( pool.getQueue() instanceof BoundedBuffer )
646 {
647 BoundedBuffer bb = (BoundedBuffer) pool.getQueue();
648 return bb.size();
649 }
650 else
651 {
652 return 1;
653 }
654 }
655 }
656 }