Выбрать главу

        // ...

        cout << "New toast " << ++toasted << endl;

        butterer->moreToastReady();

      }

    } catch(Interrupted_Exception&) { /* Exit */ }

    cout << "Toaster off" << endl;

  }

};

int main() {

  try {

    cout << "Press <Return> to quit" << endl;

    CountedPtr<Jammer> jammer(new Jammer);

    CountedPtr<Butterer> butterer(new Butterer(jammer));

    ThreadedExecutor executor;

    executor.execute(new Toaster(butterer));

    executor.execute(butterer);

    executor.execute(jammer);

    cin.get();

    executor.interrupt();

  } catch(Synchronization_Exception& e) {

    cerr << e.what() << endl;

  }

} ///:~

The classes are defined in the reverse order that they operate to simplify forward-referencing issues.

Jammer and Butterer both contain a Mutex, a Condition, and some kind of internal state information that changes to indicate that the process should suspend or resume. (Toaster doesn’t need these since it is the producer and doesn’t have to wait on anything.) The two run( ) functions perform an operation, set a state flag, and then call wait( ) to suspend the task. The moreToastReady( ) and moreButteredToastReady( ) functions change their respective state flags to indicate that something has changed and the process should consider resuming and then call signal( ) to wake up the thread.

The difference between this example and the previous one is that, at least conceptually, something is being produced here: toast. The rate of toast production is randomized a bit, to add some uncertainty. And you’ll see that when you run the program, things aren’t going right, because many pieces of toast appear to be getting dropped on the floor—not buttered, not jammed.

Solving threading problems with queues

Often, threading problems are based on the need for tasks to be serialized—that is, to take care of things in order. ToastOMatic.cpp must not only take care of things in order, it must be able to work on one piece of toast without worrying that toast is falling on the floor in the meantime. You can solve many threading problems by using a queue that synchronizes access to the elements within:

//: C11:TQueue.h

#ifndef TQUEUE_H

#define TQUEUE_H

#include "zthread/Thread.h"

#include "zthread/Condition.h"

#include "zthread/Mutex.h"

#include "zthread/Guard.h"

#include <deque>

template<class T> class TQueue {

  ZThread::Mutex lock;

  ZThread::Condition cond;

  std::deque<T> data;

public:

  TQueue() : cond(lock) {}

  void put(T item) {

    ZThread::Guard<ZThread::Mutex> g(lock);

    data.push_back(item);

    cond.signal();

  }

  T get() {

    ZThread::Guard<ZThread::Mutex> g(lock);

    while(data.empty())

      cond.wait();

    T returnVal = data.front();

    data.pop_front();

    return returnVal;

  }

};

#endif // TQUEUE_H ///:~

This builds on the Standard C++ Library deque by adding:

1.Synchronization to ensure that no two threads add objects at the same time.

2.wait( ) and signal( ) so that a consumer thread will automatically suspend if the queue is empty, and resume when more elements become available.

This relatively small amount of code can solve a remarkable number of problems.

Here’s a simple test that serializes the execution of LiftOff objects. The consumer is LiftOffRunner, which pulls each LiftOff object off the TQueue and runs it directly. (That is, it uses its own thread by calling run( ) explicitly rather than starting up a new thread for each task.)

//: C11:TestTQueue.cpp

//{L} ZThread

#include <string>

#include <iostream>

#include "TQueue.h"

#include "zthread/Thread.h"

#include "LiftOff.h"

using namespace ZThread;

using namespace std;

class LiftOffRunner : public Runnable {

  TQueue<LiftOff*> rockets;

public:

  void add(LiftOff* lo) { rockets.put(lo); }

  void run() {

    try {

      while(!Thread::interrupted()) {

        LiftOff* rocket = rockets.get();

        rocket->run();

      }

    } catch(Interrupted_Exception&) { /* Exit */ }

    cout << "Exiting LiftOffRunner" << endl;

  }

};

int main() {

  try {

    LiftOffRunner* lor = new LiftOffRunner;

    Thread t(lor);

    for(int i = 0; i < 5; i++)

      lor->add(new LiftOff(10, i));

    cin.get();

    lor->add(new LiftOff(10, 99));

    cin.get();

    t.interrupt();

  } catch(Synchronization_Exception& e) {

    cerr << e.what() << endl;

  }

} ///:~

The tasks are placed on the TQueue by main( ) and are taken off the TQueue by the LiftOffRunner. Notice that LiftOffRunner can ignore the synchronization issues because they are solved by the TQueue.

Proper toasting

To solve the ToastOMatic.cpp problem, we can run the toast through TQueues between processes. And to do this, we will need actual toast objects, which maintain and display their state:

//: C11:ToastOMaticMarkII.cpp

// Solving the problems using TQueues

//{L} ZThread

#include "zthread/Thread.h"

#include "zthread/Mutex.h"

#include "zthread/Guard.h"

#include "zthread/Condition.h"

#include "zthread/ThreadedExecutor.h"

#include "TQueue.h"

#include <iostream>

#include <string>

#include <cstdlib>

#include <ctime>

using namespace ZThread;

using namespace std;

class Toast {

  enum Status { dry, buttered, jammed };

  Status status;

  int id;

public:

  Toast(int idn) : id(idn), status(dry) {}

  void butter() { status = buttered; }

  void jam() { status = jammed; }

  string getStatus() const {