123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * Utilities for asynchronously querying multiple servers, building on
 * TAsyncClient.
 *
 * Terminology note: The names of the artifacts defined in this module are
 *   derived from »client pool«, because they operate on a pool of
 *   TAsyncClients. However, from a architectural point of view, they often
 *   represent a pool of hosts a Thrift client application communicates with
 *   using RPC calls.
 */
module thrift.codegen.async_client_pool;

import core.sync.mutex;
import core.time : Duration, dur;
import std.algorithm : map;
import std.array : array, empty;
import std.exception : enforce;
import std.traits : ParameterTypeTuple, ReturnType;
import thrift.base;
import thrift.codegen.base;
import thrift.codegen.async_client;
import thrift.internal.algorithm;
import thrift.internal.codegen;
import thrift.util.awaitable;
import thrift.util.cancellation;
import thrift.util.future;
import thrift.internal.resource_pool;

/**
 * Represents a generic client pool which implements TFutureInterface!Interface
 * using multiple TAsyncClients.
 */
interface TAsyncClientPoolBase(Interface) if (isService!Interface) :
  TFutureInterface!Interface
{
  /// Shorthand for the client type this pool operates on.
  alias TAsyncClientBase!Interface Client;

  /**
   * Adds a client to the pool.
   */
  void addClient(Client client);

  /**
   * Removes a client from the pool.
   *
   * Returns: Whether the client was found in the pool.
   */
  bool removeClient(Client client);

  /**
   * Called to determine whether an exception comes from a client from the
   * pool not working properly, or if it an exception thrown at the
   * application level.
   *
   * If the delegate returns true, the server/connection is considered to be
   * at fault, if it returns false, the exception is just passed on to the
   * caller.
   *
   * By default, returns true for instances of TTransportException and
   * TApplicationException, false otherwise.
   */
  bool delegate(Exception) rpcFaultFilter() const @property;
  void rpcFaultFilter(bool delegate(Exception)) @property; /// Ditto

  /**
   * Whether to open the underlying transports of a client before trying to
   * execute a method if they are not open. This is usually desirable
   * because it allows e.g. to automatically reconnect to a remote server
   * if the network connection is dropped.
   *
   * Defaults to true.
   */
  bool reopenTransports() const @property;
  void reopenTransports(bool) @property; /// Ditto
}

immutable bool delegate(Exception) defaultRpcFaultFilter;
static this() {
  defaultRpcFaultFilter = (Exception e) {
    import thrift.protocol.base;
    import thrift.transport.base;
    return (
      (cast(TTransportException)e !is null) ||
      (cast(TApplicationException)e !is null)
    );
  };
}

/**
 * A TAsyncClientPoolBase implementation which queries multiple servers in a
 * row until a request succeeds, the result of which is then returned.
 *
 * The definition of »success« can be customized using the rpcFaultFilter()
 * delegate property. If it is non-null and calling it for an exception set by
 * a failed method invocation returns true, the error is considered to be
 * caused by the RPC layer rather than the application layer, and the next
 * server in the pool is tried. If there are no more clients to try, the
 * operation is marked as failed with a TCompoundOperationException.
 *
 * If a TAsyncClient in the pool fails with an RPC exception for a number of
 * consecutive tries, it is temporarily disabled (not tried any longer) for
 * a certain duration. Both the limit and the timeout can be configured. If all
 * clients fail (and keepTrying is false), the operation fails with a
 * TCompoundOperationException which contains the collected RPC exceptions.
 */
