Modern server machines have multiple CPUs, each with multiple cores. Utilizing all these cores requires either running multiple processes on the same machine or writing programs that use multiple threads.
Since many aspects of a machine translation system (training, tuning, using) lend themselves very easily to parallel processing, Moses increasingly uses multi-threading in its components. At this point, the following components allow for parallel execution when the switch "--threads NUM" is given, where NUM is the maximum number of threads to run at the same time:
moses
mert
extract-rules
Multi-threading in Moses is based on the C++ Boost libraries, and two Moses helper libraries that make the type of multi-threading that is typical for Moses more convenient: ThreadPool and OutputCollector.
We will explain the implementation of multi-threaded processing by means of a simple example.
The part of the program that is to be run in parallel threads is called a task, and it needs to be placed into a class of its own.
class ExampleTask : public Moses::Task
{
public:
  ExampleTask() {}
  ~ExampleTask() {}
  void Run() {
    std::cout << "Hello World." << std::endl;
  }
};
Such a class can now be instantiated and run:
ExampleTask *task = new ExampleTask();
task->Run();
delete task;
This will print "Hello World.", and is otherwise not very exciting.
Let's make the task a bit more interesting. Our new task waits for a random amount of time and then prints out a message:
ExampleTask(std::string message) : m_message(message) {}
void Run() {
  // length of pause
  int r = rand() % 10;
  // pause
  int j = 0;
  for (int i = 0; i < 1e8 * r; i++) { j += i; }
  // write message (and length of pause)
  std::cout << m_message << " (" << r << ")" << std::endl;
}
We can now create multiple instances of this task, and execute each:
// set up tasks
srand(time(NULL));
ExampleTask *task0 = new ExampleTask("zero");
ExampleTask *task1 = new ExampleTask("one");
ExampleTask *task2 = new ExampleTask("two");
// serial execution
task0->Run();
task1->Run();
task2->Run();
This will print out three lines (the random numbers in parentheses will vary):
zero (2)
one (4)
two (5)
Okay, where is the multi-threading? Here it comes.
Instead of simply running one of the tasks after the other, we assign them to a thread pool. Once assigned, they are spawned off to a thread and will be executed in parallel to the running main process.
// set up thread pool
int thread_count = 10;
Moses::ThreadPool pool(thread_count);

// submit tasks
pool.Submit(task0);
pool.Submit(task1);
pool.Submit(task2);

// wait for all threads to finish
pool.Stop(true);
That's all too easy to be true, right? Yes, it is.
Since the three threads are running in parallel, there is no telling when they print out their message. Not only could the lines be printed in a different order than the tasks were scheduled, the threads may even write all over each other.
This is the catch with multi-threading: any interaction with non-local data structures must be handled very carefully. Ideally, threads only change local data (defined in the class), and once they are done (after pool.Stop(true)), results can be read out. This is in fact what happens in multi-threaded mert.
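The "local data only" pattern can be sketched as follows. This is a minimal illustration under stated assumptions, not Moses code: it uses plain C++11 std::thread instead of Moses::ThreadPool so that it is self-contained, and the names SumTask and ParallelSum are invented for the example. Each task writes only to its own members, and the results are read only after every thread has been joined.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical task (not part of Moses): sums a range of integers
// into its own member variable and touches no shared data.
class SumTask
{
public:
  SumTask(int from, int to) : m_from(from), m_to(to), m_sum(0) {}
  void Run() {
    for (int i = m_from; i < m_to; ++i) m_sum += i;
  }
  long Sum() const { return m_sum; }
private:
  int m_from, m_to;
  long m_sum;
};

// Sums 0 .. n-1 by splitting the range into `parts` tasks.
long ParallelSum(int n, int parts)
{
  assert(n >= 0 && parts > 0);

  // build all tasks first, so the vector no longer reallocates
  std::vector<SumTask> tasks;
  for (int p = 0; p < parts; ++p) {
    tasks.push_back(SumTask(n * p / parts, n * (p + 1) / parts));
  }

  // one thread per task; each thread works on its own task object
  std::vector<std::thread> threads;
  for (int p = 0; p < parts; ++p) {
    threads.push_back(std::thread(&SumTask::Run, &tasks[p]));
  }

  // wait for all threads to finish before reading any result
  for (std::size_t i = 0; i < threads.size(); ++i) threads[i].join();

  long total = 0;
  for (std::size_t i = 0; i < tasks.size(); ++i) total += tasks[i].Sum();
  return total;
}
```

Because no two threads write to the same object, no locking is needed here; the join calls play the role that pool.Stop(true) plays in the Moses example.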
In our case, as in the decoder, we want to output text line by line (the decoder outputs translation, and possibly additional information such as n-best lists).
The Moses code offers the class OutputCollector to buffer up the output until it is safe to print out. In the simplest case, it prints to STDOUT, but it can also write to a file. It offers both regular output (default STDOUT) and debugging output (default STDERR), both of which can be redirected to different files.
Moses::OutputCollector* outputCollector = new Moses::OutputCollector();
A task can then send its output to the output collector with the function Write, for example:
m_collector->Write(id, "Hello World!");
The id is the sequential number of the sentence, starting at 0. This helps the output collector to keep track of what can be written out and what needs to be buffered. The output collector will not write output for sentence 1, if it has not yet received output for sentence 0.
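The buffering behaviour described above can be sketched as follows. This is an illustration of the idea only, not the actual Moses::OutputCollector implementation: the class MiniCollector and the function CollectInOrder are hypothetical names, and C++11 std::mutex and std::thread are used so that the sketch compiles without Moses or Boost.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical ordered collector (an assumption about how such a class
// could work, not a copy of Moses::OutputCollector).
class MiniCollector
{
public:
  explicit MiniCollector(std::ostream& out) : m_out(out), m_next(0) {}

  // Store the text for sentence `id`, then write out every sentence
  // whose predecessors have all been written already.
  void Write(unsigned int id, const std::string& text) {
    std::lock_guard<std::mutex> lock(m_mutex);
    // each id is expected exactly once
    assert(id >= m_next && m_pending.count(id) == 0);
    m_pending[id] = text;
    std::map<unsigned int, std::string>::iterator it;
    while ((it = m_pending.find(m_next)) != m_pending.end()) {
      m_out << it->second;
      m_pending.erase(it);
      ++m_next;
    }
  }

private:
  std::ostream& m_out;
  unsigned int m_next;                            // next id to be written
  std::map<unsigned int, std::string> m_pending;  // buffered output
  std::mutex m_mutex;
};

// Three threads write in arbitrary order; the collected text is ordered.
std::string CollectInOrder()
{
  std::ostringstream out;
  MiniCollector collector(out);
  const char* words[] = { "zero\n", "one\n", "two\n" };

  std::vector<std::thread> threads;
  // start the threads in reverse order to make out-of-order arrival likely
  for (int id = 2; id >= 0; --id) {
    threads.push_back(std::thread([&collector, &words, id]() {
      collector.Write(id, words[id]);
    }));
  }
  for (std::size_t i = 0; i < threads.size(); ++i) threads[i].join();
  return out.str();
}
```

The mutex makes sure that only one thread at a time touches the buffer and the stream, and the map holds back any sentence that arrives before its predecessors.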
By default, the Task objects are deleted after execution. However, you may want to keep the objects around. This happens for instance in mert, where each Task finds an optimized weight setting, which is to be processed afterwards. In this case, you have to add the following lines to your Task definition:
virtual bool DeleteAfterExecution() {
return false;
}
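How an executor might honor this flag can be sketched as follows. This is a self-contained illustration under stated assumptions: the Task base class below is a hypothetical stand-in for Moses::Task, and WeightTask, Execute and RunAndCollect are names invented for the example. The dummy computation merely stands in for real work such as optimizing a weight setting.

```cpp
#include <cassert>

// Hypothetical stand-in for Moses::Task (assumption: the real base class
// offers Run() and DeleteAfterExecution() in a similar way).
class Task
{
public:
  virtual ~Task() {}
  virtual void Run() = 0;
  // default: the executor deletes the task once Run() has returned
  virtual bool DeleteAfterExecution() { return true; }
};

// A task that keeps its result so that the caller can read it afterwards.
class WeightTask : public Task
{
public:
  explicit WeightTask(double start) : m_start(start), m_result(0.0) {}
  virtual void Run() { m_result = m_start * 2.0; }  // dummy computation
  virtual bool DeleteAfterExecution() { return false; }
  double Result() const { return m_result; }
private:
  double m_start;
  double m_result;
};

// What a worker might do with each task it takes from the queue.
void Execute(Task* task)
{
  assert(task != 0);
  task->Run();
  if (task->DeleteAfterExecution()) delete task;
}

// The caller still owns the task after execution, reads the result,
// and is responsible for deleting the object.
double RunAndCollect(double start)
{
  WeightTask* task = new WeightTask(start);
  Execute(task);
  double result = task->Result();
  delete task;
  return result;
}
```

Note the change of ownership: once DeleteAfterExecution() returns false, nobody else frees the object, so the code that reads the result has to delete it.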
By default, when a task is submitted to the ThreadPool by calling its Submit() function, it is added to an internal queue, and the main process immediately resumes. That means that if a million tasks are scheduled, the queue is filled with a million instances of the Task, which may consume a lot of memory.
If you want to restrict the number of tasks in the queue, you can call, say, pool.SetQueueLimit(1000) to limit it to 1000 queued Task instances. When the queue is full, Submit() blocks.
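The blocking behaviour of a limited queue can be sketched as follows. This is not the Moses::ThreadPool code: BoundedQueue and RunBounded are hypothetical names, the queue holds plain integers instead of Task pointers, and C++11 threading primitives are assumed so that the example is self-contained.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded queue: Submit() blocks while the queue is full,
// Take() blocks while it is empty.
class BoundedQueue
{
public:
  explicit BoundedQueue(std::size_t limit) : m_limit(limit), m_maxSeen(0) {
    assert(limit > 0);
  }

  void Submit(int item) {
    std::unique_lock<std::mutex> lock(m_mutex);
    // wait until a consumer has made room
    m_notFull.wait(lock, [this]() { return m_items.size() < m_limit; });
    m_items.push(item);
    if (m_items.size() > m_maxSeen) m_maxSeen = m_items.size();
    m_notEmpty.notify_one();
  }

  int Take() {
    std::unique_lock<std::mutex> lock(m_mutex);
    // wait until a producer has added an item
    m_notEmpty.wait(lock, [this]() { return !m_items.empty(); });
    int item = m_items.front();
    m_items.pop();
    m_notFull.notify_one();
    return item;
  }

  // largest number of items that were ever queued at the same time
  std::size_t MaxSeen() {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_maxSeen;
  }

private:
  std::size_t m_limit;
  std::size_t m_maxSeen;
  std::queue<int> m_items;
  std::mutex m_mutex;
  std::condition_variable m_notFull;
  std::condition_variable m_notEmpty;
};

// Submits `count` items while one worker consumes them. Returns true if
// every item was consumed and the queue never exceeded its limit.
bool RunBounded(std::size_t limit, int count)
{
  BoundedQueue queue(limit);
  long consumed = 0;
  std::thread worker([&queue, &consumed, count]() {
    for (int i = 0; i < count; ++i) consumed += queue.Take();
  });
  for (int i = 0; i < count; ++i) queue.Submit(i);
  worker.join();
  long expected = static_cast<long>(count) * (count - 1) / 2;
  return queue.MaxSeen() <= limit && consumed == expected;
}
```

The trade-off is the usual one: a small limit keeps memory use low, but the submitting thread spends more time waiting for the workers to catch up.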
Below is the complete example.
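Note:
The task class now has two more member variables that are set on construction: the sequence id m_id (a sequential number starting at 0), and a pointer to the output collector m_collector.
The code provides a fallback for compilation without thread support (#ifdef WITH_THREADS .. #else .. #endif).
Output is written to the file output-file.txt (lines 43-45) instead of STDOUT.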
01: #include <iostream>
02: #include <fstream>
03: #include <ostream>
04: #include <cstdlib>
05: #include <sstream>
06: #include "ThreadPool.h"
07: #include "OutputCollector.h"
08:
09: using namespace std;
10:
11: class ExampleTask : public Moses::Task
12: {
13: private:
14:   unsigned int m_id;
15:   string m_message;
16:   Moses::OutputCollector* m_collector;
17: public:
18:   ExampleTask(unsigned int id, string message, Moses::OutputCollector* collector):
19:     m_id(id),
20:     m_message(message),
21:     m_collector(collector) {}
22:
23:   ~ExampleTask() {}
24:
25:   void Run() {
26:     // length of pause
27:     int r = rand()%10;
28:
29:     // pause
30:     int j = 0;
31:     for(int i=0; i<1e8*r; i++) { j+=i; }
32:
33:     // write message (and length of pause)
34:     ostringstream out;
35:     out << m_message << " (" << r << ")" << endl;
36:     m_collector->Write(m_id, out.str());
37:   }
38: };
39:
40: int main ()
41: {
42:   // output into file
43:   string outfile = "output-file.txt";
44:   std::ofstream *outputStream = new ofstream(outfile.c_str());
45:   Moses::OutputCollector* outputCollector = new Moses::OutputCollector(outputStream);
46:
47:   // set up tasks
48:   srand(time(NULL));
49:   ExampleTask *task0 = new ExampleTask(0,"zero",outputCollector);
50:   ExampleTask *task1 = new ExampleTask(1,"one",outputCollector);
51:   ExampleTask *task2 = new ExampleTask(2,"two",outputCollector);
52:
53: #ifdef WITH_THREADS
54:   // set up thread pool
55:   int thread_count = 10;
56:   Moses::ThreadPool pool(thread_count);
57:
58:   // submit tasks
59:   pool.Submit(task0);
60:   pool.Submit(task1);
61:   pool.Submit(task2);
62:
63:   // wait for all threads to finish
64:   pool.Stop(true);
65: #else
66:   // fallback: serial execution
67:   task0->Run();
68:   task1->Run();
69:   task2->Run();
70: #endif
71: }
To compile this, you need to copy ThreadPool.h, ThreadPool.cpp, and OutputCollector.h into your code directory, or add include paths that point to the moses/src directory, and compile as follows:
g++ -c ThreadPool.cpp -DWITH_THREADS -DBOOST_HAS_PTHREADS
g++ -c test.cpp -DWITH_THREADS -DBOOST_HAS_PTHREADS
g++ -o test test.o ThreadPool.o -pthread -lboost_thread-mt
Make sure that the Boost libraries are in your compile paths.
When you run this example you will notice that, whatever the lengths of the pauses, the output always appears in the correct order (i.e. zero, one, two).