How does theory translate into practice? In my other blog I discussed various approaches to concurrency, including ownership-based type systems. And now I’m faced with the task of applying these methods to a real-life software product, Code Co-op. Our clients asked us to add more concurrency to make one part of Code Co-op, the dispatcher, more responsive. The implementation language is plain old C++, so the methodology must be based more on strict discipline than on the help from the type system.

I would like to use message passing as much as possible but, for performance reasons, I often find myself forced to operate on shared state using locks. I will discuss various trade-offs between message passing and sharing. I invite the readers to share their opinions, or even come up with better implementations.

When writing this code I often thought how much easier it would be to do it in the D programming language, which supports immutable and shared types, not to mention garbage collection. However, D is strongly biased towards message passing, which is restricted to value types and immutable objects à la Erlang. My C++ implementation combines the passing of messages that are not necessarily value types and direct sharing of data structures between threads. I welcome discussion from the D community.

How the Dispatcher Works

The dispatcher is a program, part of Code Co-op, whose task is to distribute various deltas (we call them scripts) to local and remote members of version-controlled projects. Scripts materialize in a special directory and the dispatcher gets notifications every time something changes in that directory. It lists the directory and creates a container of file names. For each file, it reads its header to find out the intended recipients for that script. It looks up the recipients in its own database and maps them to destinations. A destination is a path or an email address. Paths may be local or network, e.g., \\computername\sharename. For the sake of discussion, I’ll concentrate on network paths only.

When decoding the file header, the dispatcher creates two data structures for each script. One is the immutable description of the script called ScriptInfo. It contains data that never change, like the file path, a list of recipients, etc. The other, called ScriptTicket is mutable: it contains a pointer to the corresponding ScriptInfo, a number of flags that mark its progress, and a list of copy requests–each request corresponding to a destination (here, a network path). Notice that C++ has no notion of immutability, so it’s up to the programmer to enforce it in the case of ScriptInfo.

Script Ticket contains mutable state including an array of Copy Requests, and a pointer to the immutable Script Info

In a nutshell, the dispatcher has to iterate over ScriptTickets, iterate over each ticket’s copy requests, perform the copying and, if successful, physically stamp the corresponding file’s header to avoid sending multiple copies in the same direction. Only after all ScriptTicket‘s copy requests have been successful, the corresponding script file is deleted from disk.

There is one big problem: script copying, especially across a network, might be very slow. We don’t want the dispatcher to be completely unresponsive during copying, so we need a separate worker thread to dispatch scripts asynchronously. Of course the main thread has to be able to communicate, back and forth, with the worker thread. Let’s have a look at the data structures that must be either shared or passed between threads.

Inter-Thread Communication Objects

ScriptInfo, being immutable, can be safely shared between threads without any need for synchronization. ScriptTicket on the other hand is mutable. Fortunately it’s a value type, so it may be safely passed between threads by value.

ScriptTicket is not just a flat structure–it contains pointers. A data structure with pointers can be passed around by value as long as it is deep-copied. Not only pointers must be copied but also the objects they point to, recursively. However, deep copy is not always necessary for thread safety, as illustrated by ScriptTicket.

The ticket holds a pointer to the corresponding ScriptInfo but, since ScriptInfo is immutable, there is no need to deep-copy it.

ScriptTicket also contains a container of (mutable) copy requests. C++ Standard Library containers of value types are also value types, even though they are pointer-based. Their copy constructors and the assignment operators perform deep copy. (D might have a problem with this: objects with pointers to mutable data cannot be passed as inter-thread messages.)

There is one more twist: the ticket also contains a pointer to a shared monitor (an object that has its own lock and whose all public methods are “synchronized”–they all take a lock). This monitor (not shown in the picture) is used by the UI part of the dispatcher and must reflect all changes in the status of the scripts being processed. As long as the monitor is correctly implemented, it is safe to embed a (shallow) pointer to it inside the ScriptTicket–an object that is passed “by value” between threads. Here we are stepping out of value semantics, but without sacrificing thread safety.

Note that ScriptTicket is easily described in terms of ownership types. The owner of the ticket is always the current thread (owner::thread). The mutable container, _requests, is owned by the ticket and, consequently, by the current thread. The ScriptInfo inside the ticket is immutable (owner::immutable). The monitor used for UI communication is owned by “self” (owner::self). The latter two can be shared between threads.

