/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.commons.pipeline.driver;

import java.util.LinkedList;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.StageContext;
import static org.apache.commons.pipeline.StageDriver.State.*;
import org.apache.commons.pipeline.StageDriver.State;
import static org.apache.commons.pipeline.driver.FaultTolerance.*;

/**
 * This is a non-threaded version of the AbstractStageDriver.
 *
 * <p>Objects passed to the {@link Feeder} returned by {@link #getFeeder()} are
 * handed to the stage on the calling thread while the driver is in the
 * {@code RUNNING} state. Objects fed in any other non-error state are held in
 * an internal queue and processed by {@link #start()} once preprocessing has
 * completed.</p>
 */
public class SynchronousStageDriver extends AbstractStageDriver {
    private final Log log = LogFactory.getLog(SynchronousStageDriver.class);

    // Queue of objects fed to the driver while it is not in the RUNNING state.
    // LinkedList is kept deliberately: unlike ArrayDeque it accepts null elements.
    private final Queue<Object> queue = new LinkedList<Object>();

    // Feeder used to feed objects to this stage.
    private final Feeder feeder = new Feeder() {
        /**
         * Processes the object immediately if the driver is RUNNING, otherwise
         * enqueues it for processing by start().
         *
         * @param obj the object to hand to the stage
         * @throws IllegalStateException if the driver is in the ERROR state
         * @throws RuntimeException if processing fails and the fault tolerance
         *         level is NONE
         */
        public void feed(Object obj) {
            // Only the state check and the enqueue happen under the driver's
            // monitor; the call to stage.process() below is made outside it.
            synchronized (SynchronousStageDriver.this) {
                if (currentState == ERROR) {
                    throw new IllegalStateException("Unable to process data: driver in fatal error state.");
                }
                if (currentState != RUNNING) { // enqueue objects if stage has not been started
                    queue.add(obj);
                    return;
                }
            }

            try {
                stage.process(obj);
            } catch (StageException e) {
                recordProcessingException(obj, e);
                if (faultTolerance == NONE) {
                    throw fatalError(e);
                }
            }
        }
    };

    /**
     * Creates a new instance of SynchronousStageDriver.
     *
     * @param stage The stage to be run
     * @param context The context in which the stage will be run
     * @param faultTolerance The fault tolerance level; when it is
     *        {@code NONE}, a {@link StageException} raised while processing an
     *        object is treated as a fatal error
     */
    public SynchronousStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
        super(stage, context, faultTolerance);
    }

    /**
     * Get the feeder for the encapsulated stage. Since the SynchronousStageDriver
     * is designed to run the stage in the main thread of execution, calls
     * to {@link Feeder#feed(Object)} on the returned feeder will trigger processing
     * of the object fed to the stage.
     *
     * @return The Feeder instance for the stage.
     */
    public Feeder getFeeder() {
        return this.feeder;
    }

    /**
     * Performs preprocessing and updates the driver state, then processes any
     * objects that were fed to the driver before it was started.
     *
     * @throws org.apache.commons.pipeline.StageException declared for the
     *         driver contract; a StageException raised by preprocessing is
     *         recorded as a fatal error and rethrown as a RuntimeException
     * @throws IllegalStateException if the driver is not in the STOPPED state
     */
    public synchronized void start() throws StageException {
        if (this.currentState != STOPPED) {
            throw new IllegalStateException("Illegal attempt to start driver from state: " + this.currentState);
        }

        try {
            stage.preprocess();
            setState(RUNNING);
        } catch (StageException e) {
            throw fatalError(e);
        }

        // Feed any queued values before returning control.
        while (!queue.isEmpty()) {
            this.getFeeder().feed(queue.remove());
        }
    }

    /**
     * Performs postprocessing and releases stage resources, and updates the driver
     * state accordingly.
     *
     * @throws org.apache.commons.pipeline.StageException declared for the
     *         driver contract; a StageException raised by postprocessing is
     *         recorded as a fatal error and rethrown as a RuntimeException
     * @throws IllegalStateException if the driver is not in the RUNNING state
     */
    public synchronized void finish() throws StageException {
        if (this.currentState != RUNNING) {
            throw new IllegalStateException("Driver is not running (current state: " + this.currentState + ")");
        }

        try {
            testAndSetState(RUNNING, STOP_REQUESTED);
            if (this.currentState == STOP_REQUESTED) {
                stage.postprocess();
            }
        } catch (StageException e) {
            throw fatalError(e);
        } finally {
            // NOTE(review): when postprocess() fails, fatalError() has already
            // called stage.release(), so release() is invoked a second time
            // here. Left unchanged; confirm Stage.release() tolerates this.
            stage.release();
            testAndSetState(STOP_REQUESTED, STOPPED);
        }
    }

    /**
     * This method obtains a lock to set the current state of processing
     * to error, records the error and returns a RuntimeException encapsulating
     * the specified throwable.
     *
     * <p>The method is synchronized because it calls {@link #notifyAll()}, which
     * requires the caller to own this object's monitor; the feeder invokes it
     * after leaving its own synchronized block.</p>
     *
     * @param t the failure that halted processing
     * @return a RuntimeException, with {@code t} as its cause, for the caller to throw
     */
    private synchronized RuntimeException fatalError(Throwable t) {
        try {
            setState(ERROR);
            this.recordFatalError(t);
            stage.release();
            this.notifyAll();
        } catch (Exception e) {
            // A failure while cleaning up is recorded as well, not propagated,
            // so the caller still receives the exception for the original error.
            this.recordFatalError(e);
        }

        return new RuntimeException("Fatal error halted processing of stage: " + stage, t);
    }
}