final class TAsyncClientPool(Interface) if (isService!Interface) :
  TAsyncClientPoolBase!Interface
{
  ///
  this(Client[] clients) {
    pool_ = new TResourcePool!Client(clients);
    rpcFaultFilter_ = defaultRpcFaultFilter;
    reopenTransports_ = true;
  }

  /+override+/ void addClient(Client client) {
    pool_.add(client);
  }

  /+override+/ bool removeClient(Client client) {
    return pool_.remove(client);
  }

  /**
   * Whether to keep trying to find a working client if all have failed in a
   * row.
   *
   * Defaults to false.
   */
  bool keepTrying() const @property {
    return pool_.cycle;
  }

  /// Ditto
  void keepTrying(bool value) @property {
    pool_.cycle = value;
  }

  /**
   * Whether to use a random permutation of the client pool on every call to
   * execute(). This can be used e.g. as a simple form of load balancing.
   *
   * Defaults to true.
   */
  bool permuteClients() const @property {
    return pool_.permute;
  }

  /// Ditto
  void permuteClients(bool value) @property {
    pool_.permute = value;
  }

  /**
   * The number of consecutive faults after which a client is disabled until
   * faultDisableDuration has passed. 0 to never disable clients.
   *
   * Defaults to 0.
   */
  ushort faultDisableCount() const @property {
    return pool_.faultDisableCount;
  }

  /// Ditto
  void faultDisableCount(ushort value) @property {
    pool_.faultDisableCount = value;
  }

  /**
   * The duration for which a client is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration() const @property {
    return pool_.faultDisableDuration;
  }

  /// Ditto
  void faultDisableDuration(Duration value) @property {
    pool_.faultDisableDuration = value;
  }

  /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
    return rpcFaultFilter_;
  }

  /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
    rpcFaultFilter_ = value;
  }

  /+override+/ bool reopenTransports() const @property {
    return reopenTransports_;
  }

  /+override+/ void reopenTransports(bool value) @property {
    reopenTransports_ = value;
  }

  mixin(fallbackPoolForwardCode!Interface());

protected:
  // The actual worker implementation to which RPC method calls are forwarded.
  auto executeOnPool(string method, Args...)(Args args,
    TCancellation cancellation
  ) {
    auto clients = pool_[];
    if (clients.empty) {
      throw new TException("No clients available to try.");
    }

    auto promise = new TPromise!(ReturnType!(MemberType!(Interface, method)));
    Exception[] rpcExceptions;

    void tryNext() {
      while (clients.empty) {
        Client next;
        Duration waitTime;
        if (clients.willBecomeNonempty(next, waitTime)) {
          if (waitTime > dur!"hnsecs"(0)) {
            if (waitTime < dur!"usecs"(10)) {
              import core.thread;
              Thread.sleep(waitTime);
            } else {
              next.transport.asyncManager.delay(waitTime, { tryNext(); });
              return;
            }
          }
        } else {
          promise.fail(new TCompoundOperationException("All clients failed.",
            rpcExceptions));
          return;
        }
      }

      auto client = clients.front;
      clients.popFront;

      if (reopenTransports) {
        if (!client.transport.isOpen) {
          try {
            client.transport.open();
          } catch (Exception e) {
            pool_.recordFault(client);
            tryNext();
            return;
          }
        }
      }

      auto future = mixin("client." ~ method)(args, cancellation);
      future.completion.addCallback({
        if (future.status == TFutureStatus.CANCELLED) {
          promise.cancel();
          return;
        }

        auto e = future.getException();
        if (e) {
          if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
            pool_.recordFault(client);
            rpcExceptions ~= e;
            tryNext();
            return;
          }
        }
        pool_.recordSuccess(client);
        promise.complete(future);
      });
    }

    tryNext();
    return promise;
  }

private:
  TResourcePool!Client pool_;
  bool delegate(Exception) rpcFaultFilter_;
  bool reopenTransports_;
}

/**
 * TAsyncClientPool construction helper to avoid having to explicitly
 * specify the interface type, i.e. to allow the constructor being called
 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 */
TAsyncClientPool!Interface tAsyncClientPool(Interface)(
  TAsyncClientBase!Interface[] clients
) if (isService!Interface) {
  return new typeof(return)(clients);
}

private {
  // Cannot use an anonymous delegate literal for this because they aren't
  // allowed in class scope.
  string fallbackPoolForwardCode(Interface)() {
    string code = "";

    foreach (methodName; AllMemberMethodNames!Interface) {
      enum qn = "Interface." ~ methodName;
      code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
        "(ParameterTypeTuple!(" ~ qn ~ ") args, TCancellation cancellation = null) {\n";
      code ~= "return executeOnPool!(`" ~ methodName ~ "`)(args, cancellation);\n";
      code ~= "}\n";
    }

    return code;
  }
}

/**
 * A TAsyncClientPoolBase implementation which queries multiple servers at
 * the same time and returns the first success response.
 *
 * The definition of »success« can be customized using the rpcFaultFilter()
 * delegate property. If it is non-null and calling it for an exception set by
 * a failed method invocation returns true, the error is considered to be
 * caused by the RPC layer rather than the application layer, and the next
 * server in the pool is tried. If all clients fail, the operation is marked
 * as failed with a TCompoundOperationException.
 */
final class TAsyncFastestClientPool(Interface) if (isService!Interface) :
  TAsyncClientPoolBase!Interface
{
  ///
  this(Client[] clients) {
    clients_ = clients;
    rpcFaultFilter_ = defaultRpcFaultFilter;
    reopenTransports_ = true;
  }

  /+override+/ void addClient(Client client) {
    clients_ ~= client;
  }

  /+override+/ bool removeClient(Client client) {
    auto oldLength = clients_.length;
    clients_ = removeEqual(clients_, client);
    return clients_.length < oldLength;
  }


  /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
    return rpcFaultFilter_;
  }

  /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
    rpcFaultFilter_ = value;
  }

  /+override+/bool reopenTransports() const @property {
    return reopenTransports_;
  }

  /+override+/ void reopenTransports(bool value) @property {
    reopenTransports_ = value;
  }

  mixin(fastestPoolForwardCode!Interface());

