The purpose of this repo is to facilitate multi-threaded processing of MongoDB collections. The parent class in ThreadedDocumentProcessor.py should not be changed; it handles threading, exceptions, errors, progress messages, and document iteration.
Install the pymongo dependency:
python3 -m pip install --user pymongo
- Update the query variable in main() of processDocuments.py to specify a query on the collection.
- Update the processDocument() method to specify what the program does to each document in the collection.
  - If you return anything from the method, it must be a Python dictionary.
  - This dictionary will be formatted as JSON and written to the output.json file.
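A minimal sketch of the two edits described in the list. The `query` value is an ordinary MongoDB filter document; the field names `status`, `views`, and `title` are hypothetical. `process_document` is a standalone stand-in for `processDocument()`, whose real signature on the class in processDocuments.py may differ. It also assumes that returning `None` means there is nothing to write for that document.

```python
import json

# Hypothetical query for main(): a standard MongoDB filter document that
# matches documents whose status is "active" and whose views are >= 100.
query = {"status": "active", "views": {"$gte": 100}}


def process_document(document):
    """Stand-in for processDocument(): handle one document and optionally
    return a dict that will be written to output.json."""
    title = document.get("title")
    if not title:
        # Assumption: returning None means "nothing to write for this document".
        return None
    # ObjectId values are not JSON-serializable, so convert _id to a string.
    return {"id": str(document["_id"]), "title_length": len(title)}


sample = {"_id": 1, "title": "hello", "status": "active", "views": 150}
print(json.dumps(process_document(sample)))  # {"id": "1", "title_length": 5}
```

Keeping the returned dictionary to plain strings and numbers avoids serialization errors when it is turned into JSON.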
Run the program with:
python3 processDocuments.py <collection_name> <number_of_threads>
- errors.log
  - This file is automatically created when the program is run, if any exceptions are caught.
  - This is the log file for any exceptions that arise during document processing.
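A sketch of how exceptions could end up in a lazily created errors.log, using the standard `logging` module. The helper names `make_error_logger` and `safe_process` are hypothetical, and this is not necessarily how ThreadedDocumentProcessor.py does it. Passing `delay=True` to `FileHandler` postpones opening the file until the first record, which matches the behavior of the file appearing only when an exception is caught.

```python
import logging


def make_error_logger(path="errors.log"):
    """Hypothetical helper: return a logger that appends to `path`.

    delay=True defers opening the file until the first record is emitted,
    so the file only appears once an exception is actually logged."""
    logger = logging.getLogger(f"document_errors.{path}")
    logger.setLevel(logging.ERROR)
    logger.propagate = False  # keep tracebacks off the console progress output
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.FileHandler(path, delay=True)
        handler.setFormatter(
            logging.Formatter("%(asctime)s [%(threadName)s] %(message)s"))
        logger.addHandler(handler)
    return logger


def safe_process(logger, document, process):
    """Call process(document); on failure, log the traceback and return None
    so one bad document does not stop the worker thread."""
    try:
        return process(document)
    except Exception:
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("Failed on document _id=%r", document.get("_id"))
        return None
```

Catching the exception per document, rather than per thread, lets the remaining documents in that thread's share keep processing.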
- output.json
  - This file is automatically created when the program is run.
  - This is where the program writes JSON output, if any.
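A sketch of writing returned dictionaries to output.json safely from several threads. The class name `JsonOutputWriter` is hypothetical, and writing one JSON object per line (JSON Lines) is an assumption; the README does not say which layout output.json uses. The lock matters because every thread shares the same file handle.

```python
import json
import threading


class JsonOutputWriter:
    """Hypothetical writer for output.json: one JSON object per line.
    The real file layout used by ThreadedDocumentProcessor.py may differ."""

    def __init__(self, path="output.json"):
        self._lock = threading.Lock()
        # "w" mode creates (or truncates) the file when the program starts.
        self._file = open(path, "w", encoding="utf-8")

    def write(self, result):
        if result is None:
            return  # processDocument() returned nothing, so write nothing
        if not isinstance(result, dict):
            raise TypeError("processDocument() must return a dict or nothing")
        # default=str stringifies values json cannot encode (ObjectId, datetime).
        line = json.dumps(result, default=str)
        # Without the lock, writes from different threads could interleave.
        with self._lock:
            self._file.write(line + "\n")

    def close(self):
        with self._lock:
            self._file.close()
```

One line per result keeps each write small and atomic under the lock, and the file can still be read back record by record with `json.loads`.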
- Utility functions for ThreadedDocumentProcessor.py
- ThreadedDocumentProcessor.py
  - Again, this class should not be changed.
  - This class performs the following:
    - Makes a connection to MongoDB.
    - Gets a cursor to the collection of interest, with a query specified by the user.
    - Creates threads and breaks up the input space (the documents of the specified collection) among the threads.
    - Calls the user-specified processDocument() method from each thread on each document assigned to that thread.
    - Handles any exceptions that arise and writes error messages to the errors.log file.
    - Logs progress messages to the console.
    - Writes any user-specified data to the output.json file.
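One way the input space could be broken up among threads, sketched with a plain Python list standing in for the MongoDB cursor. Contiguous ranges are an assumption; the README does not say how ThreadedDocumentProcessor.py actually divides the documents. With pymongo, a range could be fetched with `collection.find(query).skip(start).limit(end - start)`, keeping in mind that `limit(0)` means "no limit", so empty ranges should be skipped. The names `partition`, `worker`, and `run` and the progress-line format are hypothetical.

```python
import threading


def partition(total, num_threads):
    """Split document indices 0..total-1 into num_threads contiguous
    (start, end) ranges whose sizes differ by at most one."""
    base, extra = divmod(total, num_threads)
    ranges, start = [], 0
    for i in range(num_threads):
        size = base + (1 if i < extra else 0)  # first `extra` threads get one more
        ranges.append((start, start + size))
        start += size
    return ranges


def worker(thread_id, documents, start, end, process, results):
    """Process documents[start:end], printing one progress line per document."""
    total = end - start
    for processed, document_number in enumerate(range(start, end), start=1):
        results[document_number] = process(documents[document_number])
        # Hypothetical format: processed/total fraction, then the document number.
        print(f"thread {thread_id}: {processed}/{total} document {document_number}")


def run(documents, num_threads, process):
    results = [None] * len(documents)  # each thread fills only its own slots
    threads = []
    for i, (start, end) in enumerate(partition(len(documents), num_threads)):
        t = threading.Thread(target=worker,
                             args=(i, documents, start, end, process, results))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()  # wait for every thread before returning
    return results
```

Because each thread owns a disjoint index range, the threads never write to the same slot of `results`.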
- processDocuments.py
  - Takes the command-line input (<collection_name> and <number_of_threads>, as in the run command above) and passes it to the program constructor.
  - Defines a processDocument() method to be filled out by the user.
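A sketch of parsing the two positional arguments from the run command. Using `argparse` is an assumption (the repo may read `sys.argv` directly), and `parse_args` is a hypothetical name.

```python
import argparse


def parse_args(argv=None):
    """Parse: python3 processDocuments.py <collection_name> <number_of_threads>"""
    parser = argparse.ArgumentParser(
        description="Process a MongoDB collection with multiple threads.")
    parser.add_argument("collection_name", help="collection to process")
    parser.add_argument("number_of_threads", type=int,
                        help="number of worker threads")
    args = parser.parse_args(argv)  # argv=None reads sys.argv[1:]
    if args.number_of_threads < 1:
        # parser.error prints the usage message and exits with status 2.
        parser.error("number_of_threads must be at least 1")
    return args
```

Calling `parse_args()` with no argument reads the real command line; the resulting `collection_name` and `number_of_threads` would then be handed to the processor's constructor.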
- If you need to stop the program and restart it for any reason, read each thread's last progress message: document_number is the last integer printed, and documents_processed_by_this_thread is the numerator of the fraction in that message. Then, in the run() method of ThreadedDocumentProcessor.py, pass those two values explicitly, in that order, as two additional arguments when each thread is created (each thread i has its own pair of values).
- It would look like this:
thread = Thread(target=ThreadedDocumentProcessor.iterateDocuments, args=(self, i, <document_number>, <documents_processed_by_this_thread>))
- This will cause the script to restart where it left off.
- Note: this happens automatically if an exception is thrown, but not if you kill the script with Ctrl+C.
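A sketch of what accepting the two restart values could look like inside a worker loop. The function `iterate_documents` and its parameters are hypothetical stand-ins for `iterateDocuments`, whose body is not shown in this README. It assumes each progress line is printed after its document finishes, so work resumes at `document_number + 1`; if the real script prints before processing, resume at `document_number` instead.

```python
def iterate_documents(thread_id, documents, start, end, process,
                      document_number=None, processed=0):
    """Hypothetical resumable loop over this thread's range [start, end).

    document_number and processed are the values read from the thread's last
    progress line before the restart. Assumes that line is printed after the
    document finishes, so work resumes at document_number + 1."""
    total = end - start
    first = start if document_number is None else document_number + 1
    results = {}
    for n in range(first, end):
        results[n] = process(documents[n])
        processed += 1
        # Hypothetical progress line: numerator = processed count,
        # last integer = document number (the values read on restart).
        print(f"thread {thread_id}: {processed}/{total} document {n}")
    return results, processed


# Fresh run over documents 0..4, then a resume after document 2 was finished.
docs = list(range(10))
fresh, fresh_count = iterate_documents(0, docs, 0, 5, lambda d: d * 10)
resumed, resumed_count = iterate_documents(0, docs, 0, 5, lambda d: d * 10,
                                           document_number=2, processed=3)
```

Carrying `processed` forward keeps the progress fraction continuous across the restart, so the numerator still reaches the thread's total.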