Although script tickets are passed by value between threads, they are often passed by reference within threads–not only for performance reasons but also because the caller wants to see the changes made by the callee. In the ownership-based system, such references would be marked as lent and the system would ensure that no aliases to them were ever leaked to the outside world.

The Worker Thread

When working with threads, the most fundamental rule is to separate shared state from thread-local state. Shared state requires synchronization, local doesn’t. This is why I fell out with the OO active object pattern–in particular the Java Thread object. Active objects are notorious for mixing shared and local state. Since the dispatcher was written before I had this epiphany, it used our library of active objects. Not any more!

Here’s how it works: The main thread spawns a worker thread, passing it a function and some data. The new thread calls this function and passes it the data. The types of data that may be passed to a worker thread are restricted. The list is pretty short:

  1. Value data that are passed by copying. Simple examples: an integer, a vector of values, etc. The important thing is that this data is not shared–each thread works on its own copy. To be exact, thread-safe values may safely contain (shallow) pointers to shared monitors and immutable objects. Example: ScriptTicket.
  2. Monitors. Monitor objects are shared; they contain a lock, which is used to synchronize all their public methods. (In Java or D these methods would be synchronized). Examples: WorkQueue, which I will describe shortly. Also, message queues are usually monitors.
  3. Immutable objects–they can be shared without synchronization, since nobody ever modifies them. Example: ThreadInfo.
  4. Unique data, à la auto_ptr. A unique object is guaranteed to be only visible in one thread at a time. The sender of this object automatically loses access to it, so unique objects don’t need synchronization.

That’s it! A vast majority of concurrency problems may be (efficiently) solved using these patterns. Some languages, like Erlang, are restricted to pure values (immutability being used internally as an optimization). The drawback of such restrictions is poor performance in the local case (e.g., when Erlang processes are just threads).

Going back to the dispatcher: The worker thread is spawned with two objects:

  • WorkQueue, which is a monitor and
  • TransportManager, which is passed as a unique object and becomes local to the worker thread.

WorkQueue contains shared data, which is protected by its lock. TransportManager contains data local to the worker thread and doesn’t require any synchronization. Before the rewrite, both of these object were combined into a single active object, and extreme caution had to be used to avert the dangers of incidental sharing. In the current design, WorkQueue has no access to TransportManager so it’s impossible for the main thread to accidentally touch the not-to-be-shared data of the worker thread. You have no idea how this separation simplified the reasoning about and the coding of interactions between threads.

The WorkQueue object is shared between threads, Transport Manager is private to the Worker Thread.

The worker thread function has access to both objects. In its idle state it is blocked waiting for a message from the main thread. The message queue is part of the WorkQueue object. When a message arrives, the worker thread wakes up, picks whatever data it needs from WorkQueue and calls the methods of TransportManager. When it’s done, it loops back and waits for further messages.

Notice that, in the picture, threads are not associated with objects, as they would be in the active-object model. Instead objects exist between threads, when they are shared; or inside threads, when they are not.

The Message Passing Approach

In principle all communication between threads could all be implemented with message passing. I will describe how it could be done in this particular case and then consider other options.

Let’s start with a use case: a hundred scripts are dumped, one by one, into the dispatcher’s directory. In the naive approach, each time the main thread is notified it lists the directory, reads the header of each script from disk, creates script slips and passes them to the worker thread. In the worst case that would create and send about 5,000 bulky script tickets, 4900 of them unnecessary duplicates. That’s because each file drop creates a separate notification followed by a new complete directory listing. Files might be dropped quickly but their processing can take arbitrarily long.

The first optimization that comes to mind is to send whole arrays of tickets at once. If such an array could be passed as a unique object, the copying could be avoided (and the message queue lock would be used once per array). The worst case timing would involve 100 such messages (one per directory change).

This is still very wasteful considering that we’d be reading and interpreting 5,000 file headers, and disk I/O is very expensive. Also, in the current design, the dispatcher’s main thread performs relatively cheap local copying to recipients on the same machine. Without additional checks, each file would be delivered locally, on average, 50 times (the delivery is idempotent, but not free). That’s clearly an overkill, considering that we do store a list of scripts that are already being processed. There’s no need to process them again and again.

So the second optimization is to take the raw directory listing and compare it with the in-progress list, before doing anything expensive with the scripts. The in-progress list is maintained by the worker thread and resides inside the TransportManager. The main thread sends a vector of file names through the message queue (again, using the unique-array trick to avoid copying) and waits for the return message that contains a pruned list of files. It then proceeds with this much shorter list.

Inversion of Control

