1 package org.apache.jcs.engine;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.util.ArrayList;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.jcs.engine.behavior.ICacheElement;
29 import org.apache.jcs.engine.behavior.ICacheEventQueue;
30 import org.apache.jcs.engine.behavior.ICacheListener;
31 import org.apache.jcs.engine.stats.StatElement;
32 import org.apache.jcs.engine.stats.Stats;
33 import org.apache.jcs.engine.stats.behavior.IStatElement;
34 import org.apache.jcs.engine.stats.behavior.IStats;
35
36 /***
37 * An event queue is used to propagate ordered cache events to one and only one target listener.
38 * <p>
39 * This is a modified version of the experimental version. It should lazy initilaize the processor
40 * thread, and kill the thread if the queue goes emtpy for a specified period, now set to 1 minute.
41 * If something comes in after that a new processor thread should be created.
42 */
43 public class CacheEventQueue
44 implements ICacheEventQueue
45 {
46 /*** The logger. */
47 private static final Log log = LogFactory.getLog( CacheEventQueue.class );
48
49 /*** The type of queue -- there are pooled and single */
50 private static final String queueType = SINGLE_QUEUE_TYPE;
51
52 /*** default */
53 private static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
54
55 /***
56 * time to wait for an event before snuffing the background thread if the queue is empty. make
57 * configurable later
58 */
59 private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
60
61 /***
62 * When the events are pulled off the queue, the tell the listener to handle the specific event
63 * type. The work is done by the listener.
64 */
65 private ICacheListener listener;
66
67 /*** Id of the listener registed with this queue */
68 private long listenerId;
69
70 /*** The cache region name, if applicable. */
71 private String cacheName;
72
73 /*** Maximum number of failures before we buy the farm. */
74 private int maxFailure;
75
76 /*** in milliseconds */
77 private int waitBeforeRetry;
78
79 /*** this is true if there is no worker thread. */
80 private boolean destroyed = true;
81
82 /***
83 * This means that the queue is functional. If we reached the max number of failures, the queue
84 * is marked as non functional and will never work again.
85 */
86 private boolean working = true;
87
88 /*** the thread that works the queue. */
89 private Thread processorThread;
90
91 /*** sync */
92 private Object queueLock = new Object();
93
94 /*** the head of the queue */
95 private Node head = new Node();
96
97 /*** the end of the queue */
98 private Node tail = head;
99
100 /*** Number of items in the queue */
101 private int size = 0;
102
103 /***
104 * Constructs with the specified listener and the cache name.
105 * <p>
106 * @param listener
107 * @param listenerId
108 * @param cacheName
109 */
110 public CacheEventQueue( ICacheListener listener, long listenerId, String cacheName )
111 {
112 this( listener, listenerId, cacheName, 10, 500 );
113 }
114
115 /***
116 * Constructor for the CacheEventQueue object
117 * <p>
118 * @param listener
119 * @param listenerId
120 * @param cacheName
121 * @param maxFailure
122 * @param waitBeforeRetry
123 */
124 public CacheEventQueue( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
125 int waitBeforeRetry )
126 {
127 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, null );
128 }
129
130 /***
131 * Initializes the queue.
132 * <p>
133 * @param listener
134 * @param listenerId
135 * @param cacheName
136 * @param maxFailure
137 * @param waitBeforeRetry
138 * @param threadPoolName
139 */
140 public void initialize( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
141 int waitBeforeRetry, String threadPoolName )
142 {
143 if ( listener == null )
144 {
145 throw new IllegalArgumentException( "listener must not be null" );
146 }
147
148 this.listener = listener;
149 this.listenerId = listenerId;
150 this.cacheName = cacheName;
151 this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
152 this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
153
154 if ( log.isDebugEnabled() )
155 {
156 log.debug( "Constructed: " + this );
157 }
158 }
159
160 /***
161 * What type of queue is this.
162 * <p>
163 * @return queueType
164 */
165 public String getQueueType()
166 {
167 return queueType;
168 }
169
170 /***
171 * Kill the processor thread and indicate that the queue is detroyed and no longer alive, but it
172 * can still be working.
173 */
174 public synchronized void stopProcessing()
175 {
176 destroyed = true;
177 processorThread = null;
178 }
179
180 /***
181 * Returns the time to wait for events before killing the background thread.
182 * <p>
183 * @return int
184 */
185 public int getWaitToDieMillis()
186 {
187 return waitToDieMillis;
188 }
189
190 /***
191 * Sets the time to wait for events before killing the background thread.
192 * <p>
193 * @param wtdm the ms for the q to sit idle.
194 */
195 public void setWaitToDieMillis( int wtdm )
196 {
197 waitToDieMillis = wtdm;
198 }
199
200 /***
201 * Creates a brief string identifying the listener and the region.
202 * <p>
203 * @return String debugging info.
204 */
205 public String toString()
206 {
207 return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
208 }
209
210 /***
211 * If they queue has an active thread it is considered alive.
212 * <p>
213 * @return The alive value
214 */
215 public synchronized boolean isAlive()
216 {
217 return ( !destroyed );
218 }
219
220 /***
221 * Sets whether the queue is actively processing -- if there are working threads.
222 * <p>
223 * @param aState
224 */
225 public synchronized void setAlive( boolean aState )
226 {
227 destroyed = !aState;
228 }
229
230 /***
231 * @return The listenerId value
232 */
233 public long getListenerId()
234 {
235 return listenerId;
236 }
237
238 /***
239 * Event Q is emtpy.
240 * <p>
241 * Calling destroy interupts the processor thread.
242 */
243 public synchronized void destroy()
244 {
245 if ( !destroyed )
246 {
247 destroyed = true;
248
249 if ( log.isInfoEnabled() )
250 {
251 log.info( "Destroying queue, stats = " + getStatistics() );
252 }
253
254
255
256
257 if ( processorThread != null )
258 {
259 synchronized ( queueLock )
260 {
261 processorThread.interrupt();
262 }
263 }
264 processorThread = null;
265
266 if ( log.isInfoEnabled() )
267 {
268 log.info( "Cache event queue destroyed: " + this );
269 }
270 }
271 else
272 {
273 if ( log.isInfoEnabled() )
274 {
275 log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats = " + getStatistics() );
276 }
277 }
278 }
279
280 /***
281 * This adds a put event to the queue. When it is processed, the element will be put to the
282 * listener.
283 * <p>
284 * @param ce The feature to be added to the PutEvent attribute
285 * @exception IOException
286 */
287 public synchronized void addPutEvent( ICacheElement ce )
288 throws IOException
289 {
290 if ( isWorking() )
291 {
292 put( new PutEvent( ce ) );
293 }
294 else
295 {
296 if ( log.isWarnEnabled() )
297 {
298 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
299 }
300 }
301 }
302
303 /***
304 * This adds a remove event to the queue. When processed the listener's remove method will be
305 * called for the key.
306 * <p>
307 * @param key The feature to be added to the RemoveEvent attribute
308 * @exception IOException
309 */
310 public synchronized void addRemoveEvent( Serializable key )
311 throws IOException
312 {
313 if ( isWorking() )
314 {
315 put( new RemoveEvent( key ) );
316 }
317 else
318 {
319 if ( log.isWarnEnabled() )
320 {
321 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
322 }
323 }
324 }
325
326 /***
327 * This adds a remove all event to the queue. When it is processed, all elements will be removed
328 * from the cache.
329 * <p>
330 * @exception IOException
331 */
332 public synchronized void addRemoveAllEvent()
333 throws IOException
334 {
335 if ( isWorking() )
336 {
337 put( new RemoveAllEvent() );
338 }
339 else
340 {
341 if ( log.isWarnEnabled() )
342 {
343 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
344 }
345 }
346 }
347
348 /***
349 * @exception IOException
350 */
351 public synchronized void addDisposeEvent()
352 throws IOException
353 {
354 if ( isWorking() )
355 {
356 put( new DisposeEvent() );
357 }
358 else
359 {
360 if ( log.isWarnEnabled() )
361 {
362 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
363 }
364 }
365 }
366
367 /***
368 * Adds an event to the queue.
369 * <p>
370 * @param event
371 */
372 private void put( AbstractCacheEvent event )
373 {
374 Node newNode = new Node();
375 if ( log.isDebugEnabled() )
376 {
377 log.debug( "Event entering Queue for " + cacheName + ": " + event );
378 }
379
380 newNode.event = event;
381
382 synchronized ( queueLock )
383 {
384 size++;
385 tail.next = newNode;
386 tail = newNode;
387 if ( isWorking() )
388 {
389 if ( !isAlive() )
390 {
391 destroyed = false;
392 processorThread = new QProcessor( this );
393 processorThread.start();
394 if ( log.isInfoEnabled() )
395 {
396 log.info( "Cache event queue created: " + this );
397 }
398 }
399 else
400 {
401 queueLock.notify();
402 }
403 }
404 }
405 }
406
407 /***
408 * Returns the next cache event from the queue or null if there are no events in the queue.
409 * <p>
410 * We have an empty node at the head and the tail. When we take an item from the queue we move
411 * the next node to the head and then clear the value from that node. This value is returned.
412 * <p>
413 * When the queue is empty the head node is the same as the tail node.
414 * <p>
415 * @return An event to process.
416 */
417 private AbstractCacheEvent take()
418 {
419 synchronized ( queueLock )
420 {
421
422 if ( head == tail )
423 {
424 return null;
425 }
426
427 Node node = head.next;
428
429 AbstractCacheEvent value = node.event;
430
431 if ( log.isDebugEnabled() )
432 {
433 log.debug( "head.event = " + head.event );
434 log.debug( "node.event = " + node.event );
435 }
436
437
438
439 node.event = null;
440 head = node;
441
442 size--;
443 return value;
444 }
445 }
446
447 /***
448 * This method returns semi-structured data on this queue.
449 * <p>
450 * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getStatistics()
451 * @return information on the status and history of the queue
452 */
453 public IStats getStatistics()
454 {
455 IStats stats = new Stats();
456 stats.setTypeName( "Cache Event Queue" );
457
458 ArrayList elems = new ArrayList();
459
460 IStatElement se = null;
461
462 se = new StatElement();
463 se.setName( "Working" );
464 se.setData( "" + this.working );
465 elems.add( se );
466
467 se = new StatElement();
468 se.setName( "Alive" );
469 se.setData( "" + this.isAlive() );
470 elems.add( se );
471
472 se = new StatElement();
473 se.setName( "Empty" );
474 se.setData( "" + this.isEmpty() );
475 elems.add( se );
476
477 int size = 0;
478 synchronized ( queueLock )
479 {
480
481 if ( head == tail )
482 {
483 size = 0;
484 }
485 else
486 {
487 Node n = head;
488 while ( n != null )
489 {
490 n = n.next;
491 size++;
492 }
493 }
494
495 se = new StatElement();
496 se.setName( "Size" );
497 se.setData( "" + size );
498 elems.add( se );
499 }
500
501
502 IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] );
503 stats.setStatElements( ses );
504
505 return stats;
506 }
507
508
509
510 /*** The queue is composed of nodes. */
511 private static class Node
512 {
513 /*** Next node in the singly linked list. */
514 Node next = null;
515
516 /*** The payload. */
517 CacheEventQueue.AbstractCacheEvent event = null;
518 }
519
520 /***
521 * This is the thread that works the queue.
522 * <p>
523 * @author asmuts
524 * @created January 15, 2002
525 */
526 private class QProcessor
527 extends Thread
528 {
529 /*** The queue to work */
530 CacheEventQueue queue;
531
532 /***
533 * Constructor for the QProcessor object
534 * <p>
535 * @param aQueue the event queue to take items from.
536 */
537 QProcessor( CacheEventQueue aQueue )
538 {
539 super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );
540
541 setDaemon( true );
542 queue = aQueue;
543 }
544
545 /***
546 * Main processing method for the QProcessor object.
547 * <p>
548 * Waits for a specified time (waitToDieMillis) for something to come in and if no new
549 * events come in during that period the run method can exit and the thread is dereferenced.
550 */
551 public void run()
552 {
553 AbstractCacheEvent event = null;
554
555 while ( queue.isAlive() )
556 {
557 event = queue.take();
558
559 if ( log.isDebugEnabled() )
560 {
561 log.debug( "Event from queue = " + event );
562 }
563
564 if ( event == null )
565 {
566 synchronized ( queueLock )
567 {
568 try
569 {
570 queueLock.wait( queue.getWaitToDieMillis() );
571 }
572 catch ( InterruptedException e )
573 {
574 log.warn( "Interrupted while waiting for another event to come in before we die." );
575 return;
576 }
577 event = queue.take();
578 if ( log.isDebugEnabled() )
579 {
580 log.debug( "Event from queue after sleep = " + event );
581 }
582 }
583 if ( event == null )
584 {
585 queue.stopProcessing();
586 }
587 }
588
589 if ( queue.isWorking() && queue.isAlive() && event != null )
590 {
591 event.run();
592 }
593 }
594 if ( log.isDebugEnabled() )
595 {
596 log.debug( "QProcessor exiting for " + queue );
597 }
598 }
599 }
600
601 /***
602 * Retries before declaring failure.
603 * <p>
604 * @author asmuts
605 * @created January 15, 2002
606 */
607 private abstract class AbstractCacheEvent
608 implements Runnable
609 {
610 /*** Number of failures encountered processing this event. */
611 int failures = 0;
612
613 /*** Have we finished the job */
614 boolean done = false;
615
616 /***
617 * Main processing method for the AbstractCacheEvent object
618 */
619 public void run()
620 {
621 try
622 {
623 doRun();
624 }
625 catch ( IOException e )
626 {
627 if ( log.isWarnEnabled() )
628 {
629 log.warn( e );
630 }
631 if ( ++failures >= maxFailure )
632 {
633 if ( log.isWarnEnabled() )
634 {
635 log.warn( "Error while running event from Queue: " + this
636 + ". Dropping Event and marking Event Queue as non-functional." );
637 }
638 setWorking( false );
639 setAlive( false );
640 return;
641 }
642 if ( log.isInfoEnabled() )
643 {
644 log.info( "Error while running event from Queue: " + this + ". Retrying..." );
645 }
646 try
647 {
648 Thread.sleep( waitBeforeRetry );
649 run();
650 }
651 catch ( InterruptedException ie )
652 {
653 if ( log.isErrorEnabled() )
654 {
655 log.warn( "Interrupted while sleeping for retry on event " + this + "." );
656 }
657
658
659 setWorking( false );
660 setAlive( false );
661 }
662 }
663 }
664
665 /***
666 * @exception IOException
667 */
668 protected abstract void doRun()
669 throws IOException;
670 }
671
672 /***
673 * An element should be put in the cache.
674 * <p>
675 * @author asmuts
676 * @created January 15, 2002
677 */
678 private class PutEvent
679 extends AbstractCacheEvent
680 {
681 /*** The element to put to the listener */
682 private ICacheElement ice;
683
684 /***
685 * Constructor for the PutEvent object.
686 * <p>
687 * @param ice
688 * @exception IOException
689 */
690 PutEvent( ICacheElement ice )
691 throws IOException
692 {
693 this.ice = ice;
694 }
695
696 /***
697 * Call put on the listener.
698 * <p>
699 * @exception IOException
700 */
701 protected void doRun()
702 throws IOException
703 {
704 listener.handlePut( ice );
705 }
706
707 /***
708 * For debugging.
709 * <p>
710 * @return Info on the key and value.
711 */
712 public String toString()
713 {
714 return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
715 .append( ice.getVal() ).toString();
716 }
717
718 }
719
720 /***
721 * An element should be removed from the cache.
722 * <p>
723 * @author asmuts
724 * @created January 15, 2002
725 */
726 private class RemoveEvent
727 extends AbstractCacheEvent
728 {
729 /*** The key to remove from the listener */
730 private Serializable key;
731
732 /***
733 * Constructor for the RemoveEvent object
734 * <p>
735 * @param key
736 * @exception IOException
737 */
738 RemoveEvent( Serializable key )
739 throws IOException
740 {
741 this.key = key;
742 }
743
744 /***
745 * Call remove on the listener.
746 * <p>
747 * @exception IOException
748 */
749 protected void doRun()
750 throws IOException
751 {
752 listener.handleRemove( cacheName, key );
753 }
754
755 /***
756 * For debugging.
757 * <p>
758 * @return Info on the key to remove.
759 */
760 public String toString()
761 {
762 return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
763 }
764
765 }
766
767 /***
768 * All elements should be removed from the cache when this event is processed.
769 * <p>
770 * @author asmuts
771 * @created January 15, 2002
772 */
773 private class RemoveAllEvent
774 extends AbstractCacheEvent
775 {
776 /***
777 * Call removeAll on the listener.
778 * <p>
779 * @exception IOException
780 */
781 protected void doRun()
782 throws IOException
783 {
784 listener.handleRemoveAll( cacheName );
785 }
786
787 /***
788 * For debugging.
789 * <p>
790 * @return The name of the event.
791 */
792 public String toString()
793 {
794 return "RemoveAllEvent";
795 }
796
797 }
798
799 /***
800 * The cache should be disposed when this event is processed.
801 * <p>
802 * @author asmuts
803 * @created January 15, 2002
804 */
805 private class DisposeEvent
806 extends AbstractCacheEvent
807 {
808 /***
809 * Called when gets to the end of the queue
810 * <p>
811 * @exception IOException
812 */
813 protected void doRun()
814 throws IOException
815 {
816 listener.handleDispose( cacheName );
817 }
818
819 /***
820 * For debugging.
821 * <p>
822 * @return The name of the event.
823 */
824 public String toString()
825 {
826 return "DisposeEvent";
827 }
828 }
829
830 /***
831 * @return whether the queue is functional.
832 */
833 public boolean isWorking()
834 {
835 return working;
836 }
837
838 /***
839 * This means that the queue is functional. If we reached the max number of failures, the queue
840 * is marked as non functional and will never work again.
841 * <p>
842 * @param b
843 */
844 public void setWorking( boolean b )
845 {
846 working = b;
847 }
848
849 /***
850 * @return whether there are any items in the queue.
851 */
852 public boolean isEmpty()
853 {
854 return tail == head;
855 }
856
857 /***
858 * Returns the number of elements in the queue.
859 * <p>
860 * @return number of items in the queue.
861 */
862 public int size()
863 {
864 return size;
865 }
866 }