package com.telegraph.online.orbeon;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import org.apache.log4j.Logger;
import org.orbeon.oxf.pipeline.api.PipelineContext;
import org.orbeon.oxf.processor.SimpleProcessor;
import org.xml.sax.ContentHandler;

import com.megginson.sax.XMLWriter;

/**
 * This class was written in response to an issue with the Orbeon Forms API
 * for custom Java processors. The issue was originally raised on the
 * Orbeon Forms mailing list:
 * "How can I get a handle to an InputStream my SimpleProcessor class extension?"
 *
 * Here is a rendition of the above problem statement:
 *
 * "I am writing a custom processor, and I have a question about the
 * Orbeon SimpleProcessor API I'm hoping someone can help me out with:
 * How can I get a handle to an InputStream my SimpleProcessor class
 * extension?
 *
 * My processor has two inputs coming in:
 *
 * #2 can potentially be very big, and so I have written an import
 * routine which handles it all as an InputStream / SAX events. Here is
 * the signature for my import action method:
 *
 *     public void doImport(InputStream xmlInputStream) throws ImporterException {
 *         ...
 *     }
 *
 * I then have a custom processor, extending Orbeon's SimpleProcessor
 * class. Now, I know how to generate SAX events from an
 * org.xml.sax.ContentHandler in my SimpleProcessor extension:
 *
 *     public void generateData(PipelineContext context, ContentHandler contentHandler) {
 *         ...
 *         ContentHandler myContentHandler = ...
 *         readInputAsSAX(context, INPUT_DATA, myContentHandler);
 *         ...
 *     }
 *
 * but I'm not sure how to get a handle to a java.io.InputStream for my XML input. Can anyone help?"
 *
 * This class provides the solution to the above problem by converting all events sent to the Orbeon
 * ContentHandler into a byte stream / InputStream. The conversion from SAX events to text is handled
 * by a third-party library, XMLWriter from David Megginson, while Java I/O pipes are used to pump
 * that output into the InputStream.
 *
 * The idea behind XMLWriter is fairly straightforward. Here is how we initialise it:
 *
 *     pipedInputStream = new PipedInputStream();
 *     PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
 *     OutputStreamWriter outputStreamWriter = new OutputStreamWriter(pipedOutputStream);
 *     XMLWriter xmlWriter = new XMLWriter(outputStreamWriter);
 *
 * If we look at the source for XMLWriter, we can see how it turns ContentHandler events into
 * character output, which the pipe then exposes as an InputStream:
 *
 *     public void startElement (String uri, String localName, String qName, Attributes atts) throws SAXException
 *     {
 *         elementLevel++;
 *         nsSupport.pushContext();
 *         write('<');
 *         writeName(uri, localName, qName, true);
 *         writeAttributes(atts);
 *         if (elementLevel == 1) {
 *             forceNSDecls();
 *         }
 *         writeNSDecls();
 *         write('>');
 *         super.startElement(uri, localName, qName, atts);
 *     }
 *
 * To get Orbeon to pump these SAX events into our XMLWriter ContentHandler, we use piped input and
 * output streams. The writing and reading ends of a pipe should live on different threads, so we
 * implement a separate thread that pumps the events from the source (Orbeon) to the sink (XMLWriter).
 * This is pretty standard fare for Java pipes, and is well documented elsewhere.
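 *
 * The pipe-and-pumper-thread pattern described above can be sketched with nothing but java.io.
 * This is a minimal illustration under stated assumptions, not the code of this class: PipeSketch
 * and pumpAndRead are hypothetical names, and a plain OutputStreamWriter stands in for XMLWriter.
 *
```java
import java.io.*;
import java.nio.charset.StandardCharsets;

public class PipeSketch {
    // Writes `xml` into a pipe on a separate thread and reads it back on the caller's thread.
    public static String pumpAndRead(String xml) throws IOException, InterruptedException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        Thread pumper = new Thread(() -> {
            // try-with-resources closes the writer, which signals end-of-stream to the reader
            try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
                writer.write(xml);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        pumper.start();

        // Drain the pipe until the writer closes its end
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int n;
        while ((n = in.read(buffer)) != -1) {
            sink.write(buffer, 0, n);
        }
        pumper.join();
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pumpAndRead("<doc><item>1</item></doc>"));
    }
}
```
 *
 * The reader sees end-of-stream only once the writing side has been closed, which is why the
 * pumping side is responsible for closing the writer when it has no more events to send.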
 *
 * This abstract class extends the Orbeon SimpleProcessor class. Any custom processor that needs a
 * handle to the InputStream can simply extend StreamingSimpleProcessorBase and implement
 * generateStreamingData(PipelineContext context, ContentHandler contentHandler, InputStream pipedInputStream)
 * instead of SimpleProcessor's generateData(PipelineContext context, ContentHandler contentHandler).
 *
 * Please note that because of how Orbeon Forms uses reflection when invoking custom processors,
 * the class extending this class must itself implement
 * generateData(PipelineContext context, ContentHandler contentHandler). Forget to implement this
 * method, and Orbeon will throw an exception complaining that it cannot find the "generate" method
 * for type "data". Addressing this issue is beyond the scope of this implementation, so we offer a
 * workaround (also known as a 'hack') instead. In every class extending
 * StreamingSimpleProcessorBase, please add this code snippet:
 *
 *     public void generateData(PipelineContext context, ContentHandler contentHandler) throws Throwable {
 *         super.generateData(context, contentHandler);
 *     }
 *
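 * One plausible reading of that error, and it is an assumption since the Orbeon lookup code is not
 * shown here, is that only methods declared directly on the concrete class are considered, the way
 * Class.getDeclaredMethods() behaves, so a generateData inherited from this base class is not seen.
 * The sketch below shows that JDK behaviour in isolation; DeclaredMethodsSketch, Base,
 * WithoutOverride and WithOverride are hypothetical stand-ins, not Orbeon classes.
 *
```java
import java.lang.reflect.Method;

public class DeclaredMethodsSketch {
    static class Base {
        public void generateData() { }
    }

    // Inherits generateData() but does not declare it.
    static class WithoutOverride extends Base { }

    // Declares generateData() itself and simply delegates to the base class.
    static class WithOverride extends Base {
        public void generateData() {
            super.generateData();
        }
    }

    // True if `type` itself declares a method called `name`; inherited methods do not count.
    public static boolean declares(Class<?> type, String name) {
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(declares(WithoutOverride.class, "generateData"));
        System.out.println(declares(WithOverride.class, "generateData"));
    }
}
```
 *
 * Under that assumption, the delegating override is what makes the method visible to the lookup.
 *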
* @author Henrik Pettersen, ©Sparkling Ideas, 2008
*/
public abstract class StreamingSimpleProcessorBase extends SimpleProcessor {

    private static final Logger LOGGER = Logger.getLogger(StreamingSimpleProcessorBase.class);
    private static final int PUMPER_THREAD_HEARTBEAT_INTERVALL_IN_MS = 500;

    private PipedInputStream pipedInputStream;

    /**
     * Our thread pumping SAX events from Orbeon to our InputStream does not extend SimpleProcessor.
     * We therefore need a callback routine from the thread to this SimpleProcessor class, in order
     * to tell Orbeon to start sourcing SAX events for us.
     *
     * @param context   From the Orbeon API
     * @param xmlWriter See the class-level documentation above
     * @param input     The name of the Orbeon input source. See the Orbeon documentation for
     *                  generateData(PipelineContext context, ContentHandler contentHandler)
     * @throws IOException If the stream is broken
     */
    public void pumpSaxEvents(PipelineContext context, XMLWriter xmlWriter, String input) throws IOException {
        readInputAsSAX(context, input, xmlWriter);
    }

    /**
     * We would have liked to make this method 'final', but cannot because of the issue with Orbeon
     * and its use of reflection (see the class-level documentation).
     * This method is called by Orbeon when the custom processor is referenced in our XPL; see the
     * Orbeon Forms documentation for more details.
     *
     * This method reads the SAX events from Orbeon (source) and sticks them into an InputStream (sink).
     * Once Orbeon has pushed all the events onto our InputStream, we wait until the other thread
     * is done parsing the InputStream, as closing the stream too early may cause a 'Broken Pipe'
     * exception. The other thread, reading from the InputStream, must signal that it is done
     * by calling the finished() method of this class.
     * All thread management is handled by generateData(PipelineContext context, ContentHandler contentHandler)
     * in the surrounding class.
     *
     * Any exception thrown is stored in a member variable, which must be checked for non-null after
     * the pumping is complete. See generateData(PipelineContext context, ContentHandler contentHandler)
     * in the surrounding class.
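     *
     * The hand-off described above (pump, wait until the reader signals it is done, and keep any
     * exception for the other thread) can also be expressed with a java.util.concurrent.CountDownLatch
     * in place of a polled flag. This is a sketch of an alternative, not what this class does;
     * HandshakeSketch, pumpWork and runOnce are hypothetical names.
     *
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class HandshakeSketch implements Runnable {
    private final Runnable pumpWork;
    private final CountDownLatch readerDone = new CountDownLatch(1);
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    public HandshakeSketch(Runnable pumpWork) {
        this.pumpWork = pumpWork;
    }

    public void run() {
        try {
            pumpWork.run();
            // Block until finished() is called; no polling interval needed
            readerDone.await();
        } catch (Throwable t) {
            // Kept for the other thread to inspect once pumping is over
            failure.set(t);
        }
    }

    // Called by the reading thread once it has consumed the stream.
    public void finished() {
        readerDone.countDown();
    }

    public Throwable getThrowable() {
        return failure.get();
    }

    // Runs a pumper to completion and returns whatever it failed with, or null.
    public static Throwable runOnce(Runnable pumpWork) throws InterruptedException {
        HandshakeSketch pumper = new HandshakeSketch(pumpWork);
        Thread thread = new Thread(pumper);
        thread.start();
        pumper.finished();
        thread.join();
        return pumper.getThrowable();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce(() -> { }));
        System.out.println(runOnce(() -> { throw new IllegalStateException("pipe broke"); }));
    }
}
```
     *
     * A latch also gives the cross-thread visibility that a plain boolean field does not guarantee
     * unless it is declared volatile.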
     */
    public void run() {
        try {
            // Callback to the StreamingSimpleProcessorBase subclass
            processor.pumpSaxEvents(context, xmlWriter, input);
            // Flush before closing; flushing an already closed writer throws an IOException
            xmlWriter.flush();
            outputStreamWriter.close();
            // Hang around until the other thread calls the finished() method
            while (!finished) {
                try {
                    Thread.sleep(PUMPER_THREAD_HEARTBEAT_INTERVALL_IN_MS);
                } catch (InterruptedException e) {
                    // Ignored: keep waiting for finished()
                }
            }
        } catch (Throwable t) {
            // Store the exception, and rethrow in the other thread if non-null
            throwable = t;
        } finally {
            try {
                // No-op on the normal path, where the writer is already closed
                outputStreamWriter.close();
            } catch (IOException e) {
                // Do nothing
            }
        }
    }

    /**
     * @return Any exception thrown whilst pumping the SAX events
     */
    public Throwable getThrowable() {
        return throwable;
    }

    /**
     * Once you are done reading from the InputStream, you must call this method
     * in order for this thread to terminate.
     */
    public void finished() {
        this.finished = true;
    }
    }
}