View Javadoc

1   package org.apache.jcs.engine;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.util.ArrayList;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.jcs.engine.behavior.ICacheElement;
29  import org.apache.jcs.engine.behavior.ICacheEventQueue;
30  import org.apache.jcs.engine.behavior.ICacheListener;
31  import org.apache.jcs.engine.stats.StatElement;
32  import org.apache.jcs.engine.stats.Stats;
33  import org.apache.jcs.engine.stats.behavior.IStatElement;
34  import org.apache.jcs.engine.stats.behavior.IStats;
35  import org.apache.jcs.utils.threadpool.ThreadPool;
36  import org.apache.jcs.utils.threadpool.ThreadPoolManager;
37  
38  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
39  
40  /***
41   * An event queue is used to propagate ordered cache events to one and only one target listener.
42   * <p>
43   * This is a modified version of the experimental version. It uses a PooledExecutor and a
44   * BoundedBuffer to queue up events and execute them as threads become available.
45   * <p>
46   * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing
47   * more than a few threads at them will serve no purpose other than to saturate the IO interface. In
48   * light of this, having one thread per region seems unnecessary. This may prove to be false.
49   */
50  public class PooledCacheEventQueue
51      implements ICacheEventQueue
52  {
53      /*** The type of event queue */
54      private static final String queueType = POOLED_QUEUE_TYPE;
55  
56      /*** The logger */
57      private static final Log log = LogFactory.getLog( PooledCacheEventQueue.class );
58  
59      /*** time to wait for an event before snuffing the background thread
60      if the queue is empty.   make configurable later */
61      private int waitToDieMillis = 10000;
62  
63      /*** The listener to process events */
64      private ICacheListener listener;
65  
66      /*** The listener id */
67      private long listenerId;
68  
69      /*** The name of the cache */
70      private String cacheName;
71  
72      /*** Max failures before self destruction */
73      private int maxFailure;
74  
75      /*** in milliseconds */
76      private int waitBeforeRetry;
77  
78      /*** Has the pool been destroyed */
79      private boolean destroyed = true;
80  
81      /*** Is it working */
82      private boolean working = true;
83  
84      /*** The Thread Pool to execute events with. */
85      private ThreadPool pool = null;
86  
87      /***
88       * Constructor for the CacheEventQueue object
89       * <p>
90       * @param listener
91       * @param listenerId
92       * @param cacheName
93       * @param maxFailure
94       * @param waitBeforeRetry
95       * @param threadPoolName
96       */
97      public PooledCacheEventQueue( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
98                                    int waitBeforeRetry, String threadPoolName )
99      {
100         initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName );
101     }
102 
103     /***
104      * Initializes the queue.
105      * <p>
106      * @param listener
107      * @param listenerId
108      * @param cacheName
109      * @param maxFailure
110      * @param waitBeforeRetry
111      * @param threadPoolName
112      */
113     public void initialize( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
114                             int waitBeforeRetry, String threadPoolName )
115     {
116         if ( listener == null )
117         {
118             throw new IllegalArgumentException( "listener must not be null" );
119         }
120 
121         this.listener = listener;
122         this.listenerId = listenerId;
123         this.cacheName = cacheName;
124         this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
125         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
126 
127         // this will share the same pool with other event queues by default.
128         if ( threadPoolName == null )
129         {
130             threadPoolName = "cache_event_queue";
131         }
132         pool = ThreadPoolManager.getInstance().getPool( threadPoolName );
133 
134         if ( log.isDebugEnabled() )
135         {
136             log.debug( "Initialized: " + this );
137         }
138     }
139 
140     /***
141      * @return the queue type
142      */
143     public String getQueueType()
144     {
145         return queueType;
146     }
147 
148     /***
149      * Event Q is empty.
150      */
151     public synchronized void stopProcessing()
152     {
153         destroyed = true;
154     }
155 
156     /***
157      * Returns the time to wait for events before killing the background thread.
158      * <p>
159      * @return the time to wait before shutting down in ms.
160      */
161     public int getWaitToDieMillis()
162     {
163         return waitToDieMillis;
164     }
165 
166     /***
167      * Sets the time to wait for events before killing the background thread.
168      * <p>
169      * @param wtdm
170      */
171     public void setWaitToDieMillis( int wtdm )
172     {
173         waitToDieMillis = wtdm;
174     }
175 
176     /***
177      * @return String info.
178      */
179     public String toString()
180     {
181         return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
182     }
183 
184     /***
185      * @return true if not destroyed.
186      */
187     public boolean isAlive()
188     {
189         return ( !destroyed );
190     }
191 
192     /***
193      * @param aState
194      */
195     public void setAlive( boolean aState )
196     {
197         destroyed = !aState;
198     }
199 
200     /***
201      * @return The listenerId value
202      */
203     public long getListenerId()
204     {
205         return listenerId;
206     }
207 
208     /***
209      * Destroy the queue. Interrupt all threads.
210      */
211     public synchronized void destroy()
212     {
213         if ( !destroyed )
214         {
215             destroyed = true;
216             // TODO decide whether to shutdown or interrupt
217             // pool.getPool().shutdownNow();
218             pool.getPool().interruptAll();
219             if ( log.isInfoEnabled() )
220             {
221                 log.info( "Cache event queue destroyed: " + this );
222             }
223         }
224     }
225 
226     /***
227      * Constructs a PutEvent for the object and passes it to the event queue.
228      * <p>
229      * @param ce The feature to be added to the PutEvent attribute
230      * @exception IOException
231      */
232     public synchronized void addPutEvent( ICacheElement ce )
233         throws IOException
234     {
235         if ( isWorking() )
236         {
237             put( new PutEvent( ce ) );
238         }
239         else
240         {
241             if ( log.isWarnEnabled() )
242             {
243                 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
244             }
245         }
246     }
247 
248     /***
249      * @param key The feature to be added to the RemoveEvent attribute
250      * @exception IOException
251      */
252     public synchronized void addRemoveEvent( Serializable key )
253         throws IOException
254     {
255         if ( isWorking() )
256         {
257             put( new RemoveEvent( key ) );
258         }
259         else
260         {
261             if ( log.isWarnEnabled() )
262             {
263                 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
264             }
265         }
266     }
267 
268     /***
269      * @exception IOException
270      */
271     public synchronized void addRemoveAllEvent()
272         throws IOException
273     {
274         if ( isWorking() )
275         {
276             put( new RemoveAllEvent() );
277         }
278         else
279         {
280             if ( log.isWarnEnabled() )
281             {
282                 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
283             }
284         }
285     }
286 
287     /***
288      * @exception IOException
289      */
290     public synchronized void addDisposeEvent()
291         throws IOException
292     {
293         if ( isWorking() )
294         {
295             put( new DisposeEvent() );
296         }
297         else
298         {
299             if ( log.isWarnEnabled() )
300             {
301                 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
302             }
303         }
304     }
305 
306     /***
307      * Adds an event to the queue.
308      * <p>
309      * @param event
310      */
311     private void put( AbstractCacheEvent event )
312     {
313         try
314         {
315             pool.execute( event );
316         }
317         catch ( InterruptedException e )
318         {
319             log.error( e );
320         }
321     }
322 
323     /***
324      * @return Statistics info
325      */
326     public String getStats()
327     {
328         return getStatistics().toString();
329     }
330 
331     /***
332      * @return IStats
333      */
334     public IStats getStatistics()
335     {
336         IStats stats = new Stats();
337         stats.setTypeName( "Pooled Cache Event Queue" );
338 
339         ArrayList elems = new ArrayList();
340 
341         IStatElement se = null;
342 
343         se = new StatElement();
344         se.setName( "Working" );
345         se.setData( "" + this.working );
346         elems.add( se );
347 
348         se = new StatElement();
349         se.setName( "Destroyed" );
350         se.setData( "" + this.isAlive() );
351         elems.add( se );
352 
353         se = new StatElement();
354         se.setName( "Empty" );
355         se.setData( "" + this.isEmpty() );
356         elems.add( se );
357 
358         if ( pool.getQueue() != null )
359         {
360             if ( pool.getQueue() instanceof BoundedBuffer )
361             {
362                 BoundedBuffer bb = (BoundedBuffer) pool.getQueue();
363                 se = new StatElement();
364                 se.setName( "Queue Size" );
365                 se.setData( "" + bb.size() );
366                 elems.add( se );
367 
368                 se = new StatElement();
369                 se.setName( "Queue Capacity" );
370                 se.setData( "" + bb.capacity() );
371                 elems.add( se );
372             }
373         }
374 
375         se = new StatElement();
376         se.setName( "Pool Size" );
377         se.setData( "" + pool.getPool().getPoolSize() );
378         elems.add( se );
379 
380         se = new StatElement();
381         se.setName( "Maximum Pool Size" );
382         se.setData( "" + pool.getPool().getMaximumPoolSize() );
383         elems.add( se );
384 
385         // get an array and put them in the Stats object
386         IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[elems.size()] );
387         stats.setStatElements( ses );
388 
389         return stats;
390     }
391 
392     // /////////////////////////// Inner classes /////////////////////////////
393 
394     /***
395      * Retries before declaring failure.
396      * <p>
397      * @author asmuts
398      * @created January 15, 2002
399      */
400     private abstract class AbstractCacheEvent
401         implements Runnable
402     {
403         /*** Times failed to process */
404         int failures = 0;
405 
406         /*** Has the event been processed */
407         boolean done = false;
408 
409         /***
410          * Main processing method for the AbstractCacheEvent object. It calls the abstract doRun
411          * method that all concrete instances must implement.
412          */
413         public void run()
414         {
415             try
416             {
417                 doRun();
418             }
419             catch ( IOException e )
420             {
421                 if ( log.isWarnEnabled() )
422                 {
423                     log.warn( e );
424                 }
425                 if ( ++failures >= maxFailure )
426                 {
427                     if ( log.isWarnEnabled() )
428                     {
429                         log.warn( "Error while running event from Queue: " + this
430                             + ". Dropping Event and marking Event Queue as non-functional." );
431                     }
432                     setWorking( false );
433                     setAlive( false );
434                     return;
435                 }
436                 if ( log.isInfoEnabled() )
437                 {
438                     log.info( "Error while running event from Queue: " + this + ". Retrying..." );
439                 }
440                 try
441                 {
442                     Thread.sleep( waitBeforeRetry );
443                     run();
444                 }
445                 catch ( InterruptedException ie )
446                 {
447                     if ( log.isErrorEnabled() )
448                     {
449                         log.warn( "Interrupted while sleeping for retry on event " + this + "." );
450                     }
451                     setWorking( false );
452                     setAlive( false );
453                 }
454             }
455         }
456 
457         /***
458          * @exception IOException
459          */
460         protected abstract void doRun()
461             throws IOException;
462     }
463 
464     /***
465      * An event that puts an item to a ICacheListener
466      */
467     private class PutEvent
468         extends AbstractCacheEvent
469     {
470         /*** The payload */
471         private ICacheElement ice;
472 
473         /***
474          * Constructor for the PutEvent object
475          * @param ice
476          * @exception IOException
477          */
478         PutEvent( ICacheElement ice )
479             throws IOException
480         {
481             this.ice = ice;
482         }
483 
484         /***
485          * Tells the ICacheListener to handle the put.
486          * <p>
487          * @exception IOException
488          */
489         protected void doRun()
490             throws IOException
491         {
492             listener.handlePut( ice );
493         }
494 
495         /*** @return debugging info */
496         public String toString()
497         {
498             return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
499                 .append( ice.getVal() ).toString();
500         }
501     }
502 
503     /***
504      * An event that knows how to call remove on an ICacheListener
505      */
506     private class RemoveEvent
507         extends AbstractCacheEvent
508     {
509         /*** The payload, the key to remove */
510         private Serializable key;
511 
512         /***
513          * Constructor for the RemoveEvent object
514          * @param key
515          * @exception IOException
516          */
517         RemoveEvent( Serializable key )
518             throws IOException
519         {
520             this.key = key;
521         }
522 
523         /***
524          * Calls remove on the listener.
525          * <p>
526          * @exception IOException
527          */
528         protected void doRun()
529             throws IOException
530         {
531             listener.handleRemove( cacheName, key );
532         }
533 
534         /*** @return debugging info */
535         public String toString()
536         {
537             return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
538         }
539     }
540 
541     /***
542      * An event that knows how to call remove all on an ICacheListener
543      */
544     private class RemoveAllEvent
545         extends AbstractCacheEvent
546     {
547         /***
548          * Call removeAll on the listener.
549          * <p>
550          * @exception IOException
551          */
552         protected void doRun()
553             throws IOException
554         {
555             listener.handleRemoveAll( cacheName );
556         }
557 
558         /*** @return debugging info */
559         public String toString()
560         {
561             return "RemoveAllEvent";
562         }
563     }
564 
565     /***
566      * The Event put into the queue for dispose requests.
567      */
568     private class DisposeEvent
569         extends AbstractCacheEvent
570     {
571         /***
572          * Called when gets to the end of the queue
573          * <p>
574          * @exception IOException
575          */
576         protected void doRun()
577             throws IOException
578         {
579             listener.handleDispose( cacheName );
580         }
581 
582         /*** @return debugging info */
583         public String toString()
584         {
585             return "DisposeEvent";
586         }
587     }
588 
589     /***
590      * @return whether or not the queue is functional
591      */
592     public boolean isWorking()
593     {
594         return working;
595     }
596 
597     /***
598      * @param isWorkingArg whether the queue is functional
599      */
600     public void setWorking( boolean isWorkingArg )
601     {
602         working = isWorkingArg;
603     }
604 
605     /***
606      * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't
607      * determine the size, we return true.
608      * <p>
609      * @return whether or not there are items in the queue
610      */
611     public boolean isEmpty()
612     {
613         if ( pool.getQueue() == null )
614         {
615             return pool.getQueue().peek() == null;
616         }
617         else
618         {
619             if ( pool.getQueue() instanceof BoundedBuffer )
620             {
621                 BoundedBuffer bb = (BoundedBuffer) pool.getQueue();
622                 return bb.size() == 0;
623             }
624             else
625             {
626                 return true;
627             }
628         }
629     }
630 
631     /***
632      * Returns the number of elements in the queue. If the queue cannot determine the size
633      * accurately it will return 1.
634      * <p>
635      * @return number of items in the queue.
636      */
637     public int size()
638     {
639         if ( pool.getQueue() == null )
640         {
641             return pool.getQueue().peek() == null ? 0 : 1;
642         }
643         else
644         {
645             if ( pool.getQueue() instanceof BoundedBuffer )
646             {
647                 BoundedBuffer bb = (BoundedBuffer) pool.getQueue();
648                 return bb.size();
649             }
650             else
651             {
652                 return 1;
653             }
654         }
655     }
656 }