The documentprocessor is a pipeline framework written by Morten Tvenning. It was written because, when moving from FAST to Solr, there was a lot of value in carrying the existing FAST pipeline steps over to the new Solr platform.
Most parts of the documentprocessor are the same as in the FAST pipeline framework, but I needed a few workarounds, so there is a converter script.
There are far fewer paths and configuration options in this implementation of the pipeline, so it should be easier to maintain overall under Solr than under FAST.
Using multiple processors is not yet fully supported (there is no document processing distributor). You can, however, alternate between two in the code if you want to.
Check out the code from our Subversion repository:
svn co http://sesat.no/svn/sesat-documentprocessor/trunk/ documentprocessor
This code is open sourced under the LGPLv3 license.
Keeping unicode throughout the pipeline is a priority, and you will get errors if there are str() casts in the pipeline steps. str() casts are OK as long as the result is not written back to the document as a str. Just use unicode to avoid headaches.
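One way to follow this rule is to pass every value through a small normalising helper before calling document.Set(). The sketch below is a hypothetical helper (ensure_unicode is not part of the documentprocessor code) and assumes UTF-8 for byte strings; it runs on Python 2, where it turns a plain str into unicode, and on Python 3, where bytes plays that role.

```python
try:
    text_type = unicode  # Python 2: the unicode type
except NameError:
    text_type = str  # Python 3: str is already unicode text


def ensure_unicode(value, encoding="utf-8"):
    """Return value as unicode text, decoding byte strings with `encoding`."""
    if isinstance(value, text_type):
        return value
    if isinstance(value, bytes):  # bytes is the plain str type on Python 2
        return value.decode(encoding)
    # Numbers and other objects are converted to their text representation
    return text_type(value)
```

Inside Process() this would be used as, for example, document.Set("somefield", ensure_unicode(rawvalue)), so that nothing written back to the document is a byte string.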
It is often just as easy to rewrite the pipeline configuration from scratch. If you don't want to do that, there is a utility that rewrites the logging and imports of FAST pipeline steps to work with the documentprocessor:
cd documentprocessor/processorcode
python FASTtoSSP-utils/fastpipeline-code-fixer/pipelinecodefixer.py <your-pipeline-step>
This prints what was rewritten in the pipeline step, overwrites <your-pipeline-step>, and saves the original pipeline step as <your-pipeline-step>._FAST_BACKUP.
In documentprocessor/code/processors there is a ProcessorExample.py that shows more or less everything a processor is allowed to do.
All interaction with the document is the same as in the FAST pipeline.
Fetching config files is a bit different, but the cString option still works nicely: you get a normal unicode representation of the file back when asking for getFileAsString.
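Because the file comes back as a unicode string, file-style parsing code can keep working by wrapping that string in io.StringIO, which accepts unicode on both Python 2 and Python 3. This is a minimal sketch that starts from the returned string; the exact getFileAsString call and the tab-separated "key, value" file format are assumptions for illustration, and parse_mapping is a hypothetical helper.

```python
import io


def parse_mapping(text, separator="\t"):
    """Turn config text with one 'key<separator>value' pair per line into a dict."""
    mapping = {}
    # io.StringIO lets file-style code iterate over the unicode string line by line
    for line in io.StringIO(text):
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition(separator)
        mapping[key.strip()] = value.strip()
    return mapping
```

In ConfigurationChanged() the input string would come from the container, e.g. something like self.getContainer().getFileAsString(...) (argument assumed), and the resulting dict kept on self for use in Process().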
The documentprocessor runs as a daemon; the executable is ProcessorDaemon.py:
python ProcessorDaemon.py
usage ProcessorDaemon: daemon process for a given set of pipelines
needs a Solr server address, Solr schema.xml and pipeline configuration file
processes documents with one or more pipelines and sets up a queue of documents to be added to Solr
options: (required)
  -S : Solr http address
  -C : Solr schema path
  -P : pipeline configuration file
  -B : the processors (pipeline step) directory path
options: (optional)
  -s : a Python settings file that contains the configuration, e.g. a Django-style settings.py;
       normal command-line args will overwrite the settings in the file if also set
  -c : paths to the config files read by the pipeline steps (default is config/), separated by ;
  -x : add xmlrpc support, requires -H and -p
  -H : hostname of the documentprocessor (xmlrpc)
  -p : port of the documentprocessor server (xmlrpc)
  -d : turn off daemonizing; also ignores the given logfile and logs to stdout
  -b : batch size for commits to Solr
  -o : ontheflyadds, add each document to Solr right away instead of waiting for the batch size or a flush
  -l : log level 1-5, where 5 is debug and 1 is fatal only
  -L : logfile to log to
For testing, -d is handy: it forces the pipeline to run in command-line mode, logging everything to stdout instead of daemonizing.
Example run as daemon:
python ProcessorDaemon.py -S http://127.0.0.1:8080/solr -C ../../solr-testing/yellow-solr/conf/schema.xml -P yellow-pipelineconf/yellow-documentprocessor-config.xml -c yellow-pipelineconf/ConfigFiles/ -H 127.0.0.1 -p 8500 -l 5 -L test1.log -b 10000 -x
Example run with settings file: (also discussed below)
python ProcessorDaemon.py -s settingsfiles/yellowsettings.py
The settings file is optional, but it is convenient and easy to store or move.
$ cat settingsfiles/mysettings.py
xmlrpchost = "solr1.someserver.com"
xmlrpcport = 40001
usexmlrpc = True
pipelineconfig = "<path>/solr-documentprocessor/white-pipelineconf/white-documentprocessor-config.xml"
solrschema = "<solrpath>/solr-white/solr/conf/schema.xml"
configdirs = ["<path>/solr-documentprocessor/white-pipelineconf/ConfigFiles/"]
processorpath = ["<path>/solr-documentprocessor/documentprocessor/processors"]
solrhost = "http://solr1.someserver.com:8000/solr"
daemonize = True
batchsize = 10000
ontheflyadds = False
loglevel = 4
logfile = "<path>/solr-documentprocessor/logs/procserver_white.log"
The settings file is read first (before the other command-line parameters), which means that you can override anything in the settings file on the command line at startup.
So running "python ProcessorDaemon.py -s settingsfiles/mysettings.py -l 5 -d" will log at debug level, not daemonize, and not use the logfile set in mysettings.py.
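The precedence rule (settings file first, command line on top) can be sketched as a simple dictionary merge. This is an illustration, not ProcessorDaemon.py's actual code: the helper names are hypothetical, and it assumes that an option not given on the command line is represented as None.

```python
import runpy


def load_settings_file(path):
    """Execute a Python settings file and keep its public top-level names."""
    namespace = runpy.run_path(path)
    return dict((k, v) for k, v in namespace.items() if not k.startswith("_"))


def merge_settings(file_settings, cli_settings):
    """Start from the settings file; any command-line value that was given wins."""
    merged = dict(file_settings)
    for key, value in cli_settings.items():
        if value is not None:  # None means "option not given on the command line"
            merged[key] = value
    return merged
```

For the example above, -l 5 -d would correspond to cli_settings = {"loglevel": 5, "daemonize": False}. Ignoring the logfile when not daemonizing is separate behaviour of -d and is not modelled by the merge.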
Since the pipeline keeps a queue of documents that may not yet have been flushed and committed to Solr, there is a command-line script to take the server down cleanly.
usage:
python shutdownpipeline.py
usage shutdownpipeline.py
required options:
  -s : settings file for the pipeline
It takes the same settings file that was used to start the processor. It only sends the shutdown signal over the xmlrpc interface and waits for the documentprocessor to shut itself down.
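What the script does can be sketched as: load the settings file, build the XML-RPC address from xmlrpchost and xmlrpcport, and call shutdown. This is a hypothetical reconstruction, not the code of shutdownpipeline.py; it assumes the daemon listens on plain http, and it leaves out the waiting for the daemon process to exit.

```python
import runpy

try:
    from xmlrpc.client import ServerProxy  # Python 3
except ImportError:
    from xmlrpclib import ServerProxy  # Python 2


def xmlrpc_url(settings):
    """Build the documentprocessor XML-RPC address from a settings dict."""
    return "http://%s:%d" % (settings["xmlrpchost"], int(settings["xmlrpcport"]))


def shutdown_pipeline(settings_path):
    """Ask the running documentprocessor to flush, commit and stop."""
    # The settings file is plain Python, so executing it yields the settings names
    settings = runpy.run_path(settings_path)
    proxy = ServerProxy(xmlrpc_url(settings))
    # shutdown is the xmlrpc function that flushes and commits before stopping
    return proxy.shutdown()
```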
These are the functions registered to the xmlrpc interface:
| function | what it does | parameters | comments |
|---|---|---|---|
| hasPipeline | checks if the pipeline exists | str pipeline name | |
| addDocumentByDict | default add function; dicts are easily serializable | dict documentasdict, str pipeline name | |
| isAlive | answers True if the xmlrpc interface works | | |
| addDocument | if you import Document.Document from the documentprocessor code, an instance of it can be sent to the xmlrpc interface through this function | Document.Document document, str pipeline name (optional) | |
| deleteDocumentById | deletes a document from the Solr index | str id | |
| addDocumentByTupleList | adds by tuple list, a list of [(field, value)] | list list-of-tuples, str pipeline name | |
| flush | tells the pipeline to add all queued documents to Solr (and commit them if manualcommit is not set) | | |
| commit | tells the pipeline to tell Solr to commit the changes (start indexing) | | this needs to be done after all documents are fed if manualcommit is set (the default for the xmlrpc interface) |
| shutdown | sends the flush and commit signals, waits for them to finish, and stops itself | | used to keep everything intact when shutting down |
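A typical feeding session through these functions checks the connection and the pipeline, adds the documents, and then calls flush and commit, since manualcommit is the default for the xmlrpc interface. The helper below is a sketch; feed_documents is a hypothetical name, and it assumes hasPipeline returns a boolean. proxy is expected to be an XML-RPC ServerProxy pointing at the documentprocessor (xmlrpclib.ServerProxy on Python 2, xmlrpc.client.ServerProxy on Python 3).

```python
def feed_documents(proxy, documents, pipeline):
    """Send dict documents to a pipeline, then flush and commit them."""
    if not proxy.isAlive():
        raise RuntimeError("the documentprocessor xmlrpc interface is not answering")
    if not proxy.hasPipeline(pipeline):
        raise ValueError("unknown pipeline: %s" % pipeline)
    messages = []
    for doc in documents:
        # addDocumentByDict is the default add function: (dict, pipeline name)
        messages.append(proxy.addDocumentByDict(doc, pipeline))
    # Push the queued documents to Solr, then ask Solr to commit (start indexing)
    proxy.flush()
    proxy.commit()
    return messages
```

Committing once after the whole batch, rather than per document, matches the table's note that commit is needed after all documents are fed.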
The document processors are plug-in scripts that run on each document.
The default document processor looks like this:
"""
logging statements should always be written with self.getName() as the first argument!
self._logger.error("%s: some message" % self.getName())
"""
from DocumentProcessor import ProcessorStatus
import Processor


class AttributeAdd(Processor.Processor):
    def __init__(self, name=""):
        Processor.Processor.__init__(self, name)
        self.name = name

    def ConfigurationChanged(self, arguments):
        # The container provides the logger; parameters come from the pipeline config
        self._logger = self.getContainer().getLogger()
        self._logger.info("%s: ConfigurationChanged OK" % self.getName())
        self._param1 = self.getParameter("some-value-in-processor-conf")

    def Process(self, uri, document):
        # Two %s placeholders need two arguments: the processor name and the uri
        self._logger.info("%s: getting this doc id in: %s" % (self.getName(), uri))
        if document.HasValue(self._param1):
            document.Set("test1", "has param :%s" % self._param1)
            return ProcessorStatus.OK
        return ProcessorStatus.OK_NoChange
The document is of type Document.Document and supports a number of functions (most of which existed in the FAST pipeline).
The interface for Document.Document is better documented by the pydoc that comes with the code.
Each processor inherits from Processor, which lives "inside" a container; the container holds the logger and file information.
Normally only getFileAsString and getLogger should be used. The other container functions are available, but might contain bugs when called from a processor.
<config>
  <processors>
    <processor name="WhitePhrasedNameMaker" >
      <load module="processors.PhrasedNameMaker" class="PhrasedNameMaker" />
      <description>
        find the different permutations of phrased name searches
      </description>
      <config>
        <param name="fornavnfield" value="wpfornavn" type="str" />
        <param name="mellomnavnfield" value="wpmellomnavn" type="str" />
        <param name="etternavnfield" value="wpetternavn" type="str" />
        <param name="allnamesfields" value="wpphrasednameslist" type="str" />
      </config>
    </processor>
    <processor name="StringSplitter" type="general" hidden="0">
      <load module="processors.StringSplitter" class="StringSplitter"/>
      <config>
        <param name="fields" value="mobiltelefon telefon" type="str"/>
        <param name="separator" value=" " type="str"/>
      </config>
      <description><![CDATA[This processor takes in a list of fields, splits the fields by the separator and sets the field to that list
]]></description>
      <inputs>
      </inputs>
    </processor>
  </processors>
  <pipelines>
    <pipeline name="white" default="0">
      <description><![CDATA[white pipeline configuration ]]></description>
      <priority>0</priority>
      <processor name="WhitePhrasedNameMaker" />
      <processor name="StringSplitter" />
    </pipeline>
  </pipelines>
</config>
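The configuration above is a super small pipeline configuration. The <processors> section declares each processor (the module and class to load, plus its <param> settings), and the <pipelines> section defines each pipeline by referring to those processors by name.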
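To see how the two sections fit together, the sketch below reads a configuration like this with the standard library's ElementTree and resolves each pipeline's processor references against the <processors> section. It is only an illustration (describe_pipelines is a hypothetical helper, not the documentprocessor's own loader) and assumes the layout shown above, with <config> as the root element.

```python
import xml.etree.ElementTree as ET


def describe_pipelines(xml_text):
    """Map each pipeline name to (processor name, module, class, params) tuples."""
    root = ET.fromstring(xml_text)
    processors = {}
    for proc in root.find("processors").findall("processor"):
        load = proc.find("load")
        # <param> elements live under the processor's own <config> element
        params = dict((p.get("name"), p.get("value"))
                      for p in proc.findall("config/param"))
        processors[proc.get("name")] = (load.get("module"), load.get("class"), params)
    pipelines = {}
    for pipe in root.find("pipelines").findall("pipeline"):
        # Direct <processor> children of a pipeline are references by name
        pipelines[pipe.get("name")] = [
            (ref.get("name"),) + processors[ref.get("name")]
            for ref in pipe.findall("processor")
        ]
    return pipelines
```

A pipeline that names a processor missing from <processors> raises a KeyError here, which is a cheap way to catch typos in the processor names.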
A Python client example:
import xmlrpclib

server = "http://somehost"
port = 8500
xmlrpc = xmlrpclib.ServerProxy("%s:%s" % (server, port))
if xmlrpc.isAlive():
    print "isAlive() returned True for xmlrpc connection, connection good"
    docdict = {'id': 1, 'testfield': 4032, 'testfield2': "mystring"}
    message = xmlrpc.addDocumentByDict(docdict, "some-pipeline")
    print "%s returned this message: %s" % (docdict['id'], message)
Given an svn checkout of the documentprocessor folder (or an unpacked tarball), the paths below are relative to that folder.
The core files are located under documentprocessor/processorcode and their tests are located under tests/
The test coverage there should be about 90%; a few of the classes are hard to test because they need a working Solr instance to talk to. That is documented in the test files.
The processors are located under documentprocessor/processors and documentprocessor/default_processors
The tests for the processors are located under processor_tests
Testing a new processor should be really simple, since the processor tests follow a common template.
Assume that a new processor, AttributeUpperCaser.py with the processor class AttributeUpperCaser, has been written and placed in the documentprocessor/processors/ folder. To test it, add a new file in processor_tests/ and copy the content of one of the other processor tests into it.
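For reference, one possible AttributeUpperCaser.py is sketched below, following the structure of the default processor and assuming it reads the "input" and "output" parameters used in the testing steps. Assumptions: the document has a Get() method as in the FAST document API, logging is left out for brevity, and when the documentprocessor modules are not importable, minimal hypothetical stand-ins for ProcessorStatus and Processor are defined so the sketch runs on its own.

```python
import types

try:
    # The real framework modules, available inside the documentprocessor code
    from DocumentProcessor import ProcessorStatus
    import Processor
except ImportError:
    # Hypothetical stand-ins so this sketch also runs outside the framework
    class ProcessorStatus(object):
        OK = "OK"
        OK_NoChange = "OK_NoChange"

    class _StandInProcessor(object):
        def __init__(self, name=""):
            self._name = name
            self.parameters = {}  # stand-in for the pipeline config params

        def getName(self):
            return self._name

        def getParameter(self, key):
            return self.parameters[key]

    Processor = types.ModuleType("Processor")
    Processor.Processor = _StandInProcessor


class AttributeUpperCaser(Processor.Processor):
    """Write an upper-cased copy of the 'input' field to the 'output' field."""

    def __init__(self, name=""):
        Processor.Processor.__init__(self, name)
        self.name = name

    def ConfigurationChanged(self, arguments):
        # Field names come from the <param> entries in the pipeline configuration
        self._input = self.getParameter("input")
        self._output = self.getParameter("output")

    def Process(self, uri, document):
        if not document.HasValue(self._input):
            return ProcessorStatus.OK_NoChange
        # Get() is assumed to exist, as in the FAST document API
        document.Set(self._output, document.Get(self._input).upper())
        return ProcessorStatus.OK
```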
cd processor_tests/
touch testAttributeUpperCaser.py
cat testAttributeCopy.py >> testAttributeUpperCaser.py
Or create the file and copy the content in your IDE of choice.
Then open testAttributeUpperCaser.py and do a find-and-replace of AttributeCopy with AttributeUpperCaser.
The tests should then run, and all but two of them should fail.
Next, configure the parameters that AttributeUpperCaser.py gets as input. This is done with the pdnormal class variable. So if AttributeUpperCaser has two self.getParameter() calls, for "input" and "output", put those two into the pdnormal dictionary and remove the others. Then almost all the tests should pass, except for testProcessorProcessing and testProcessorProcessingValidateOutput.
pdnormal = {
"input": "myinputfield",
"output":"myoutputfield",
}
__getTestDocument returns a document that should be processed, and __getTestDocumentDoNothing is used by the last test to check that the processor returns the OK_NoChange flag.
Fill the two documents with content:
def __getTestDocument(self):
    return Document(self.id, {
        'id': 1,
        'myinputfield': "inputvalue",
    })

def __getTestDocumentDoNothing(self):
    return Document(self.id, {
        'id': 1,
    })
Then go into testProcessorProcessingValidateOutput and add a self.failUnless(doc.HasValue(...)) check for the output field ("myoutputfield" with the pdnormal above).
Go to the command line and run: python testAttributeUpperCaser.py
This will test:
A few of the default_processor tests are bigger and a good starting point for implementing more complex tests.