Modern server machines have multiple CPUs, each with multiple cores. Utilizing all these cores requires either running multiple processes on the same machine or writing programs that use multiple threads.
Since many aspects of a machine translation system (training, tuning, decoding) lend themselves easily to parallel processing, Moses increasingly uses multi-threading in its components. At this point, the following components support parallel execution when the switch "--threads NUM" is added, where NUM is the maximum number of threads to run at the same time:
moses
mert
extract-rules
Multi-threading in Moses is based on the C++ Boost libraries and on two Moses helper libraries that make the type of multi-threading typical for Moses more convenient: ThreadPool and OutputCollector.
We will explain the implementation of multi-threaded processing using a simple example.
The part of the program that is to be run in parallel threads is called a task, and it needs to be placed into a class of its own.
```cpp
class ExampleTask : public Moses::Task {
public:
  ExampleTask() {}
  ~ExampleTask() {}
  void Run() {
    std::cout << "Hello World." << std::endl;
  }
};
```
Such a class can now be instantiated and run:
```cpp
ExampleTask *task = new ExampleTask();
task->Run();
delete task;
```
This will print "Hello World.", and is otherwise not very exciting.
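To try the task class without building Moses, the sketch below replaces Moses::Task with a hypothetical stand-in base class (sketch::Task), assumed to declare a pure virtual Run() and a virtual destructor. The task also writes to a caller-supplied stream instead of always using std::cout, so its output can be checked. HelloTask and RunHelloOnce are illustrative names, not part of Moses.

```cpp
#include <iostream>
#include <ostream>

// Hypothetical stand-in for Moses::Task, assumed to declare a pure virtual
// Run() and a virtual destructor; used only so this sketch builds without Moses.
namespace sketch {
class Task {
 public:
  virtual ~Task() {}
  virtual void Run() = 0;
};
}  // namespace sketch

// Prints a greeting to a caller-supplied stream (std::cout by default).
class HelloTask : public sketch::Task {
 public:
  explicit HelloTask(std::ostream* out = &std::cout) : m_out(out) {}
  void Run() override { *m_out << "Hello World." << std::endl; }

 private:
  std::ostream* m_out;
};

// Instantiate, run, and delete a task through a base-class pointer; the
// virtual destructor makes this delete call ~HelloTask() correctly.
void RunHelloOnce(std::ostream& out) {
  sketch::Task* task = new HelloTask(&out);
  task->Run();
  delete task;
}
```

Calling RunHelloOnce(std::cout) prints the same single line as the Moses version.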
Let's make the task a bit more interesting. Our new task takes a message in its constructor (stored in a member string m_message), waits for a random amount of time, and then prints out the message:
```cpp
ExampleTask(string message):m_message(message) {}

void Run() {
  // length of pause
  int r = rand()%10;

  // pause (busy loop; j is unsigned so that wrap-around is well defined)
  unsigned long j = 0;
  for (long i = 0; i < 100000000L * r; i++) { j += i; }

  // write message (and length of pause)
  std::cout << m_message << " (" << r << ")" << std::endl;
}
```
We can now create multiple instances of this task, and execute each:
```cpp
// set up tasks
srand(time(NULL));
ExampleTask *task0 = new ExampleTask("zero");
ExampleTask *task1 = new ExampleTask("one");
ExampleTask *task2 = new ExampleTask("two");

// serial execution
task0->Run();
task1->Run();
task2->Run();
```
This will print out three lines (the random numbers in parentheses will vary):
```
zero (2)
one (4)
two (5)
```
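A variant of the random-pause task that is easier to experiment with: the pause length is passed in rather than drawn with rand(), and the task sleeps with std::this_thread::sleep_for instead of busy-looping (a swapped-in technique, not what the Moses example does). PauseTask and RunSerially are hypothetical names, and the 10 ms unit is an arbitrary choice.

```cpp
#include <chrono>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Variant of the random-pause task: the pause length (in units of 10 ms) is
// passed in instead of drawn with rand(), and the task sleeps instead of
// busy-looping.
class PauseTask {
 public:
  PauseTask(std::string message, int pause, std::ostream& out)
      : m_message(std::move(message)), m_pause(pause), m_out(out) {}

  void Run() {
    std::this_thread::sleep_for(std::chrono::milliseconds(10 * m_pause));
    m_out << m_message << " (" << m_pause << ")\n";
  }

 private:
  std::string m_message;
  int m_pause;
  std::ostream& m_out;
};

// Serial execution: each Run() returns before the next one starts, so the
// lines always appear in the order of the vector.
void RunSerially(const std::vector<std::unique_ptr<PauseTask>>& tasks) {
  for (const auto& task : tasks) task->Run();
}
```

Because nothing runs concurrently here, the order of the output is fully determined by the order of the calls, whatever the pause lengths.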
Okay, where is the multi-threading? Here it comes.
Instead of simply running the tasks one after the other, we submit them to a thread pool. Once submitted, each task is handed to one of the pool's threads and executed in parallel with the main process.
```cpp
// set up thread pool
int thread_count = 10;
Moses::ThreadPool pool(thread_count);

// submit tasks
pool.Submit(task0);
pool.Submit(task1);
pool.Submit(task2);

// wait for all threads to finish
pool.Stop(true);
```
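For readers curious about what may happen behind Submit() and Stop(true), here is a minimal fixed-size thread pool written with the C++11 standard library instead of Boost. It is a sketch of the general technique (worker threads pulling jobs from a shared queue guarded by a mutex and a condition variable), not the Moses::ThreadPool source; it accepts std::function<void()> jobs rather than Task objects, and SimpleThreadPool is a hypothetical name.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Minimal fixed-size thread pool mimicking the Submit()/Stop(true) usage
// pattern; NOT the Moses::ThreadPool implementation.
class SimpleThreadPool {
 public:
  explicit SimpleThreadPool(std::size_t numThreads) {
    for (std::size_t i = 0; i < numThreads; ++i)
      m_workers.emplace_back([this] { WorkerLoop(); });
  }

  ~SimpleThreadPool() { Stop(true); }

  // Queue a job and return immediately; a worker thread picks it up later.
  void Submit(std::function<void()> job) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_queue.push_back(std::move(job));
    }
    m_cond.notify_one();
  }

  // processRemaining == true: finish every queued job, then join the workers.
  // processRemaining == false: drop jobs that have not started yet.
  void Stop(bool processRemaining) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      if (m_stopping) return;  // already stopped (e.g. by an explicit call)
      m_stopping = true;
      if (!processRemaining) m_queue.clear();
    }
    m_cond.notify_all();
    for (std::thread& t : m_workers) t.join();
  }

 private:
  void WorkerLoop() {
    for (;;) {
      std::function<void()> job;
      {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return m_stopping || !m_queue.empty(); });
        if (m_queue.empty()) return;  // stopping and nothing left to do
        job = std::move(m_queue.front());
        m_queue.pop_front();
      }
      job();  // run outside the lock so that workers execute in parallel
    }
  }

  std::vector<std::thread> m_workers;
  std::deque<std::function<void()>> m_queue;
  std::mutex m_mutex;
  std::condition_variable m_cond;
  bool m_stopping = false;
};
```

Running each job outside the lock is the key design choice: the mutex only protects the queue, so jobs themselves do not serialize each other.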
That seems too easy to be true, right? It is.
Since the three threads are running in parallel, there is no telling when they print out their messages. Not only could the lines be printed in a different order than the tasks were scheduled, the threads may even write all over each other.
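One common way to keep threads from writing over each other is to format each line privately and write it to the shared stream while holding a mutex. The sketch below (LockedLineWriter and PrintFromThreads are hypothetical names) prevents torn lines, but it does not fix the ordering problem: which line comes first still depends on thread scheduling.

```cpp
#include <cstddef>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Serializes whole-line writes to a shared stream. Lines can no longer be
// torn apart, but their ORDER still depends on thread scheduling.
class LockedLineWriter {
 public:
  explicit LockedLineWriter(std::ostream& out) : m_out(out) {}
  void WriteLine(const std::string& line) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_out << line << '\n';
  }

 private:
  std::ostream& m_out;
  std::mutex m_mutex;
};

// Starts one thread per message; each thread formats its line locally first
// and only touches the shared stream through the writer.
void PrintFromThreads(const std::vector<std::string>& messages, std::ostream& out) {
  LockedLineWriter writer(out);
  std::vector<std::thread> threads;
  for (std::size_t i = 0; i < messages.size(); ++i) {
    threads.emplace_back([&writer, &messages, i] {
      std::ostringstream line;
      line << messages[i] << " (" << i << ")";
      writer.WriteLine(line.str());
    });
  }
  for (std::thread& t : threads) t.join();
}
```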
This is the catch with multi-threading: any interaction with non-local data structures must be handled very carefully. Ideally, threads only change local data (defined in the class), and once they are done (after pool.Stop(true)), the results can be read out. This is in fact what happens in multi-threaded mert.
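The "only change local data, read the results afterwards" pattern can be sketched with plain std::thread: each task accumulates into its own member, and the main thread reads the results only after joining every thread, which plays the role of pool.Stop(true) here. SumTask and ParallelSum are illustrative names, not part of Moses.

```cpp
#include <thread>
#include <vector>

// Each task only touches its own members while it runs.
class SumTask {
 public:
  SumTask(long from, long to) : m_from(from), m_to(to), m_result(0) {}
  void Run() {
    for (long i = m_from; i < m_to; ++i) m_result += i;
  }
  long Result() const { return m_result; }

 private:
  long m_from, m_to, m_result;
};

// Sums 0..n-1 by splitting the range into numTasks chunks.
long ParallelSum(long n, int numTasks) {
  // Create all tasks first, so the vector never reallocates while
  // threads hold pointers into it.
  std::vector<SumTask> tasks;
  long chunk = n / numTasks;
  for (int t = 0; t < numTasks; ++t) {
    long from = t * chunk;
    long to = (t == numTasks - 1) ? n : from + chunk;
    tasks.emplace_back(from, to);
  }

  std::vector<std::thread> threads;
  for (SumTask& task : tasks) threads.emplace_back(&SumTask::Run, &task);
  for (std::thread& th : threads) th.join();  // analogous to pool.Stop(true)

  // Safe without locks: every thread has finished.
  long total = 0;
  for (const SumTask& task : tasks) total += task.Result();
  return total;
}
```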
In our case, as in the decoder, we want to output text line by line (the decoder outputs translations, and possibly additional information such as n-best lists).
The Moses code offers the class OutputCollector to buffer up the output until it is safe to print it out. In the simplest case, it prints to STDOUT, but it can also write to a file; in fact, it offers both regular output (default STDOUT) and debugging output (default STDERR), both of which can be redirected to different files.
```cpp
Moses::OutputCollector* outputCollector = new Moses::OutputCollector();
```
A task can then send its output to the output collector with the function Write, for example:
```cpp
m_collector->Write(id, "Hello World!");
```
The id is the sequential number of the sentence, starting at 0. This helps the output collector keep track of what can be written out and what needs to be buffered. The output collector will not write output for sentence 1 if it has not yet received output for sentence 0.
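The buffering rule can be sketched as a map from sentence id to pending output plus a counter for the next id to print. OrderedCollector below is a simplified, hypothetical illustration of the idea, not the Moses::OutputCollector source; like the Moses class, it writes to STDOUT unless another stream is passed in.

```cpp
#include <iostream>
#include <map>
#include <mutex>
#include <ostream>
#include <string>

// Order-preserving output buffer: output for sentence `id` is held back until
// the output for every smaller id has been written. Simplified sketch only.
class OrderedCollector {
 public:
  explicit OrderedCollector(std::ostream* out = &std::cout)
      : m_out(out), m_nextOutput(0) {}

  // Safe to call from several threads at once.
  void Write(int id, const std::string& output) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_pending[id] = output;
    // Emit the longest run of consecutive ids starting at m_nextOutput.
    std::map<int, std::string>::iterator it = m_pending.find(m_nextOutput);
    while (it != m_pending.end() && it->first == m_nextOutput) {
      *m_out << it->second << std::flush;
      m_pending.erase(it++);  // erase and advance to the next buffered id
      ++m_nextOutput;
    }
  }

 private:
  std::ostream* m_out;
  int m_nextOutput;
  std::map<int, std::string> m_pending;
  std::mutex m_mutex;
};
```

For example, writing ids 2, 0, 1 in that order prints nothing after the first call, "zero" after the second, and "one" followed by "two" after the third.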
By default, Task objects are deleted after execution. However, you may want to keep the objects around. This happens, for instance, in mert, where each Task finds an optimized weight setting, which is processed afterwards. In this case, you have to add the following line to your Task definition:
```cpp
virtual bool DeleteAfterExecution() { return false; }
```
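Why overriding DeleteAfterExecution() keeps the object alive can be seen in a sketch of what a worker might do after running a task. This is an assumption about the mechanism, not the Moses source; KeepableTask, ExecuteTask, CountingTask and DoublingTask are hypothetical names.

```cpp
// Hypothetical stand-in for the task interface described above: Run() plus an
// overridable DeleteAfterExecution() that defaults to true.
class KeepableTask {
 public:
  virtual ~KeepableTask() {}
  virtual void Run() = 0;
  virtual bool DeleteAfterExecution() { return true; }
};

// Assumed worker-side logic: run the task, then free it unless the task
// asked to be kept.
void ExecuteTask(KeepableTask* task) {
  task->Run();
  if (task->DeleteAfterExecution()) delete task;
}

// Default behavior: deleted right after Run(); counts its own destructions.
class CountingTask : public KeepableTask {
 public:
  static int s_destroyed;
  ~CountingTask() override { ++s_destroyed; }
  void Run() override {}
};
int CountingTask::s_destroyed = 0;

// Opts out of deletion, so the caller can read the result afterwards and
// becomes responsible for deleting the object itself.
class DoublingTask : public KeepableTask {
 public:
  explicit DoublingTask(int input) : m_input(input), m_result(0) {}
  void Run() override { m_result = 2 * m_input; }
  bool DeleteAfterExecution() override { return false; }
  int Result() const { return m_result; }

 private:
  int m_input;
  int m_result;
};
```

The trade-off is ownership: once a task opts out, the code that submitted it must delete it after reading its result.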
By default, when a task is submitted to the ThreadPool by calling its Submit() function, it is added to an internal queue, and the main process immediately resumes. That means that if a million tasks are submitted, the queue is filled with a million Task instances, which may consume a lot of memory.
If you want to restrict the number of tasks in the queue, you can call, say, pool.SetQueueLimit(1000) to limit it to 1000 queued Task instances. When the queue is full, Submit() blocks.
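A queue limit of this kind is typically built on a bounded queue whose producer waits on a condition variable while the queue is full. The BoundedQueue template below is a hypothetical sketch of that technique, not the Moses code; it records the largest queue size it has seen so that the limit can be checked.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Bounded FIFO queue: Push() blocks while `limit` items are waiting, which
// mirrors the blocking Submit() behavior described above. `limit` must be >= 1.
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(std::size_t limit) : m_limit(limit), m_maxSeen(0) {}

  void Push(T item) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notFull.wait(lock, [this] { return m_items.size() < m_limit; });
    m_items.push_back(std::move(item));
    if (m_items.size() > m_maxSeen) m_maxSeen = m_items.size();
    m_notEmpty.notify_one();
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notEmpty.wait(lock, [this] { return !m_items.empty(); });
    T item = std::move(m_items.front());
    m_items.pop_front();
    m_notFull.notify_one();  // wake a producer blocked in Push()
    return item;
  }

  // Largest number of items that were ever queued at the same time.
  std::size_t MaxSeen() {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_maxSeen;
  }

 private:
  std::size_t m_limit;
  std::size_t m_maxSeen;
  std::deque<T> m_items;
  std::mutex m_mutex;
  std::condition_variable m_notFull;
  std::condition_variable m_notEmpty;
};
```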
Below is the complete example.
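Note:
- The task class now has two additional members: the sentence id m_id (a sequential number starting at 0) and a pointer to the output collector m_collector.
- The multi-threaded code is wrapped in #ifdef WITH_THREADS .. #else .. #endif, so the program falls back to serial execution when compiled without thread support.
- Output is written to the file output-file.txt (lines 43-45) instead of STDOUT.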
```cpp
01: #include <iostream>
02: #include <fstream>
03: #include <ostream>
04: #include <cstdlib>
05: #include <sstream>
06: #include "ThreadPool.h"
07: #include "OutputCollector.h"
08: #include <ctime>
09: using namespace std;
10:
11: class ExampleTask : public Moses::Task
12: {
13: private:
14:   unsigned int m_id;
15:   string m_message;
16:   Moses::OutputCollector* m_collector;
17: public:
18:   ExampleTask(unsigned int id, string message, Moses::OutputCollector* collector):
19:     m_id(id),
20:     m_message(message),
21:     m_collector(collector) {}
22:
23:   ~ExampleTask() {}
24:
25:   void Run() {
26:     // length of pause
27:     int r = rand()%10;
28:
29:     // pause (busy loop; j is unsigned so that wrap-around is well defined)
30:     unsigned long j = 0;
31:     for (long i = 0; i < 100000000L * r; i++) { j += i; }
32:
33:     // write message (and length of pause)
34:     ostringstream out;
35:     out << m_message << " (" << r << ")" << endl;
36:     m_collector->Write(m_id, out.str());
37:   }
38: };
39:
40: int main ()
41: {
42:   // output into file
43:   string outfile = "output-file.txt";
44:   std::ofstream *outputStream = new ofstream(outfile.c_str());
45:   Moses::OutputCollector* outputCollector = new Moses::OutputCollector(outputStream);
46:
47:   // set up tasks
48:   srand(time(NULL));
49:   ExampleTask *task0 = new ExampleTask(0,"zero",outputCollector);
50:   ExampleTask *task1 = new ExampleTask(1,"one",outputCollector);
51:   ExampleTask *task2 = new ExampleTask(2,"two",outputCollector);
52:
53: #ifdef WITH_THREADS
54:   // set up thread pool
55:   int thread_count = 10;
56:   Moses::ThreadPool pool(thread_count);
57:
58:   // submit tasks
59:   pool.Submit(task0);
60:   pool.Submit(task1);
61:   pool.Submit(task2);
62:
63:   // wait for all threads to finish
64:   pool.Stop(true);
65: #else
66:   // fallback: serial execution
67:   task0->Run();
68:   task1->Run();
69:   task2->Run();
70: #endif
71:   // flush and close the output file
72:   outputStream->close();
73:   return 0;
74: }
```
To compile this, you need to copy ThreadPool.h, ThreadPool.cpp, and OutputCollector.h into your code directory, or add include paths that point to the moses/src directory, and compile as follows:
```
g++ -c ThreadPool.cpp -DWITH_THREADS -DBOOST_HAS_PTHREADS
g++ -c test.cpp -DWITH_THREADS -DBOOST_HAS_PTHREADS
g++ -o test test.o ThreadPool.o -pthread -lboost_thread-mt
```
Make sure that the Boost libraries are in your compile paths.
When you run this example you will notice that, whatever the lengths of the pauses, the output always appears in the correct order (i.e. zero, one, two).
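If Moses and Boost are not at hand, the ordering claim can also be checked with a std::thread-based sketch. It deliberately makes earlier sentences pause longer, so the threads finish in reverse order, yet a simplified order-preserving collector (hypothetical, not the Moses class) still emits zero, one, two. Collector and RunReversedPauses are illustrative names, and the pause lengths are fixed rather than random so the result is predictable.

```cpp
#include <chrono>
#include <map>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Compact order-preserving collector (simplified; not the Moses class).
class Collector {
 public:
  explicit Collector(std::ostream& out) : m_out(out) {}
  void Write(int id, const std::string& text) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_pending[id] = text;
    // Flush every buffered entry whose id is next in sequence.
    while (!m_pending.empty() && m_pending.begin()->first == m_next) {
      m_out << m_pending.begin()->second;
      m_pending.erase(m_pending.begin());
      ++m_next;
    }
  }

 private:
  std::ostream& m_out;
  std::map<int, std::string> m_pending;
  int m_next = 0;
  std::mutex m_mutex;
};

// One thread per message. Earlier ids sleep LONGER, so the threads finish in
// reverse order, yet the collector still emits the lines in id order.
void RunReversedPauses(const std::vector<std::string>& messages, std::ostream& out) {
  Collector collector(out);
  std::vector<std::thread> threads;
  const int n = static_cast<int>(messages.size());
  for (int id = 0; id < n; ++id) {
    threads.emplace_back([&collector, &messages, id, n] {
      int pause = (n - id) * 20;  // milliseconds
      std::this_thread::sleep_for(std::chrono::milliseconds(pause));
      std::ostringstream line;
      line << messages[id] << " (" << pause << ")\n";
      collector.Write(id, line.str());
    });
  }
  for (std::thread& t : threads) t.join();
}
```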