private:
  Client[] clients_;
  bool delegate(Exception) rpcFaultFilter_;
  bool reopenTransports_;
}

/**
 * TAsyncFastestClientPool construction helper to avoid having to explicitly
 * specify the interface type, i.e. to allow the constructor being called
 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 */
TAsyncFastestClientPool!Interface tAsyncFastestClientPool(Interface)(
  TAsyncClientBase!Interface[] clients
) if (isService!Interface) {
  return new typeof(return)(clients);
}

private {
  // Cannot use an anonymous delegate literal for this because they aren't
  // allowed in class scope.
  string fastestPoolForwardCode(Interface)() {
    string code = "";

    foreach (methodName; AllMemberMethodNames!Interface) {
      enum qn = "Interface." ~ methodName;
      code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
        "(ParameterTypeTuple!(" ~ qn ~ ") args, " ~
        "TCancellation cancellation = null) {\n";
      code ~= "enum methodName = `" ~ methodName ~ "`;\n";
      code ~= q{
        alias ReturnType!(MemberType!(Interface, methodName)) ResultType;

        auto childCancellation = new TCancellationOrigin;

        TFuture!ResultType[] futures;
        futures.reserve(clients_.length);

        foreach (c; clients_) {
          if (reopenTransports) {
            if (!c.transport.isOpen) {
              try {
                c.transport.open();
              } catch (Exception e) {
                continue;
              }
            }
          }
          futures ~= mixin("c." ~ methodName)(args, childCancellation);
        }

        return new FastestPoolJob!(ResultType)(
          futures, rpcFaultFilter, cancellation, childCancellation);
      };
      code ~= "}\n";
    }

    return code;
  }

  final class FastestPoolJob(Result) : TFuture!Result {
    this(TFuture!Result[] poolFutures, bool delegate(Exception) rpcFaultFilter,
      TCancellation cancellation, TCancellationOrigin childCancellation
    ) {
      resultPromise_ = new TPromise!Result;
      poolFutures_ = poolFutures;
      rpcFaultFilter_ = rpcFaultFilter;
      childCancellation_ = childCancellation;

      foreach (future; poolFutures) {
        future.completion.addCallback({
          auto f = future;
          return { completionCallback(f); };
        }());
        if (future.status != TFutureStatus.RUNNING) {
          // If the current future is already completed, we are done, don't
          // bother adding callbacks for the others (they would just return
          // immediately after acquiring the lock).
          return;
        }
      }

      if (cancellation) {
        cancellation.triggering.addCallback({
          resultPromise_.cancel();
          childCancellation.trigger();
        });
      }
    }

    TFutureStatus status() const @property {
      return resultPromise_.status;
    }

    TAwaitable completion() @property {
      return resultPromise_.completion;
    }

    Result get() {
      return resultPromise_.get();
    }

    Exception getException() {
      return resultPromise_.getException();
    }

  private:
    void completionCallback(TFuture!Result future) {
      synchronized {
        if (future.status == TFutureStatus.CANCELLED) {
          assert(resultPromise_.status != TFutureStatus.RUNNING);
          return;
        }

        if (resultPromise_.status != TFutureStatus.RUNNING) {
          // The operation has already been completed. This can happen if
          // another client completed first, but this callback was already
          // waiting for the lock when it called cancel().
          return;
        }

        if (future.status == TFutureStatus.FAILED) {
          auto e = future.getException();
          if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
            rpcExceptions_ ~= e;

            if (rpcExceptions_.length == poolFutures_.length) {
              resultPromise_.fail(new TCompoundOperationException(
                "All child operations failed, unable to retrieve a result.",
                rpcExceptions_
              ));
            }

            return;
          }
        }

        // Store the result to the target promise.
        resultPromise_.complete(future);

        // Cancel the other futures, we would just discard their results.
        // Note: We do this after we have stored the results to our promise,
        // see the assert at the top of the function.
        childCancellation_.trigger();
      }
    }

    TPromise!Result resultPromise_;
    TFuture!Result[] poolFutures_;
    Exception[] rpcExceptions_;
    bool delegate(Exception) rpcFaultFilter_;
    TCancellationOrigin childCancellation_;
  }
}