Let me summarize the algorithm:

  1. List the directory.
  2. Compare this list with the list of scripts currently being processed. Keep the ones that are not already in the in-progress queue.
  3. Perform local processing and local copying of the remaining scripts.
  4. Send the leftovers back to the WorkQueue and insert them into the in-progress queue for remote copying.

It’s relatively easy to translate this algorithm into sequential code, if we were to use synchronous message passing. In that case, after sending a message to WorkQueue, the main thread would wait for the acknowledgment before continuing. The problem is that the worker thread might be busy, well… working. It might take quite some time before it is free to check for messages. All that time the main thread is blocked and the program is unresponsive. Remember, our goal was to decrease the latency!

Asynchronous message passing would work better but it has its own problems. First of all, the nice sequential code would have to be chopped up into separate functions:

  1. Directory notification handler list the directory and sends a message with the list of file names to the worker thread. Return.
  2. In the worker thread, the handler for this particular message compares the list with its own list and sends a message to the main thread with the pruned list. Return.
  3. In the main thread, the handler for the pruned-list message performs local processing and sends another message to the worker thread with the leftover list. Return.
  4. In the worker thread, the handler for that message adds the list to the in-progress queue. Return.

Notice that this sequence would exist only in the mind of the programmer (maybe in some documentation too). It’s very hard to reconstruct it by just looking at the code because it’s scattered between various handlers. This is called reactive programming and it’s notoriously difficult. What makes it even harder is that, after sending a message, the context of the call is lost. The return message must contain enough information to restore this context. There are many advanced techniques to overcome this problem: closures, continuations, co-routines, etc.; but they are mostly restricted to functional languages and are hard to adapt to the object-oriented paradigm.

Another big problem is the lack of direct support for message passing in many languages, including C++, Java, C#, or D. These languages are strongly typed so, barring unsafe casting, the programmer must create a separate message queue type for every message type. This is not only a major annoyance, but raises the problem of composability: A thread might want to wait for several types of messages at once.

Consider how, because of these limitations, type safety is compromised in Windows. All Windows messages have to fit the same mold: (message ID, WPARAM, and LPARAM). All typing information is lost and it’s up to the programmer to recover it, often by unchecked type casting.

In functional languages, composability of message queues (also called channels or mailboxes) is solved using pattern matching. In a nutshell, pattern matching is like a switch statement with each case potentially having a different type. Within the same statement you may be able to match a string message or an integer message. The latest batch of modern all-purpose languages like Scala or F# have excellent support for pattern matching.

Sharing is Sometimes Simpler

Let’s re-consider the problem of list pruning: The main thread has a list of candidates, the worker thread has an in-progress list. We want to remove all items from the candidate list that are present in the in-progress list. In the message-passing approach, the main thread must send the list of candidates to the worker thread and wait for the answer (in the asynchronous case, it might pick up other tasks while waiting for the answer). It has to wait even if the worker thread is not currently using the in-progress list. There is no way for the worker thread to say, “I’m busy, why don’t you do it yourself.”

Actually, there is–if we allow inter-thread sharing. Of course, concurrent access to shared state must be protected, for instance, by locking. In our current implementation, the WorkQueue is a monitor that protects the in-progress list. The main thread takes the WorkQueue lock and compares the two lists directly. The only time it must wait is when the worker thread has already taken the lock and is in the process of accessing the in-progress list. In fact, our the in-progress list is split into two lists: One contains the immutable ScriptInfo objects and is shared as part of WorkQueue. The other contains ScriptTickets and is part of TransportManager, which is not shared and therefore doesn’t require locking.

Sharing has its problems too: races and deadlocks. Races can be avoided if you are religious about protecting all shared data with monitors. Deadlocks are also avoidable, if you use locks sparingly. The more monitors you have in your program, the more likely they will deadlock. On the other hand, the more messages you have, the more data copying you do and the more chopped-up your code is.

So the best approach, in my opinion, is to judiciously combine message passing with sharing.

What’s Next?

So far I have described the preliminary re-write of the dispatcher. At Reliable Software, we usually start the implementation of a new feature by re-writing the surrounding code. I realize that this is a very unusual practice in most software companies yet it’s been working miracles for our product. There is very little “code rot” in Code Co-op after 14 years of continuous development. There are no parts of the product that “better not be touched or they will break.”

The next implementation step will be to divide the actual copying between multiple threads. But that should be a breeze in the newly re-written dispatcher. I’ll keep you posted.

About these ads