View Javadoc

1   package org.apache.jcs.engine;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.util.ArrayList;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.jcs.engine.behavior.ICacheElement;
29  import org.apache.jcs.engine.behavior.ICacheEventQueue;
30  import org.apache.jcs.engine.behavior.ICacheListener;
31  import org.apache.jcs.engine.stats.StatElement;
32  import org.apache.jcs.engine.stats.Stats;
33  import org.apache.jcs.engine.stats.behavior.IStatElement;
34  import org.apache.jcs.engine.stats.behavior.IStats;
35  
36  /***
37   * An event queue is used to propagate ordered cache events to one and only one target listener.
38   * <p>
39   * This is a modified version of the experimental version. It should lazy initilaize the processor
40   * thread, and kill the thread if the queue goes emtpy for a specified period, now set to 1 minute.
41   * If something comes in after that a new processor thread should be created.
42   */
43  public class CacheEventQueue
44      implements ICacheEventQueue
45  {
46      /*** The logger. */
47      private static final Log log = LogFactory.getLog( CacheEventQueue.class );
48  
49      /*** The type of queue -- there are pooled and single */
50      private static final String queueType = SINGLE_QUEUE_TYPE;
51  
52      /*** default */
53      private static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
54  
55      /***
56       * time to wait for an event before snuffing the background thread if the queue is empty. make
57       * configurable later
58       */
59      private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
60  
61      /***
62       * When the events are pulled off the queue, the tell the listener to handle the specific event
63       * type. The work is done by the listener.
64       */
65      private ICacheListener listener;
66  
67      /*** Id of the listener registed with this queue */
68      private long listenerId;
69  
70      /*** The cache region name, if applicable. */
71      private String cacheName;
72  
73      /*** Maximum number of failures before we buy the farm. */
74      private int maxFailure;
75  
76      /*** in milliseconds */
77      private int waitBeforeRetry;
78  
79      /*** this is true if there is no worker thread. */
80      private boolean destroyed = true;
81  
82      /***
83       * This means that the queue is functional. If we reached the max number of failures, the queue
84       * is marked as non functional and will never work again.
85       */
86      private boolean working = true;
87  
88      /*** the thread that works the queue. */
89      private Thread processorThread;
90  
91      /*** sync */
92      private Object queueLock = new Object();
93  
94      /*** the head of the queue */
95      private Node head = new Node();
96  
97      /*** the end of the queue */
98      private Node tail = head;
99  
100     /*** Number of items in the queue */
101     private int size = 0;
102 
103     /***
104      * Constructs with the specified listener and the cache name.
105      * <p>
106      * @param listener
107      * @param listenerId
108      * @param cacheName
109      */
110     public CacheEventQueue( ICacheListener listener, long listenerId, String cacheName )
111     {
112         this( listener, listenerId, cacheName, 10, 500 );
113     }
114 
115     /***
116      * Constructor for the CacheEventQueue object
117      * <p>
118      * @param listener
119      * @param listenerId
120      * @param cacheName
121      * @param maxFailure
122      * @param waitBeforeRetry
123      */
124     public CacheEventQueue( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
125                             int waitBeforeRetry )
126     {
127         initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, null );
128     }
129 
130     /***
131      * Initializes the queue.
132      * <p>
133      * @param listener
134      * @param listenerId
135      * @param cacheName
136      * @param maxFailure
137      * @param waitBeforeRetry
138      * @param threadPoolName
139      */
140     public void initialize( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
141                             int waitBeforeRetry, String threadPoolName )
142     {
143         if ( listener == null )
144         {
145             throw new IllegalArgumentException( "listener must not be null" );
146         }
147 
148         this.listener = listener;
149         this.listenerId = listenerId;
150         this.cacheName = cacheName;
151         this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
152         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
153 
154         if ( log.isDebugEnabled() )
155         {
156             log.debug( "Constructed: " + this );
157         }
158     }
159     
160     /***
161      * What type of queue is this.
162      * <p>
163      * @return queueType
164      */
165     public String getQueueType()
166     {
167         return queueType;
168     }
169 
170     /***
171      * Kill the processor thread and indicate that the queue is detroyed and no longer alive, but it
172      * can still be working.
173      */
174     public synchronized void stopProcessing()
175     {
176         destroyed = true;
177         processorThread = null;
178     }
179 
180     /***
181      * Returns the time to wait for events before killing the background thread.
182      * <p>
183      * @return int
184      */
185     public int getWaitToDieMillis()
186     {
187         return waitToDieMillis;
188     }
189 
190     /***
191      * Sets the time to wait for events before killing the background thread.
192      * <p>
193      * @param wtdm the ms for the q to sit idle.
194      */
195     public void setWaitToDieMillis( int wtdm )
196     {
197         waitToDieMillis = wtdm;
198     }
199 
200     /***
201      * Creates a brief string identifying the listener and the region.
202      * <p>
203      * @return String debugging info.
204      */
205     public String toString()
206     {
207         return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
208     }
209 
210     /***
211      * If they queue has an active thread it is considered alive.
212      * <p>
213      * @return The alive value
214      */
215     public synchronized boolean isAlive()
216     {
217         return ( !destroyed );
218     }
219 
220     /***
221      * Sets whether the queue is actively processing -- if there are working threads.
222      * <p>
223      * @param aState
224      */
225     public synchronized void setAlive( boolean aState )
226     {
227         destroyed = !aState;
228     }
229 
230     /***
231      * @return The listenerId value
232      */
233     public long getListenerId()
234     {
235         return listenerId;
236     }
237 
238     /***
239      * Event Q is emtpy.
240      * <p>
241      * Calling destroy interupts the processor thread.
242      */
243     public synchronized void destroy()
244     {
245         if ( !destroyed )
246         {
247             destroyed = true;
248 
249             if ( log.isInfoEnabled() )
250             {
251                 log.info( "Destroying queue, stats =  " + getStatistics() );
252             }
253 
254             // Synchronize on queue so the thread will not wait forever,
255             // and then interrupt the QueueProcessor
256 
257             if ( processorThread != null )
258             {
259                 synchronized ( queueLock )
260                 {
261                     processorThread.interrupt();
262                 }
263             }
264             processorThread = null;
265 
266             if ( log.isInfoEnabled() )
267             {
268                 log.info( "Cache event queue destroyed: " + this );
269             }
270         }
271         else
272         {
273             if ( log.isInfoEnabled() )
274             {
275                 log.info( "Destroy was called after queue was destroyed.  Doing nothing.  Stats =  " + getStatistics() );
276             }
277         }
278     }
279 
280     /***
281      * This adds a put event to the queue. When it is processed, the element will be put to the
282      * listener.
283      * <p>
284      * @param ce The feature to be added to the PutEvent attribute
285      * @exception IOException
286      */
287     public synchronized void addPutEvent( ICacheElement ce )
288         throws IOException
289     {
290         if ( isWorking() )
291         {
292             put( new PutEvent( ce ) );
293         }
294         else
295         {
296             if ( log.isWarnEnabled() )
297             {
298                 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
299             }
300         }
301     }
302 
303     /***
304      * This adds a remove event to the queue. When processed the listener's remove method will be
305      * called for the key.
306      * <p>
307      * @param key The feature to be added to the RemoveEvent attribute
308      * @exception IOException
309      */
310     public synchronized void addRemoveEvent( Serializable key )
311         throws IOException
312     {
313         if ( isWorking() )
314         {
315             put( new RemoveEvent( key ) );
316         }
317         else
318         {
319             if ( log.isWarnEnabled() )
320             {
321                 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
322             }
323         }
324     }
325 
326     /***
327      * This adds a remove all event to the queue. When it is processed, all elements will be removed
328      * from the cache.
329      * <p>
330      * @exception IOException
331      */
332     public synchronized void addRemoveAllEvent()
333         throws IOException
334     {
335         if ( isWorking() )
336         {
337             put( new RemoveAllEvent() );
338         }
339         else
340         {
341             if ( log.isWarnEnabled() )
342             {
343                 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
344             }
345         }
346     }
347 
348     /***
349      * @exception IOException
350      */
351     public synchronized void addDisposeEvent()
352         throws IOException
353     {
354         if ( isWorking() )
355         {
356             put( new DisposeEvent() );
357         }
358         else
359         {
360             if ( log.isWarnEnabled() )
361             {
362                 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
363             }
364         }
365     }
366 
367     /***
368      * Adds an event to the queue.
369      * <p>
370      * @param event
371      */
372     private void put( AbstractCacheEvent event )
373     {
374         Node newNode = new Node();
375         if ( log.isDebugEnabled() )
376         {
377             log.debug( "Event entering Queue for " + cacheName + ": " + event );
378         }
379 
380         newNode.event = event;
381 
382         synchronized ( queueLock )
383         {
384             size++;
385             tail.next = newNode;
386             tail = newNode;
387             if ( isWorking() )
388             {
389                 if ( !isAlive() )
390                 {
391                     destroyed = false;
392                     processorThread = new QProcessor( this );
393                     processorThread.start();
394                     if ( log.isInfoEnabled() )
395                     {
396                         log.info( "Cache event queue created: " + this );
397                     }
398                 }
399                 else
400                 {
401                     queueLock.notify();
402                 }
403             }
404         }
405     }
406 
407     /***
408      * Returns the next cache event from the queue or null if there are no events in the queue.
409      * <p>
410      * We have an empty node at the head and the tail. When we take an item from the queue we move
411      * the next node to the head and then clear the value from that node. This value is returned.
412      * <p>
413      * When the queue is empty the head node is the same as the tail node.
414      * <p>
415      * @return An event to process.
416      */
417     private AbstractCacheEvent take()
418     {
419         synchronized ( queueLock )
420         {
421             // wait until there is something to read
422             if ( head == tail )
423             {
424                 return null;
425             }
426 
427             Node node = head.next;
428 
429             AbstractCacheEvent value = node.event;
430 
431             if ( log.isDebugEnabled() )
432             {
433                 log.debug( "head.event = " + head.event );
434                 log.debug( "node.event = " + node.event );
435             }
436 
437             // Node becomes the new head (head is always empty)
438 
439             node.event = null;
440             head = node;
441 
442             size--;
443             return value;
444         }
445     }
446 
447     /***
448      * This method returns semi-structured data on this queue.
449      * <p>
450      * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getStatistics()
451      * @return information on the status and history of the queue
452      */
453     public IStats getStatistics()
454     {
455         IStats stats = new Stats();
456         stats.setTypeName( "Cache Event Queue" );
457 
458         ArrayList elems = new ArrayList();
459 
460         IStatElement se = null;
461 
462         se = new StatElement();
463         se.setName( "Working" );
464         se.setData( "" + this.working );
465         elems.add( se );
466 
467         se = new StatElement();
468         se.setName( "Alive" );
469         se.setData( "" + this.isAlive() );
470         elems.add( se );
471 
472         se = new StatElement();
473         se.setName( "Empty" );
474         se.setData( "" + this.isEmpty() );
475         elems.add( se );
476 
477         int size = 0;
478         synchronized ( queueLock )
479         {
480             // wait until there is something to read
481             if ( head == tail )
482             {
483                 size = 0;
484             }
485             else
486             {
487                 Node n = head;
488                 while ( n != null )
489                 {
490                     n = n.next;
491                     size++;
492                 }
493             }
494 
495             se = new StatElement();
496             se.setName( "Size" );
497             se.setData( "" + size );
498             elems.add( se );
499         }
500 
501         // get an array and put them in the Stats object
502         IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] );
503         stats.setStatElements( ses );
504 
505         return stats;
506     }
507 
508     // /////////////////////////// Inner classes /////////////////////////////
509 
510     /*** The queue is composed of nodes. */
511     private static class Node
512     {
513         /*** Next node in the singly linked list. */
514         Node next = null;
515 
516         /*** The payload. */
517         CacheEventQueue.AbstractCacheEvent event = null;
518     }
519 
520     /***
521      * This is the thread that works the queue.
522      * <p>
523      * @author asmuts
524      * @created January 15, 2002
525      */
526     private class QProcessor
527         extends Thread
528     {
529         /*** The queue to work */
530         CacheEventQueue queue;
531 
532         /***
533          * Constructor for the QProcessor object
534          * <p>
535          * @param aQueue the event queue to take items from.
536          */
537         QProcessor( CacheEventQueue aQueue )
538         {
539             super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );
540 
541             setDaemon( true );
542             queue = aQueue;
543         }
544 
545         /***
546          * Main processing method for the QProcessor object.
547          * <p>
548          * Waits for a specified time (waitToDieMillis) for something to come in and if no new
549          * events come in during that period the run method can exit and the thread is dereferenced.
550          */
551         public void run()
552         {
553             AbstractCacheEvent event = null;
554 
555             while ( queue.isAlive() )
556             {
557                 event = queue.take();
558 
559                 if ( log.isDebugEnabled() )
560                 {
561                     log.debug( "Event from queue = " + event );
562                 }
563 
564                 if ( event == null )
565                 {
566                     synchronized ( queueLock )
567                     {
568                         try
569                         {
570                             queueLock.wait( queue.getWaitToDieMillis() );
571                         }
572                         catch ( InterruptedException e )
573                         {
574                             log.warn( "Interrupted while waiting for another event to come in before we die." );
575                             return;
576                         }
577                         event = queue.take();
578                         if ( log.isDebugEnabled() )
579                         {
580                             log.debug( "Event from queue after sleep = " + event );
581                         }
582                     }
583                     if ( event == null )
584                     {
585                         queue.stopProcessing();
586                     }
587                 }
588 
589                 if ( queue.isWorking() && queue.isAlive() && event != null )
590                 {
591                     event.run();
592                 }
593             }
594             if ( log.isDebugEnabled() )
595             {
596                 log.debug( "QProcessor exiting for " + queue );
597             }
598         }
599     }
600 
601     /***
602      * Retries before declaring failure.
603      * <p>
604      * @author asmuts
605      * @created January 15, 2002
606      */
607     private abstract class AbstractCacheEvent
608         implements Runnable
609     {
610         /*** Number of failures encountered processing this event. */
611         int failures = 0;
612 
613         /*** Have we finished the job */
614         boolean done = false;
615 
616         /***
617          * Main processing method for the AbstractCacheEvent object
618          */
619         public void run()
620         {
621             try
622             {
623                 doRun();
624             }
625             catch ( IOException e )
626             {
627                 if ( log.isWarnEnabled() )
628                 {
629                     log.warn( e );
630                 }
631                 if ( ++failures >= maxFailure )
632                 {
633                     if ( log.isWarnEnabled() )
634                     {
635                         log.warn( "Error while running event from Queue: " + this
636                             + ". Dropping Event and marking Event Queue as non-functional." );
637                     }
638                     setWorking( false );
639                     setAlive( false );
640                     return;
641                 }
642                 if ( log.isInfoEnabled() )
643                 {
644                     log.info( "Error while running event from Queue: " + this + ". Retrying..." );
645                 }
646                 try
647                 {
648                     Thread.sleep( waitBeforeRetry );
649                     run();
650                 }
651                 catch ( InterruptedException ie )
652                 {
653                     if ( log.isErrorEnabled() )
654                     {
655                         log.warn( "Interrupted while sleeping for retry on event " + this + "." );
656                     }
657                     // TODO consider if this is best. maybe we shoudl just
658                     // destroy
659                     setWorking( false );
660                     setAlive( false );
661                 }
662             }
663         }
664 
665         /***
666          * @exception IOException
667          */
668         protected abstract void doRun()
669             throws IOException;
670     }
671 
672     /***
673      * An element should be put in the cache.
674      * <p>
675      * @author asmuts
676      * @created January 15, 2002
677      */
678     private class PutEvent
679         extends AbstractCacheEvent
680     {
681         /*** The element to put to the listener */
682         private ICacheElement ice;
683 
684         /***
685          * Constructor for the PutEvent object.
686          * <p>
687          * @param ice
688          * @exception IOException
689          */
690         PutEvent( ICacheElement ice )
691             throws IOException
692         {
693             this.ice = ice;
694         }
695 
696         /***
697          * Call put on the listener.
698          * <p>
699          * @exception IOException
700          */
701         protected void doRun()
702             throws IOException
703         {
704             listener.handlePut( ice );
705         }
706 
707         /***
708          * For debugging.
709          * <p>
710          * @return Info on the key and value.
711          */
712         public String toString()
713         {
714             return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
715                 .append( ice.getVal() ).toString();
716         }
717 
718     }
719 
720     /***
721      * An element should be removed from the cache.
722      * <p>
723      * @author asmuts
724      * @created January 15, 2002
725      */
726     private class RemoveEvent
727         extends AbstractCacheEvent
728     {
729         /*** The key to remove from the listener */
730         private Serializable key;
731 
732         /***
733          * Constructor for the RemoveEvent object
734          * <p>
735          * @param key
736          * @exception IOException
737          */
738         RemoveEvent( Serializable key )
739             throws IOException
740         {
741             this.key = key;
742         }
743 
744         /***
745          * Call remove on the listener.
746          * <p>
747          * @exception IOException
748          */
749         protected void doRun()
750             throws IOException
751         {
752             listener.handleRemove( cacheName, key );
753         }
754 
755         /***
756          * For debugging.
757          * <p>
758          * @return Info on the key to remove.
759          */
760         public String toString()
761         {
762             return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
763         }
764 
765     }
766 
767     /***
768      * All elements should be removed from the cache when this event is processed.
769      * <p>
770      * @author asmuts
771      * @created January 15, 2002
772      */
773     private class RemoveAllEvent
774         extends AbstractCacheEvent
775     {
776         /***
777          * Call removeAll on the listener.
778          * <p>
779          * @exception IOException
780          */
781         protected void doRun()
782             throws IOException
783         {
784             listener.handleRemoveAll( cacheName );
785         }
786 
787         /***
788          * For debugging.
789          * <p>
790          * @return The name of the event.
791          */
792         public String toString()
793         {
794             return "RemoveAllEvent";
795         }
796 
797     }
798 
799     /***
800      * The cache should be disposed when this event is processed.
801      * <p>
802      * @author asmuts
803      * @created January 15, 2002
804      */
805     private class DisposeEvent
806         extends AbstractCacheEvent
807     {
808         /***
809          * Called when gets to the end of the queue
810          * <p>
811          * @exception IOException
812          */
813         protected void doRun()
814             throws IOException
815         {
816             listener.handleDispose( cacheName );
817         }
818 
819         /***
820          * For debugging.
821          * <p>
822          * @return The name of the event.
823          */
824         public String toString()
825         {
826             return "DisposeEvent";
827         }
828     }
829 
830     /***
831      * @return whether the queue is functional.
832      */
833     public boolean isWorking()
834     {
835         return working;
836     }
837 
838     /***
839      * This means that the queue is functional. If we reached the max number of failures, the queue
840      * is marked as non functional and will never work again.
841      * <p>
842      * @param b
843      */
844     public void setWorking( boolean b )
845     {
846         working = b;
847     }
848 
849     /***
850      * @return whether there are any items in the queue.
851      */
852     public boolean isEmpty()
853     {
854         return tail == head;
855     }
856 
857     /***
858      * Returns the number of elements in the queue.
859      * <p>
860      * @return number of items in the queue.
861      */
862     public int size()
863     {
864         return size;
865     }
866 }