package com.telegraph.online.orbeon;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import org.apache.log4j.Logger;
import org.orbeon.oxf.pipeline.api.PipelineContext;
import org.orbeon.oxf.processor.SimpleProcessor;
import org.xml.sax.ContentHandler;

import com.megginson.sax.XMLWriter;

/**

This class was written in response to an issue with the Orbeon Forms API for custom Java processors. The issue was originally raised on the Orbeon Forms mailing list:
"How can I get a handle to an InputStream my SimpleProcessor class extension?"

* *

Here is a rendition of the above problem statement:

* *

"I am writing a custom processor, and I have a question about the Orbeon SimpleProcessor API I'm hoping someone can help me out with: How can I get a handle to an InputStream my SimpleProcessor class extension?

* *

My processor has two inputs coming in: *

  1. The processor configuration XML
  2. An XML document to be imported into my content management system
* *

#2 can potentially be very big, and so I have written an import routine which handles it all as an InputStream / SAX events. Here is the signature for my import action method:

*
 *    public void doImport(InputStream xmlInputStream) throws ImporterException{  
 *        ...
 *    }
 * 
*

I then have a custom processor, extending Orbeon's SimpleProcessor class. Now, I know how to generate SAX events from an org.xml.sax.ContentHandler in my SimpleProcessor extension:

*

 *  public void generateData(PipelineContext context, ContentHandler contentHandler) {
 *         ...
 *         ContentHandler myContentHandler = ...
 *         readInputAsSAX(context, INPUT_DATA, myContentHandler);
 *        ...
 * }
 * 
*

but I'm not sure how to get a handle to a java.io.InputStream for my XML input. Can anyone help?"

* *

This class solves the above problem by converting all events sent to the Orbeon ContentHandler into a byte stream that can be read as an InputStream. The SAX events are serialised by a third-party library, XMLWriter from David Megginson, whereas Java IO pipes are used to pump the serialised output into the InputStream.

* *

The idea behind XMLWriter is fairly straightforward. Here is how we initialise it:

*
 *     pipedInputStream = new PipedInputStream();
 *     PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
 *     OutputStreamWriter outputStreamWriter = new OutputStreamWriter(pipedOutputStream);    	
 *     XMLWriter xmlWriter = new XMLWriter(outputStreamWriter);
 * 
* *

If we look at the source for XMLWriter, we can see how it converts ContentHandler events into character output, which our pipe then exposes as an InputStream:

*
 *     public void startElement (String uri, String localName, String qName, Attributes atts)throws SAXException
 *        {
 *            elementLevel++;
 *            nsSupport.pushContext();
 *            write('<');
 *            writeName(uri, localName, qName, true);
 *            writeAttributes(atts);
 *            if (elementLevel == 1) {
 *                forceNSDecls();
 *            }
 *            writeNSDecls();
 *            write('>');
 *            super.startElement(uri, localName, qName, atts);
 *        }
 * 
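
To make the idea concrete without the XMLWriter dependency, here is a minimal sketch of the same technique using only the JDK. TinySaxSerializer and roundTrip are hypothetical names invented for this illustration. They are not part of XMLWriter or Orbeon, and the sketch skips everything a real serialiser must handle (character escaping, namespaces, processing instructions).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

// Hypothetical, simplified stand-in for XMLWriter: every SAX callback
// is written straight to a java.io.Writer as markup.
public class TinySaxSerializer extends DefaultHandler {
    private final Writer out;

    public TinySaxSerializer(Writer out) {
        this.out = out;
    }

    @Override
    public void startElement(String uri, String localName, String qName, Attributes atts)
            throws SAXException {
        write("<" + qName);
        for (int i = 0; i < atts.getLength(); i++) {
            // No escaping of attribute values in this sketch
            write(" " + atts.getQName(i) + "=\"" + atts.getValue(i) + "\"");
        }
        write(">");
    }

    @Override
    public void endElement(String uri, String localName, String qName) throws SAXException {
        write("</" + qName + ">");
    }

    @Override
    public void characters(char[] ch, int start, int length) throws SAXException {
        // No escaping of text content in this sketch
        write(new String(ch, start, length));
    }

    private void write(String s) throws SAXException {
        try {
            out.write(s);
        } catch (IOException e) {
            throw new SAXException(e);
        }
    }

    // Parses the given XML and serialises the resulting SAX events back to a String.
    public static String roundTrip(String xml) {
        try {
            StringWriter sink = new StringWriter();
            SAXParserFactory.newInstance().newSAXParser()
                    .parse(new InputSource(new StringReader(xml)), new TinySaxSerializer(sink));
            return sink.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In the real class the Writer is an OutputStreamWriter wrapped around a PipedOutputStream rather than a StringWriter, which is what turns the serialised text into something readable as an InputStream.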
* *

In order to get Orbeon to pump these SAX events into our XMLWriter ContentHandler, we use piped input and output streams. A pipe must be written and read by different threads, so we need a separate thread that pumps the events from the source (Orbeon) to the sink (XMLWriter). This is pretty standard fare for Java pipes, and is well documented elsewhere.
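
The pipe-plus-thread pattern described above can be sketched with the JDK alone. PipeDemo and pumpThroughPipe are hypothetical names used only for this illustration. A plain string stands in for the SAX events that Orbeon would supply, and UTF-8 is an assumption of the sketch.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: one thread writes into the pipe, the calling thread reads from it.
public class PipeDemo {

    public static String pumpThroughPipe(String payload) {
        try {
            PipedInputStream in = new PipedInputStream();
            PipedOutputStream out = new PipedOutputStream(in);

            // Producer thread: plays the role of the pumper thread.
            Thread producer = new Thread(() -> {
                // Closing the writer closes the pipe and signals end-of-stream to the reader
                try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
                    writer.write(payload);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            producer.start();

            // Consumer: plays the role of the code that wants an InputStream.
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] buffer = new byte[256];
            int n;
            while ((n = in.read(buffer)) != -1) {
                collected.write(buffer, 0, n);
            }
            producer.join();
            return new String(collected.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Reading and writing on the same thread would risk a deadlock once the pipe's internal buffer fills up, which is why the producer runs on its own thread.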

* *

This abstract class extends the Orbeon SimpleProcessor class. Any custom processor that needs a handle to the InputStream can simply extend our StreamingSimpleProcessorBase and implement generateStreamingData(PipelineContext context, ContentHandler contentHandler, InputStream pipedInputStream) instead of SimpleProcessor's generateData(PipelineContext context, ContentHandler contentHandler).

* *

Important

*

Please note that because of how Orbeon Forms uses reflection when invoking our custom processors, it will throw an exception unless the class extending this class implements generateData(PipelineContext context, ContentHandler contentHandler). Forget to implement this method, and Orbeon will throw an exception complaining about not being able to find the "generate" method for type "data". Addressing this issue is beyond the scope of this implementation, and we offer instead a workaround (also known as a 'hack'). For all classes extending StreamingSimpleProcessorBase, please add this code snippet:

*
 *     public void generateData(PipelineContext context, ContentHandler contentHandler) throws Throwable {
 *         super.generateData(context, contentHandler);
 *     }
 * 
 *
 * @author Henrik Pettersen, ©Sparkling Ideas, 2008
 */
public abstract class StreamingSimpleProcessorBase extends SimpleProcessor {

    private static Logger LOGGER = Logger.getLogger(StreamingSimpleProcessorBase.class);
    private static final int PUMPER_THREAD_HEARTBEAT_INTERVALL_IN_MS = 500;

    private PipedInputStream pipedInputStream;

    /**
     * Our thread pumping SAX events from Orbeon to our InputStream does not extend SimpleProcessor.
     * We therefore need a callback routine from the thread to this SimpleProcessor class, in order
     * to tell Orbeon to start sourcing SAX events for us.
     *
     * @param context From the Orbeon API
     * @param xmlWriter See above class level documentation
     * @param input The name of the Orbeon input source. See Orbeon documentation for
     *        generateData(PipelineContext context, ContentHandler contentHandler)
     * @throws IOException If the stream is broken
     */
    public void pumpSaxEvents(PipelineContext context, XMLWriter xmlWriter, String input) throws IOException {
        readInputAsSAX(context, input, xmlWriter);
    }

    /**
     * We would have liked to make this method 'final', but cannot because of the issue with Orbeon
     * and its use of reflection (see class level documentation).
     * This method is called by Orbeon when the custom processor is referenced in our XPL; see the
     * Orbeon Forms documentation for more details.
     * This method:
  1. Creates a new PipedInputStream
  2. Initialises the pumper thread
  3. Starts the pumper thread
  4. Calls the generateStreamingData of the subclass
  5. Closes the pumper thread
     * @param context See Orbeon Forms documentation
     * @param contentHandler See Orbeon Forms documentation
     * @throws Throwable Generous throws clause for maximum flexibility
     */
    public void generateData(PipelineContext context, ContentHandler contentHandler) throws Throwable {
        pipedInputStream = new PipedInputStream();
        PumperThread pumperThread = new PumperThread(this, pipedInputStream, context, getStreamingInputName());
        try {
            pumperThread.start();
            generateStreamingData(context, contentHandler, pipedInputStream);
        } finally {
            pumperThread.finished();
            if (pumperThread.getThrowable() != null) {
                throw pumperThread.getThrowable();
            }
        }
    }

    /**
     * What is the name of the input data, from the XPL? See documentation of addInputInfo method in SimpleProcessor class.
     * @return The name of the input source we would like to pump into our InputStream
     */
    protected abstract String getStreamingInputName();

    /**
     * See class level comments above.
     * @param context See Orbeon Forms documentation
     * @param contentHandler See Orbeon Forms documentation
     * @param pipedInputStream All Orbeon Forms SAX events will be pumped into this InputStream
     * @throws Throwable Generous throws clause
     */
    public abstract void generateStreamingData(PipelineContext context, ContentHandler contentHandler, InputStream pipedInputStream) throws Throwable;

    /**
     * A thread-based implementation of Java IO pipes.
     * See class level comments of surrounding class for more detail,
     * or google for 'pipedinputstream thread'.
     * @author henrikpettersen
     */
    private class PumperThread extends Thread {

        private StreamingSimpleProcessorBase processor;
        private PipelineContext context;
        private XMLWriter xmlWriter;
        private OutputStreamWriter outputStreamWriter;
        private Throwable throwable;
        // volatile: written by the reading thread via finished(), polled by run()
        private volatile boolean finished;
        private String input;

        /**
         * The constructor stores all the parameters as member variables,
         * sets up the IO pipes, and configures the XMLWriter.
         * @param processor An instance of a StreamingSimpleProcessorBase subclass, used for callback routine
         *        (see pumpSaxEvents(PipelineContext context, XMLWriter xmlWriter, String input) in surrounding class)
         * @param pipedInputStream The stream which we will be writing SAX events to
         * @param context From Orbeon Forms. See generateData(PipelineContext context, ContentHandler contentHandler)
         *        in the Orbeon Forms documentation
         * @param input The name of the Orbeon input source to read
         * @throws IOException For problems with setting up the IO pipes
         */
        public PumperThread(StreamingSimpleProcessorBase processor, PipedInputStream pipedInputStream,
                PipelineContext context, String input) throws IOException {
            PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(pipedOutputStream);
            XMLWriter xmlWriter = new XMLWriter(outputStreamWriter);
            this.processor = processor;
            this.context = context;
            this.xmlWriter = xmlWriter;
            this.outputStreamWriter = outputStreamWriter;
            this.input = input;
        }

        /**

Reads the SAX events from Orbeon (source) and writes them into an InputStream (sink). Once Orbeon has pushed all the events onto our InputStream, we wait until the other thread is done parsing the InputStream, as closing the stream too early may cause a 'Broken Pipe' exception. The other thread, reading from the InputStream, must signal that it is done by calling the finished() method of this class. All thread management is handled by generateData(PipelineContext context, ContentHandler contentHandler) in the surrounding class.

* *

Any exception thrown is stored in a member variable, which must be checked for non-null after the pumping is complete. See generateData(PipelineContext context, ContentHandler contentHandler) in the surrounding class.
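
As an aside, the wait-until-finished handshake and the stored exception could also be expressed with a java.util.concurrent.CountDownLatch instead of a polled flag. This is not what this class does. The following is only a sketch of that alternative, and LatchHandshakeDemo and its members are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the pumper blocks on a latch until the reader signals
// that it is done, and any failure is handed back through an AtomicReference.
public class LatchHandshakeDemo {

    public static String run() {
        CountDownLatch readerDone = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        StringBuffer log = new StringBuffer();

        Thread pumper = new Thread(() -> {
            try {
                log.append("pumped;");      // stands in for pumping the SAX events
                readerDone.await();         // wait for the reader, no sleep loop needed
                log.append("released;");
            } catch (Throwable t) {
                failure.set(t);             // stored here, rethrown by the caller
            }
        });
        pumper.start();

        try {
            // ... the reader would consume the InputStream here ...
            readerDone.countDown();         // equivalent of calling finished()
            pumper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        if (failure.get() != null) {
            throw new IllegalStateException(failure.get());
        }
        return log.toString();
    }
}
```

The latch removes the need for a heartbeat interval, at the cost of depending on java.util.concurrent (Java 5 or later).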

         */
        public void run() {
            try {
                // Callback to the StreamingSimpleProcessorBase subclass
                processor.pumpSaxEvents(context, xmlWriter, input);
                outputStreamWriter.close();
                // Hang around until the other thread calls the finished() method.
                while (!finished) {
                    try {
                        Thread.sleep(PUMPER_THREAD_HEARTBEAT_INTERVALL_IN_MS);
                    } catch (InterruptedException e) {
                        // Ignored: keep polling until finished() has been called
                    }
                }
            } catch (Throwable t) {
                // Store the exception, and rethrow in the other thread if non-null
                throwable = t;
            } finally {
                try {
                    xmlWriter.flush();
                    outputStreamWriter.close();
                } catch (IOException e) {
                    // Do nothing
                }
            }
        }

        /**
         * @return Any exceptions thrown whilst pumping the SAX events
         */
        public Throwable getThrowable() {
            return throwable;
        }

        /**
         * Once you are done reading from the InputStream, you must call this method
         * in order for this thread to terminate.
         */
        public void finished() {
            this.finished = true;
        }
    }
}