/**
 * Allows easily aggregating results from a number of TAsyncClients.
 *
 * Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not
 * simply implement TFutureInterface!Interface. It manages a pool of clients,
 * but allows the user to specify a custom accumulator function to use or to
 * iterate over the results using a TFutureAggregatorRange.
 *
 * For each service method, TAsyncAggregator offers a method
 * accepting the same arguments, and an optional TCancellation instance, just
 * like with TFutureInterface. The return type, however, is a proxy object
 * that offers the following methods:
 * ---
 * /++
 *  + Returns a thrift.util.future.TFutureAggregatorRange for the results of
 *  + the client pool method invocations.
 *  +
 *  + The [] (slicing) operator can also be used to obtain the range.
 *  +
 *  + Params:
 *  +   timeout = A timeout to pass to the TFutureAggregatorRange constructor,
 *  +     defaults to zero (no timeout).
 *  +/
 * TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0));
 * auto opSlice() { return range(); } /// Ditto
 *
 * /++
 *  + Returns a future that gathers the results from the clients in the pool
 *  + and invokes a user-supplied accumulator function on them, returning its
 *  + return value to the client.
 *  +
 *  + In addition to the TFuture!AccumulatedType interface (where
 *  + AccumulatedType is the return type of the accumulator function), the
 *  + returned object also offers two additional methods, finish() and
 *  + finishGet(): By default, the accumulator functions is called after all
 *  + the results from the pool clients have become available. Calling finish()
 *  + causes the accumulator future to stop waiting for other results and
 *  + immediately invoking the accumulator function on the results currently
 *  + available. If all results are already available, finish() is a no-op.
 *  + finishGet() is a convenience shortcut for combining it with
 *  + a call to get() immediately afterwards, like waitGet() is for wait().
 *  +
 *  + The acc alias can point to any callable accepting either an array of
 *  + return values or an array of return values and an array of exceptions;
 *  + see isAccumulator!() for details. The default accumulator concatenates
 *  + return values that can be concatenated with each others (e.g. arrays),
 *  + and simply returns an array of values otherwise, failing with a
 *  + TCompoundOperationException no values were returned.
 *  +
 *  + The accumulator function is not executed in any of the async manager
 *  + worker threads associated with the async clients, but instead it is
 *  + invoked when the actual result is requested for the first time after the
 *  + operation has been completed. This also includes checking the status
 *  + of the operation once it is no longer running, since the accumulator
 *  + has to be run to determine whether the operation succeeded or failed.
 *  +/
 * auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc);
 * ---
 *
 * Example:
 * ---
 * // Some Thrift service.
 * interface Foo {
 *   int foo(string name);
 *   byte[] bar();
 * }
 *
 * // Create the aggregator pool – client0, client1, client2 are some
 * // TAsyncClient!Foo instances, but in theory could also be other
 * // TFutureInterface!Foo implementations (e.g. some async client pool).
 * auto pool = new TAsyncAggregator!Foo([client0, client1, client2]);
 *
 * foreach (val; pool.foo("baz").range(dur!"seconds"(1))) {
 *   // Process all the results that are available before a second has passed,
 *   // in the order they arrive.
 *   writeln(val);
 * }
 *
 * auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){
 *   if (vals.empty) {
 *     throw new TCompoundOperationException("All clients failed", exs);
 *   }
 *
 *   // Just to illustrate that the type of the values can change, convert the
 *   // numbers to double and sum up their roots.
 *   double result = 0;
 *   foreach (v; vals) result += sqrt(cast(double)v);
 *   return result;
 * })();
 *
 * // Wait up to three seconds for the result, and then accumulate what has
 * // arrived so far.
 * sumRoots.completion.wait(dur!"seconds"(3));
 * writeln(sumRoots.finishGet());
 *
 * // For scalars, the default accumulator returns an array of the values.
 * pragma(msg, typeof(pool.foo("").accumulate().get()); // int[].
 *
 * // For lists, etc., it concatenates the results together.
 * pragma(msg, typeof(pool.bar().accumulate().get())); // byte[].
 * ---
 *
 * Note: For the accumulate!() interface, you might currently hit a »cannot use
 * local '…' as parameter to non-global template accumulate«-error, see
 * $(DMDBUG 5710, DMD issue 5710). If your accumulator function does not need
 * to access the surrounding scope, you might want to use a function literal
 * instead of a delegate to avoid the issue.
 */
