How does theory translate into practice? In my other blog I discussed various approaches to concurrency, including ownership-based type systems. And now I’m faced with the task of applying these methods to a real-life software product, Code Co-op. Our clients asked us to add more concurrency to make one part of Code Co-op, the dispatcher, more responsive. The implementation language is plain old C++, so the methodology must be based more on strict discipline than on help from the type system.
I would like to use message passing as much as possible but, for performance reasons, I often find myself forced to operate on shared state using locks. I will discuss various trade-offs between message passing and sharing. I invite the readers to share their opinions, or even come up with better implementations.
When writing this code I often thought how much easier it would be to do it in the D programming language, which supports immutable and shared types, not to mention garbage collection. However, D is strongly biased towards message passing, which is restricted to value types and immutable objects à la Erlang. My C++ implementation combines the passing of messages that are not necessarily value types and direct sharing of data structures between threads. I welcome discussion from the D community.
How the Dispatcher Works
The dispatcher is a program, part of Code Co-op, whose task is to distribute various deltas (we call them scripts) to local and remote members of version-controlled projects. Scripts materialize in a special directory and the dispatcher gets notifications every time something changes in that directory. It lists the directory and creates a container of file names. For each file, it reads its header to find out the intended recipients for that script. It looks up the recipients in its own database and maps them to destinations. A destination is a path or an email address. Paths may be local or network, e.g., \\computername\sharename. For the sake of discussion, I’ll concentrate on network paths only.
When decoding the file header, the dispatcher creates two data structures for each script. One is the immutable description of the script called ScriptInfo. It contains data that never change, like the file path, a list of recipients, etc. The other, called ScriptTicket, is mutable: it contains a pointer to the corresponding ScriptInfo, a number of flags that mark its progress, and a list of copy requests, each request corresponding to a destination (here, a network path). Notice that C++ has no notion of immutability, so it’s up to the programmer to enforce it in the case of ScriptInfo.
In a nutshell, the dispatcher has to iterate over ScriptTickets, iterate over each ticket’s copy requests, perform the copying and, if successful, physically stamp the corresponding file’s header to avoid sending multiple copies in the same direction. Only after all of a ScriptTicket’s copy requests have succeeded is the corresponding script file deleted from disk.
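To make the discussion concrete, here is a minimal C++ sketch of the two structures. The field and method names are my assumptions (the article only describes the roles the structures play), and shared_ptr stands in for whatever pointer type the original, pre-C++11 code uses:

```cpp
#include <memory>
#include <string>
#include <vector>

// Immutable description of a script. C++ cannot enforce immutability,
// so we approximate it with const members and no mutators.
class ScriptInfo
{
public:
    ScriptInfo(std::string path, std::vector<std::string> recipients)
        : _path(path), _recipients(recipients) {}
    std::string const & Path() const { return _path; }
    std::vector<std::string> const & Recipients() const { return _recipients; }
private:
    std::string const _path;
    std::vector<std::string> const _recipients;
};

// One copy request per destination (here, a network path).
struct CopyRequest
{
    std::string destination;
    bool done;
};

// Mutable ticket: a value type. The compiler-generated copy constructor
// deep-copies the _requests vector but only shallow-copies the pointer
// to the immutable ScriptInfo, which is safe because nobody mutates it.
class ScriptTicket
{
public:
    explicit ScriptTicket(std::shared_ptr<ScriptInfo const> info)
        : _info(info)
    {
        for (size_t i = 0; i != info->Recipients().size(); ++i)
            _requests.push_back(CopyRequest{ info->Recipients()[i], false });
    }
    ScriptInfo const & Info() const { return *_info; }
    std::vector<CopyRequest> & Requests() { return _requests; }
    bool AllDone() const
    {
        for (size_t i = 0; i != _requests.size(); ++i)
            if (!_requests[i].done) return false;
        return true;
    }
private:
    std::shared_ptr<ScriptInfo const> _info; // shared, immutable
    std::vector<CopyRequest> _requests;      // deep-copied with the ticket
};
```

Copying a ticket gives each thread its own request list, while both copies keep pointing at the same immutable ScriptInfo.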
There is one big problem: script copying, especially across a network, might be very slow. We don’t want the dispatcher to be completely unresponsive during copying, so we need a separate worker thread to dispatch scripts asynchronously. Of course the main thread has to be able to communicate, back and forth, with the worker thread. Let’s have a look at the data structures that must be either shared or passed between threads.
Inter-Thread Communication Objects
ScriptInfo, being immutable, can be safely shared between threads without any need for synchronization. ScriptTicket, on the other hand, is mutable. Fortunately it’s a value type, so it may be safely passed between threads by value.
ScriptTicket is not just a flat structure: it contains pointers. A data structure with pointers can be passed around by value as long as it is deep-copied. Not only the pointers must be copied but also the objects they point to, recursively. However, deep copy is not always necessary for thread safety, as illustrated by ScriptTicket.
The ticket holds a pointer to the corresponding ScriptInfo but, since ScriptInfo is immutable, there is no need to deep-copy it.
ScriptTicket also contains a container of (mutable) copy requests. C++ Standard Library containers of value types are themselves value types, even though they are pointer-based: their copy constructors and assignment operators perform deep copies. (D might have a problem with this: objects with pointers to mutable data cannot be passed as inter-thread messages.)
There is one more twist: the ticket also contains a pointer to a shared monitor (an object that has its own lock and all of whose public methods are “synchronized”: they all take the lock). This monitor (not shown in the picture) is used by the UI part of the dispatcher and must reflect all changes in the status of the scripts being processed. As long as the monitor is correctly implemented, it is safe to embed a (shallow) pointer to it inside the ScriptTicket, an object that is passed “by value” between threads. Here we step outside of value semantics, but without sacrificing thread safety.
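A monitor in C++ can be sketched as follows. This is not the dispatcher’s actual UI monitor; the class and method names are invented for illustration, and std::mutex stands in for whatever locking primitive the original code uses:

```cpp
#include <map>
#include <mutex>
#include <string>

// A monitor: the object owns its lock and takes it in every public
// method, so callers never synchronize explicitly.
class StatusMonitor
{
public:
    void SetStatus(std::string const & script, std::string const & status)
    {
        std::lock_guard<std::mutex> lock(_mutex); // "synchronized"
        _status[script] = status;
    }
    std::string GetStatus(std::string const & script) const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        std::map<std::string, std::string>::const_iterator it = _status.find(script);
        return it == _status.end() ? std::string("unknown") : it->second;
    }
private:
    mutable std::mutex _mutex;
    std::map<std::string, std::string> _status;
};
```

Because every public method takes the lock, a shallow pointer to such an object can travel inside a value type without compromising thread safety.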
Note that ScriptTicket is easily described in terms of ownership types. The owner of the ticket is always the current thread (owner::thread). The mutable container, _requests, is owned by the ticket and, consequently, by the current thread. The ScriptInfo inside the ticket is immutable (owner::immutable). The monitor used for UI communication is owned by “self” (owner::self). The latter two can be shared between threads.
Although script tickets are passed by value between threads, they are often passed by reference within threads–not only for performance reasons but also because the caller wants to see the changes made by the callee. In the ownership-based system, such references would be marked as lent and the system would ensure that no aliases to them were ever leaked to the outside world.
The Worker Thread
When working with threads, the most fundamental rule is to separate shared state from thread-local state. Shared state requires synchronization, local doesn’t. This is why I fell out with the OO active object pattern, in particular the Java Thread object. Active objects are notorious for mixing shared and local state. Since the dispatcher was written before I had this epiphany, it used our library of active objects. Not any more!
Here’s how it works: The main thread spawns a worker thread, passing it a function and some data. The new thread calls this function and passes it the data. The types of data that may be passed to a worker thread are restricted. The list is pretty short:
- Value data that are passed by copying. Simple examples: an integer, a vector of values, etc. The important thing is that this data is not shared: each thread works on its own copy. To be exact, thread-safe values may safely contain (shallow) pointers to shared monitors and immutable objects. Example: ScriptTicket.
- Monitors. Monitor objects are shared; they contain a lock, which is used to synchronize all their public methods. (In Java or D these methods would be synchronized.) Examples: WorkQueue, which I will describe shortly. Also, message queues are usually monitors.
- Immutable objects. They can be shared without synchronization, since nobody ever modifies them. Example: ThreadInfo.
- Unique data, à la auto_ptr. A unique object is guaranteed to be visible in only one thread at a time. The sender of this object automatically loses access to it, so unique objects don’t need synchronization.
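The unique-data pattern can be sketched with std::unique_ptr, the modern replacement for auto_ptr. The names here are invented; the point is only that std::move transfers ownership into the worker thread, so the sender cannot touch the data afterwards and no synchronization is needed:

```cpp
#include <memory>
#include <thread>
#include <vector>

// Thread-local state that will be handed off uniquely to a worker.
struct LocalState
{
    std::vector<int> data;
};

// The worker owns the state exclusively: no locks required.
int SumInWorker(std::unique_ptr<LocalState> state)
{
    int sum = 0;
    for (size_t i = 0; i != state->data.size(); ++i)
        sum += state->data[i];
    return sum;
}

int HandOff()
{
    std::unique_ptr<LocalState> state(new LocalState);
    state->data.push_back(1);
    state->data.push_back(2);
    int result = 0;
    // std::move transfers ownership into the thread; 'state' is null after this
    std::thread worker(
        [&result](std::unique_ptr<LocalState> s) { result = SumInWorker(std::move(s)); },
        std::move(state));
    worker.join();
    return result;
}
```

Unlike auto_ptr, unique_ptr makes the hand-off explicit: forgetting std::move is a compile error rather than a silent transfer.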
That’s it! A vast majority of concurrency problems may be (efficiently) solved using these patterns. Some languages, like Erlang, are restricted to pure values (immutability being used internally as an optimization). The drawback of such restrictions is poor performance in the local case (e.g., when Erlang processes are just threads).
Going back to the dispatcher: the worker thread is spawned with two objects: WorkQueue, which is a monitor, and TransportManager, which is passed as a unique object and becomes local to the worker thread.
WorkQueue contains shared data, which is protected by its lock. TransportManager contains data local to the worker thread and doesn’t require any synchronization. Before the rewrite, both of these objects were combined into a single active object, and extreme caution had to be used to avert the dangers of incidental sharing. In the current design, WorkQueue has no access to TransportManager, so it’s impossible for the main thread to accidentally touch the not-to-be-shared data of the worker thread. You have no idea how much this separation simplified the reasoning about, and the coding of, interactions between threads.
The worker thread function has access to both objects. In its idle state it is blocked waiting for a message from the main thread. The message queue is part of the WorkQueue object. When a message arrives, the worker thread wakes up, picks whatever data it needs from WorkQueue, and calls the methods of TransportManager. When it’s done, it loops back and waits for further messages.
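Here is a rough sketch of this structure using C++11 threads. The real WorkQueue and TransportManager are richer (the queue also holds the shared in-progress list), and for brevity TransportManager is passed by reference rather than as a unique object, with the worker as its only user:

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// A monitor-protected message queue of file names (simplified WorkQueue).
class WorkQueue
{
public:
    void Send(std::string msg)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push_back(msg);
        _notEmpty.notify_one();
    }
    std::string Receive() // blocks until a message arrives
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _notEmpty.wait(lock, [this] { return !_queue.empty(); });
        std::string msg = _queue.front();
        _queue.pop_front();
        return msg;
    }
private:
    std::mutex _mutex;
    std::condition_variable _notEmpty;
    std::deque<std::string> _queue;
};

// Thread-local state of the worker: no lock needed.
struct TransportManager
{
    std::vector<std::string> processed;
    void Dispatch(std::string const & script) { processed.push_back(script); }
};

// The worker function: block on the queue, process, loop.
void WorkerThread(WorkQueue & queue, TransportManager & local)
{
    for (;;)
    {
        std::string msg = queue.Receive();
        if (msg == "quit") break;
        local.Dispatch(msg);
    }
}
```

The main thread only ever touches the WorkQueue monitor; TransportManager never escapes the worker, which is exactly the separation described above.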
Notice that, in the picture, threads are not associated with objects, as they would be in the active-object model. Instead objects exist between threads, when they are shared; or inside threads, when they are not.
The Message Passing Approach
In principle, all communication between threads could be implemented with message passing. I will describe how it could be done in this particular case and then consider other options.
Let’s start with a use case: a hundred scripts are dumped, one by one, into the dispatcher’s directory. In the naive approach, each time the main thread is notified, it lists the directory, reads the header of each script from disk, creates script tickets, and passes them to the worker thread. In the worst case that would create and send about 5,000 bulky script tickets, 4,900 of them unnecessary duplicates. That’s because each file drop creates a separate notification followed by a new complete directory listing. Files might be dropped quickly, but their processing can take arbitrarily long.
The first optimization that comes to mind is to send whole arrays of tickets at once. If such an array could be passed as a unique object, the copying could be avoided (and the message queue lock would be used once per array). The worst case timing would involve 100 such messages (one per directory change).
This is still very wasteful, considering that we’d be reading and interpreting 5,000 file headers, and disk I/O is very expensive. Also, in the current design, the dispatcher’s main thread performs relatively cheap local copying to recipients on the same machine. Without additional checks, each file would be delivered locally, on average, 50 times (the delivery is idempotent, but not free). That’s clearly overkill, considering that we do store a list of scripts that are already being processed. There’s no need to process them again and again.
So the second optimization is to take the raw directory listing and compare it with the in-progress list, before doing anything expensive with the scripts. The in-progress list is maintained by the worker thread and resides inside the TransportManager. The main thread sends a vector of file names through the message queue (again, using the unique-array trick to avoid copying) and waits for the return message that contains a pruned list of files. It then proceeds with this much shorter list.
Inversion of Control
Let me summarize the algorithm:
- List the directory.
- Compare this list with the list of scripts currently being processed. Keep the ones that are not already in the in-progress queue.
- Perform local processing and local copying of the remaining scripts.
- Send the leftovers back to the WorkQueue and insert them into the in-progress queue for remote copying.
It would be relatively easy to translate this algorithm into sequential code if we used synchronous message passing. In that case, after sending a message to WorkQueue, the main thread would wait for the acknowledgment before continuing. The problem is that the worker thread might be busy, well… working. It might take quite some time before it is free to check for messages. All that time the main thread is blocked and the program is unresponsive. Remember, our goal was to decrease the latency!
Asynchronous message passing would work better but it has its own problems. First of all, the nice sequential code would have to be chopped up into separate functions:
- The directory-notification handler lists the directory and sends a message with the list of file names to the worker thread. Return.
- In the worker thread, the handler for this particular message compares the list with its own list and sends a message to the main thread with the pruned list. Return.
- In the main thread, the handler for the pruned-list message performs local processing and sends another message to the worker thread with the leftover list. Return.
- In the worker thread, the handler for that message adds the list to the in-progress queue. Return.
Notice that this sequence would exist only in the mind of the programmer (maybe in some documentation too). It’s very hard to reconstruct it by just looking at the code because it’s scattered between various handlers. This is called reactive programming and it’s notoriously difficult. What makes it even harder is that, after sending a message, the context of the call is lost. The return message must contain enough information to restore this context. There are many advanced techniques to overcome this problem: closures, continuations, co-routines, etc.; but they are mostly restricted to functional languages and are hard to adapt to the object-oriented paradigm.
Another big problem is the lack of direct support for message passing in many languages, including C++, Java, C#, or D. These languages are strongly typed so, barring unsafe casting, the programmer must create a separate message queue type for every message type. This is not only a major annoyance, but raises the problem of composability: A thread might want to wait for several types of messages at once.
Consider how, because of these limitations, type safety is compromised in Windows. All Windows messages have to fit the same mold: a message ID, a WPARAM, and an LPARAM. All typing information is lost and it’s up to the programmer to recover it, often by unchecked type casting.
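The mold can be mimicked in portable C++ (this is an imitation for illustration, not the real Windows API; the message IDs and payloads are invented):

```cpp
#include <cstdint>
#include <string>

// Every message is squeezed into an id plus two integer-sized parameters;
// the receiver must cast them back based on the id alone.
typedef uintptr_t WParam;
typedef intptr_t LParam;

struct Message { unsigned id; WParam wParam; LParam lParam; };

enum { MSG_PROGRESS = 1, MSG_FILENAME = 2 };

std::string Handle(Message const & msg)
{
    switch (msg.id)
    {
    case MSG_PROGRESS:
        // an int travels in wParam: works, but nothing checks it
        return "progress " + std::to_string(static_cast<int>(msg.wParam));
    case MSG_FILENAME:
        // a pointer smuggled through lParam: the cast is entirely the
        // programmer's responsibility; a mismatched id means corruption
        return *reinterpret_cast<std::string const *>(msg.lParam);
    default:
        return "unknown";
    }
}
```

The compiler sees only integers going in and out; whether the cast on the receiving side matches the sender is enforced by nothing but convention.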
In functional languages, composability of message queues (also called channels or mailboxes) is solved using pattern matching. In a nutshell, pattern matching is like a switch statement in which each case may have a different type: within the same statement you may match a string message or an integer message. The latest batch of modern general-purpose languages, like Scala and F#, has excellent support for pattern matching.
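C++ eventually acquired a library-level tagged union: with C++17’s std::variant and std::visit one can approximate this kind of (non-nested) pattern matching, letting a single queue carry several message types without losing type information. The message names below are invented for illustration:

```cpp
#include <string>
#include <variant>

// Three distinct message types that could share one queue.
struct FileDropped { std::string path; };
struct CopyDone   { std::string path; bool ok; };
struct Quit       {};

typedef std::variant<FileDropped, CopyDone, Quit> Message;

// One overload per message type: the visitor plays the role of
// the "cases" in a pattern match.
struct Handler
{
    std::string operator()(FileDropped const & m) const { return "dropped " + m.path; }
    std::string operator()(CopyDone const & m) const
    {
        return (m.ok ? "copied " : "failed ") + m.path;
    }
    std::string operator()(Quit) const { return "quit"; }
};

std::string Dispatch(Message const & msg)
{
    return std::visit(Handler(), msg); // "match" on the stored type
}
```

Unlike the WPARAM/LPARAM mold, a missing case here is a compile error: std::visit refuses to build unless the visitor handles every alternative.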
Sharing is Sometimes Simpler
Let’s re-consider the problem of list pruning: The main thread has a list of candidates, the worker thread has an in-progress list. We want to remove all items from the candidate list that are present in the in-progress list. In the message-passing approach, the main thread must send the list of candidates to the worker thread and wait for the answer (in the asynchronous case, it might pick up other tasks while waiting for the answer). It has to wait even if the worker thread is not currently using the in-progress list. There is no way for the worker thread to say, “I’m busy, why don’t you do it yourself.”
Actually, there is, if we allow inter-thread sharing. Of course, concurrent access to shared state must be protected, for instance by locking. In our current implementation, the WorkQueue is a monitor that protects the in-progress list. The main thread takes the WorkQueue lock and compares the two lists directly. The only time it must wait is when the worker thread has already taken the lock and is in the process of accessing the in-progress list. In fact, our in-progress list is split into two lists: one contains the immutable ScriptInfo objects and is shared as part of WorkQueue; the other contains ScriptTickets and is part of TransportManager, which is not shared and therefore doesn’t require locking.
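The pruning path might be sketched like this (a simplification: the real WorkQueue also carries the message queue, and the method names are invented):

```cpp
#include <mutex>
#include <set>
#include <string>
#include <vector>

// The in-progress list lives inside the WorkQueue monitor; the main
// thread takes the lock just long enough to filter its candidates.
class WorkQueue
{
public:
    void MarkInProgress(std::string const & file)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _inProgress.insert(file);
    }
    // Called directly by the main thread: drop candidates that are
    // already being processed, under the monitor's lock.
    std::vector<std::string> Prune(std::vector<std::string> const & candidates)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        std::vector<std::string> fresh;
        for (size_t i = 0; i != candidates.size(); ++i)
            if (_inProgress.count(candidates[i]) == 0)
                fresh.push_back(candidates[i]);
        return fresh;
    }
private:
    std::mutex _mutex;
    std::set<std::string> _inProgress;
};
```

No message round-trip is needed: the main thread blocks only for the duration of the set lookups, not until the worker gets around to reading its mailbox.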
Sharing has its problems too: races and deadlocks. Races can be avoided if you are religious about protecting all shared data with monitors. Deadlocks are also avoidable, if you use locks sparingly. The more monitors you have in your program, the more likely they will deadlock. On the other hand, the more messages you have, the more data copying you do and the more chopped-up your code is.
So the best approach, in my opinion, is to judiciously combine message passing with sharing.
What’s Next?
So far I have described the preliminary re-write of the dispatcher. At Reliable Software, we usually start the implementation of a new feature by re-writing the surrounding code. I realize that this is a very unusual practice in most software companies, yet it’s been working miracles for our product. There is very little “code rot” in Code Co-op after 14 years of continuous development. There are no parts of the product that “better not be touched or they will break.”
The next implementation step will be to divide the actual copying between multiple threads. But that should be a breeze in the newly re-written dispatcher. I’ll keep you posted.
February 8, 2010 at 9:27 pm
That’s interesting, and it makes me realize how easy libdispatch makes all this to write. Seeing your list of four steps, here’s how I’d write the algorithm with libdispatch (and Apple’s block extension to C):
Here you can look at your whole algorithm in one tidy function.
February 9, 2010 at 12:47 am
This is an example of functional-style continuation passing–one way of making reactive programming look more like sequential programming.
This code might look simple, but it really isn’t. Just notice how deeply the continuations nest. This is not sequential code, this is a nesting of anonymous functions, each executing in a different thread.
Look how the list object is apparently shared between threads. As far as I understand, the runtime actually makes immutable copies of local arguments before passing them to threads, so pruneList() can’t really prune the list in-place.
The functions pruneList() and workOnList() are supposed to work on a separate (mutable!) in-progress list. Where would that list be kept?
Functional programming does not mix well with mutable state, so a dispatcher designed using libdispatch would look totally different than it does now. It would probably be unreadable to OO programmers, and there would be much more copying behind the scenes.
February 9, 2010 at 2:28 am
I don’t understand why the WorkerThread pulls more than one script off the queue. Can it actually process more than one script at a time? Even so, it makes sense to me to put more intelligence into the message queue itself. Instead of a simple enqueue/dequeue, make it more like a priority queue that enforces uniqueness. Then, when you try to add a script that the queue already knows about, it just ignores it (this effectively puts the pruneList() functionality inside the message queue). The only waste is that you may enqueue the item(s) that the worker is currently working on. However, you can avoid even this in one of two ways:
1) Leave items on the queue until the WorkerThread is *finished* processing them. The WorkerThread just peeks at the queue for its next item. This will avoid all duplicate enqueues. You don’t actually know what the WorkerThread is currently working on.
2) If you need to know what the WorkerThread is processing *now*, simply have both threads maintain a list of scripts, but have the WorkerThread send deltas through the message queue (or a new queue for the back channel):
[M] FileA, FileB, FileC -> [W]
[M] <- FileA, FileB: Start [W]
[M] <- FileA: Finish [W]
[M] <- FileC: Start [W]
[M] <- FileB, FileC: Finish [W]
Essentially, the WorkerThread tells the MainThread how to update its list to reflect reality, and the MainThread can use its private version however it likes to eliminate redundant work requests as well as display progress. I actually like this solution better, and I think it's very much in the spirit of message passing.
February 9, 2010 at 3:21 pm
David, the actual goal of the re-write is to enable parallel processing of multiple scripts at a time. That’s what I’m working on right now.
I also thought of putting more intelligence into the message queue. In fact the whole WorkQueue object is an intelligent message queue. That only shows that a one-queue-fits-all approach is not flexible enough. But if programmers are to write their own message queues, they will have to deal with low-level sharing and synchronization anyway (especially the “peeking” business). The abstraction is ripped open.
Having two synchronized copies of the in-progress list has its own problems. There are more messages and more message handlers, which makes it hard to maintain (the problem of reactive programming) and easy to introduce subtle errors. Try adding to it more threads, which do the physical copying, and things become really hairy.
Finally, notice that the current message-passing design in D will have problems passing non-trivial messages like my ScriptTickets. That’s actually a big worry for me.
February 10, 2010 at 12:21 am
So if I understand correctly, there will be *multiple* WorkerThreads, each of which will process a single script at a time? Or will there be one WorkerThread which processes many scripts by itself?
Of course one type of message queue is not sufficient, any more than one type of container is sufficient (though VB contends otherwise with its Collection). The real question is: which kinds of message queues are most useful and standardizable? We can probably take a hint from SOA and the kinds of messaging which exist there.
I reject the idea that having local copies is harder than the alternatives. I also reject the characterization of my proposed solution as “reactive”; what I proposed is not at all reactive. I think what the 2-channel solution does for you is let you trade the complexity of deadlock-oriented programming for the well-understood and fairly intuitive paradigm of inter-process network communication. Anyone who has worked with a web server or app server can reason soundly about the 2-channel solution. Here is what each thread looks like:
WorkerThread:

while (true)
{
    workQueue.dequeue(ticket);
    statusQueue.enqueue(startMessage(ticket));
    processTicket(ticket);
    statusQueue.enqueue(doneMessage(ticket));
}

MainThread:

while (true)
{
    while (!statusQueue.empty())
    {
        statusQueue.dequeue(ticket);
        updateList(ticket);
    }
    updateDisplay();
    scanDirectory();
    pruneList();
    foreach (ticket : list)
    {
        workQueue.enqueue(ticket);
    }
}
As you can see, nobody is waiting for, or reacting to, anybody else. There is no guarantee that the main thread will see updates exactly when they occur, but it will see them as often as it checks, which should be once per iteration. Maintaining the list is as straightforward as in the alternative designs, and this solution is about as clean as if WorkerThread were a service in an SOA rather than a local thread.
February 9, 2010 at 5:42 am
It all depends on what List is. If List is a pointer, only the pointer is copied and the list it points to is thus shared. Of course the C language makes no safety guarantee regarding sharing, so it’s up to the programmer to handle things right.
So it may look like functional-style continuation, but it isn’t necessarily functional. This is C after all, so you still have mutability. I should probably call free(list) after workOnList() though.
As for saying it’s not sequential code… Because the dispatch_async calls are made at the end of each task, you can be sure each task will happen in order so in a way the algorithm is sequential although it runs in parallel with other tasks. But you could use dispatch_async to create other patterns, like execute subtasks in parallel.
February 9, 2010 at 3:27 pm
It sounds like libdispatch makes it easier to write buggy code. I think I’d rather switch to Scala, which offers continuations that are easier to protect from bugs.
February 9, 2010 at 5:46 am
Composability of message queues:
pattern matching in FPLs is just proper tagged union support, which C++ indeed lacks.
It seems that Boost::Variant implements this, but I’m not sure it is as practical as the native support found in FPLs.
February 9, 2010 at 5:46 am
You can use __block so that the list is shared between the blocks. To make accesses to the list safe, a serial queue would have to be used. Only the local processing would use a concurrent queue.
February 10, 2010 at 1:20 am
David,
The plan is to have the main thread, the WorkQueue thread, and a variable number of copying threads. There is one copying thread per destination (network path) and, since a single script may be going to multiple destinations, there is no one-to-one correspondence between scripts and threads.
In your code you are assuming that the worker thread waits for only one type of message. That’s not true in our case, so there is a problem of having separate handlers for different message types. In particular, at any time a copying thread may be sending a message back to the worker thread, notifying it of copy completion (processTicket() is asynchronous). As far as I know, D’s message queues are parametrized by handler functions, so they are by design reactive.
Your main thread is actively polling. We can’t afford such a busy thread. scanDirectory() should only be called in response to a system notification, so the main thread should block waiting on alternative message sources and “react” to them. Only in the very simplest cases can you get away without reactive programming.
I know I put you in a defensive position–I keep throwing at you more and more complications. But that’s because my problem is taken from “real life.” In the blog I had to describe an idealized version, otherwise the post would be big and boring.
February 11, 2010 at 3:44 am
Why is having multiple message types any harder for MP than for DOP? Sounds like an irrelevant detail to me. It also seems irrelevant that messages are sent asynchronously rather than synchronously. If you prefer synchronous semantics, you can always get that by building a little infrastructure. And complaining that my MainThread is actively polling is a little silly. Of course that isn’t exactly what your main thread would look like. The essential point is that there is nothing particularly complicated about passing messages.
The real question is whether it is better than sharing state. None of your criticisms imply that sharing state is a better solution. All you said was: “Your idealized example does not exactly match my real application.” Well of course not! If instead you had said: “That is inferior to sharing state because…”, then you would have a case. As it is, it sounds like you are simply committed to DOP as a theological principle, which is certainly your prerogative. But then, it makes your offer to take MP seriously sound a bit disingenuous.
February 22, 2010 at 12:05 pm
The main issue with MP (other than awkward syntactic support for it in mainstream languages) is the inversion of control. In your code you tried to minimize it by limiting the number of possible messages and by replacing messaging with polling. So that’s what I pointed out.
This approach doesn’t scale and at some point inversion of control creeps in and makes your code look like spaghetti. That’s been my experience.
March 3, 2010 at 2:25 am
“but they are mostly restricted to functional languages and are hard to adapt to the object-oriented paradigm”
C++ is considered “multi-paradigm”. With STL, Boost and C++0x around the corner functional programming becomes just that much easier (while not quite LISP yet).
Why do people keep thinking about C++ as “C with objects”?
March 3, 2010 at 11:08 am
In functional languages side effects are under strict control. Regular functions have no side effects (they are “pure”). Also, mutation is controlled–regular data structures are immutable. With those assumptions it’s easy to avoid data races.
Contrast this with multi-paradigm languages where mutation and side effects are the rule. Closures (which will be introduced in C++0x) can capture mutable environments and, almost invisibly, share mutable variables. So, as usual, C++ added a few more ways to shoot yourself in the foot.
And I’m not even talking about the syntax.
It’s not impossible to use functional techniques in C++0x, but it’s not easy. Scala, on the other hand, has much better intrinsic support for paradigms other than OO so I’d rather look in that direction.
March 3, 2010 at 4:05 pm
All points taken, but with freedom comes responsibility.
While other languages force you to do things a certain way (and trying to do otherwise means jumping through major hoops), C++ gives you the freedom to do whatever you want, or whatever is appropriate for the situation, even if you want or need to mix styles and paradigms.
And this is probably its biggest flaw: you have to know what you want and how to do it. You need to make decisions: you have to decide where to draw the line on memory footprint, CPU use, time to market, ease of maintenance, etc., and balance all of those. Many other languages have these decided for you most of the time.
May 14, 2010 at 9:22 pm
When can we expect Part 2 of this interesting post?
May 15, 2010 at 2:05 pm
This work has prompted me to look into the implementation of message passing in a more general way. I’m going to give a talk about this at the Northwest C++ Users Group this coming Wednesday. The blog will follow. And when I’m done with dispatcher improvements, I’ll describe them too.
December 30, 2010 at 8:58 am
Well, it’s difficult to say whether the comments above lead to a conclusion. Everyone opposes the others’ comments by bringing up a new feature or a demerit, which mixes up the issue.
Why not follow what DP said? Because it’s true: C++ has enormous features to satisfy these needs.