class TAsyncAggregator(Interface) if (isBaseService!Interface) {
  /// Shorthand for the client type this instance operates on.
  alias TAsyncClientBase!Interface Client;

  ///
  this(Client[] clients) {
    clients_ = clients;
  }

  /// Whether to open the underlying transports of a client before trying to
  /// execute a method if they are not open. This is usually desirable
  /// because it allows e.g. to automatically reconnect to a remote server
  /// if the network connection is dropped.
  ///
  /// Defaults to true.
  bool reopenTransports = true;

  mixin AggregatorOpDispatch!();

private:
  Client[] clients_;
}

/// Ditto
class TAsyncAggregator(Interface) if (isDerivedService!Interface) :
  TAsyncAggregator!(BaseService!Interface)
{
  /// Shorthand for the client type this instance operates on.
  alias TAsyncClientBase!Interface Client;

  ///
  this(Client[] clients) {
    super(cast(TAsyncClientBase!(BaseService!Interface)[])clients);
  }

  mixin AggregatorOpDispatch!();
}

/**
 * Whether fun is a valid accumulator function for values of type ValueType.
 *
 * For this to be true, fun must be a callable matching one of the following
 * argument lists:
 * ---
 * fun(ValueType[] values);
 * fun(ValueType[] values, Exception[] exceptions);
 * ---
 *
 * The second version is passed the collected array exceptions from all the
 * clients in the pool.
 *
 * The return value of the accumulator function is passed to the client (via
 * the result future). If it throws an exception, the operation is marked as
 * failed with the given exception instead.
 */
template isAccumulator(ValueType, alias fun) {
  enum isAccumulator = is(typeof(fun(cast(ValueType[])[]))) ||
    is(typeof(fun(cast(ValueType[])[], cast(Exception[])[])));
}

/**
 * TAsyncAggregator construction helper to avoid having to explicitly
 * specify the interface type, i.e. to allow the constructor being called
 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 */
TAsyncAggregator!Interface tAsyncAggregator(Interface)(
  TAsyncClientBase!Interface[] clients
) if (isService!Interface) {
  return new typeof(return)(clients);
}

private {
  mixin template AggregatorOpDispatch() {
    auto opDispatch(string name, Args...)(Args args) if (
      is(typeof(mixin("Interface.init." ~ name)(args)))
    ) {
      alias ReturnType!(MemberType!(Interface, name)) ResultType;

      auto childCancellation = new TCancellationOrigin;

      TFuture!ResultType[] futures;
      futures.reserve(clients_.length);

      foreach (c; cast(Client[])clients_) {
        if (reopenTransports) {
          if (!c.transport.isOpen) {
            try {
              c.transport.open();
            } catch (Exception e) {
              continue;
            }
          }
        }
        futures ~= mixin("c." ~ name)(args, childCancellation);
      }

      return AggregationResult!ResultType(futures, childCancellation);
    }
  }

  struct AggregationResult(T) {
    auto opSlice() {
      return range();
    }

    auto range(Duration timeout = dur!"hnsecs"(0)) {
      return tFutureAggregatorRange(futures_, childCancellation_, timeout);
    }

    auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!(T, acc)) {
      return new AccumulatorJob!(T, acc)(futures_, childCancellation_);
    }

  private:
    TFuture!T[] futures_;
    TCancellationOrigin childCancellation_;
  }

  auto defaultAccumulator(T)(T[] values, Exception[] exceptions) {
    if (values.empty) {
      throw new TCompoundOperationException("All clients failed",
        exceptions);
    }

    static if (is(typeof(T.init ~ T.init))) {
      import std.algorithm;
      return reduce!"a ~ b"(values);
    } else {
      return values;
    }
  }

  final class AccumulatorJob(T, alias accumulator) if (
    isAccumulator!(T, accumulator)
  ) : TFuture!(AccumulatorResult!(T, accumulator)) {
    this(TFuture!T[] futures, TCancellationOrigin childCancellation) {
      futures_ = futures;
      childCancellation_ = childCancellation;
      resultMutex_ = new Mutex;
      completionEvent_ = new TOneshotEvent;

      foreach (future; futures) {
        future.completion.addCallback({
          auto f = future;
          return {
            synchronized (resultMutex_) {
              if (f.status == TFutureStatus.CANCELLED) {
                if (!finished_) {
                  status_ = TFutureStatus.CANCELLED;
                  finished_ = true;
                }
                return;
              }

              if (f.status == TFutureStatus.FAILED) {
                exceptions_ ~= f.getException();
              } else {
                results_ ~= f.get();
              }

              if (results_.length + exceptions_.length == futures_.length) {
                finished_ = true;
                completionEvent_.trigger();
              }
            }
          };
        }());
      }
    }

    TFutureStatus status() @property {
      synchronized (resultMutex_) {
        if (!finished_) return TFutureStatus.RUNNING;
        if (status_ != TFutureStatus.RUNNING) return status_;

        try {
          result_ = invokeAccumulator!accumulator(results_, exceptions_);
          status_ = TFutureStatus.SUCCEEDED;
        } catch (Exception e) {
          exception_ = e;
          status_ = TFutureStatus.FAILED;
        }

        return status_;
      }
    }

    TAwaitable completion() @property {
      return completionEvent_;
    }

    AccumulatorResult!(T, accumulator) get() {
      auto s = status;

      enforce(s != TFutureStatus.RUNNING,
        new TFutureException("Operation not yet completed."));

      if (s == TFutureStatus.CANCELLED) throw new TCancelledException;
      if (s == TFutureStatus.FAILED) throw exception_;
      return result_;
    }

    Exception getException() {
      auto s = status;
      enforce(s != TFutureStatus.RUNNING,
        new TFutureException("Operation not yet completed."));

      if (s == TFutureStatus.CANCELLED) throw new TCancelledException;

      if (s == TFutureStatus.SUCCEEDED) {
        return null;
      }
      return exception_;
    }

    void finish() {
      synchronized (resultMutex_) {
        if (!finished_) {
          finished_ = true;
          childCancellation_.trigger();
          completionEvent_.trigger();
        }
      }
    }

    auto finishGet() {
      finish();
      return get();
    }

  private:
    TFuture!T[] futures_;
    TCancellationOrigin childCancellation_;

    bool finished_;
    T[] results_;
    Exception[] exceptions_;

    TFutureStatus status_;
    Mutex resultMutex_;
    union {
      AccumulatorResult!(T, accumulator) result_;
      Exception exception_;
    }
    TOneshotEvent completionEvent_;
  }

  auto invokeAccumulator(alias accumulator, T)(
    T[] values, Exception[] exceptions
  ) if (
    isAccumulator!(T, accumulator)
  ) {
    static if (is(typeof(accumulator(values, exceptions)))) {
      return accumulator(values, exceptions);
    } else {
      return accumulator(values);
    }
  }

  template AccumulatorResult(T, alias acc) {
    alias typeof(invokeAccumulator!acc(cast(T[])[], cast(Exception[])[]))
      AccumulatorResult;
  }
}