123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.util.future;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.time : Duration;
import std.array : empty, front, popFront;
import std.conv : to;
import std.exception : enforce;
import std.traits : BaseTypeTuple, isSomeFunction, ParameterTypeTuple, ReturnType;
import thrift.base;
import thrift.util.awaitable;
import thrift.util.cancellation;

/**
 * Represents an operation which is executed asynchronously and the result of
 * which will become available at some point in the future.
 *
 * Once a operation is completed, the result of the operation can be fetched
 * via the get() family of methods. There are three possible cases: Either the
 * operation succeeded, then its return value is returned, or it failed by
 * throwing, in which case the exception is rethrown, or it was cancelled
 * before, then a TCancelledException is thrown. There might be TFuture
 * implementations which never possibly enter the cancelled state.
 *
 * All methods are thread-safe, but keep in mind that any exception object or
 * result (if it is a reference type, of course) is shared between all
 * get()-family invocations.
 */
interface TFuture(ResultType) {
  /**
   * The status the operation is currently in.
   *
   * An operation starts out in RUNNING status, and changes state to one of the
   * others at most once afterwards.
   */
  TFutureStatus status() @property;

  /**
   * A TAwaitable triggered when the operation leaves the RUNNING status.
   */
  TAwaitable completion() @property;

  /**
   * Convenience shorthand for waiting until the result is available and then
   * get()ing it.
   *
   * If the operation has already completed, the result is immediately
   * returned.
   *
   * The result of this method is »alias this«'d to the interface, so that
   * TFuture can be used as a drop-in replacement for a simple value in
   * synchronous code.
   */
  final ResultType waitGet() {
    completion.wait();
    return get();
  }
  final @property auto waitGetProperty() { return waitGet(); }
  alias waitGetProperty this;

  /**
   * Convenience shorthand for waiting until the result is available and then
   * get()ing it.
   *
   * If the operation completes in time, returns its result (resp. throws an
   * exception for the failed/cancelled cases). If not, throws a
   * TFutureException.
   */
  final ResultType waitGet(Duration timeout) {
    enforce(completion.wait(timeout), new TFutureException(
      "Operation did not complete in time."));
    return get();
  }

  /**
   * Returns the result of the operation.
   *
   * Throws: TFutureException if the operation has been cancelled,
   *   TCancelledException if it is not yet done; the set exception if it
   *   failed.
   */
  ResultType get();

  /**
   * Returns the captured exception if the operation failed, or null otherwise.
   *
   * Throws: TFutureException if not yet done, TCancelledException if the
   *   operation has been cancelled.
   */
  Exception getException();
}

/**
 * The states the operation offering a future interface can be in.
 */
enum TFutureStatus : byte {
  RUNNING, /// The operation is still running.
  SUCCEEDED, /// The operation completed without throwing an exception.
  FAILED, /// The operation completed by throwing an exception.
  CANCELLED /// The operation was cancelled.
}

/**
 * A TFuture covering the simple but common case where the result is simply
 * set by a call to succeed()/fail().
 *
 * All methods are thread-safe, but usually, succeed()/fail() are only called
 * from a single thread (different from the thread(s) waiting for the result
 * using the TFuture interface, though).
 */
class TPromise(ResultType) : TFuture!ResultType {
  this() {
    statusMutex_ = new Mutex;
    completionEvent_ = new TOneshotEvent;
  }

  override S status() const @property {
    return atomicLoad(status_);
  }

  override TAwaitable completion() @property {
    return completionEvent_;
  }

  override ResultType get() {
    auto s = atomicLoad(status_);
    enforce(s != S.RUNNING,
      new TFutureException("Operation not yet completed."));

    if (s == S.CANCELLED) throw new TCancelledException;
    if (s == S.FAILED) throw exception_;

    static if (!is(ResultType == void)) {
      return result_;
    }
  }

  override Exception getException() {
    auto s = atomicLoad(status_);
    enforce(s != S.RUNNING,
      new TFutureException("Operation not yet completed."));

    if (s == S.CANCELLED) throw new TCancelledException;
    if (s == S.SUCCEEDED) return null;

    return exception_;
  }

  static if (!is(ResultType == void)) {
    /**
     * Sets the result of the operation, marks it as done, and notifies any
     * waiters.
     *
     * If the operation has been cancelled before, nothing happens.
     *
     * Throws: TFutureException if the operation is already completed.
     */
    void succeed(ResultType result) {
      synchronized (statusMutex_) {
        auto s = atomicLoad(status_);
        if (s == S.CANCELLED) return;

        enforce(s == S.RUNNING,
          new TFutureException("Operation already completed."));
        result_ = result;

        atomicStore(status_, S.SUCCEEDED);
      }

      completionEvent_.trigger();
    }
  } else {
    void succeed() {
      synchronized (statusMutex_) {
        auto s = atomicLoad(status_);
        if (s == S.CANCELLED) return;

        enforce(s == S.RUNNING,
          new TFutureException("Operation already completed."));

        atomicStore(status_, S.SUCCEEDED);
      }

      completionEvent_.trigger();
    }
  }

  /**
   * Marks the operation as failed with the specified exception and notifies
   * any waiters.
   *
   * If the operation was already cancelled, nothing happens.
   *
   * Throws: TFutureException if the operation is already completed.
   */
  void fail(Exception exception) {
    synchronized (statusMutex_) {
      auto status = atomicLoad(status_);
      if (status == S.CANCELLED) return;

      enforce(status == S.RUNNING,
        new TFutureException("Operation already completed."));
      exception_ = exception;

      atomicStore(status_, S.FAILED);
    }

    completionEvent_.trigger();
  }


  /**
   * Marks this operation as completed and takes over the outcome of another
   * TFuture of the same type.
   *
   * If this operation was already cancelled, nothing happens. If the other
   * operation was cancelled, this operation is marked as failed with a
   * TCancelledException.
   *
   * Throws: TFutureException if the passed in future was not completed or
   *   this operation is already completed.
   */
  void complete(TFuture!ResultType future) {
    synchronized (statusMutex_) {
      auto status = atomicLoad(status_);
      if (status == S.CANCELLED) return;
      enforce(status == S.RUNNING,
        new TFutureException("Operation already completed."));

      enforce(future.status != S.RUNNING, new TFutureException(
        "The passed TFuture is not yet completed."));

      status = future.status;
      if (status == S.CANCELLED) {
        status = S.FAILED;
        exception_ = new TCancelledException;
      } else if (status == S.FAILED) {
        exception_ = future.getException();
      } else static if (!is(ResultType == void)) {
        result_ = future.get();
      }

      atomicStore(status_, status);
    }

    completionEvent_.trigger();
  }

  /**
   * Marks this operation as cancelled and notifies any waiters.
   *
   * If the operation is already completed, nothing happens.
   */
  void cancel() {
    synchronized (statusMutex_) {
      auto status = atomicLoad(status_);
      if (status == S.RUNNING) atomicStore(status_, S.CANCELLED);
    }

    completionEvent_.trigger();
  }

private:
  // Convenience alias because TFutureStatus is ubiquitous in this class.
  alias TFutureStatus S;

  // The status the promise is currently in.
  shared S status_;

  union {
    static if (!is(ResultType == void)) {
      // Set if status_ is SUCCEEDED.
      ResultType result_;
    }
    // Set if status_ is FAILED.
    Exception exception_;
  }

  // Protects status_.
  // As for result_ and exception_: They are only set once, while status_ is
  // still RUNNING, so given that the operation has already completed, reading
  // them is safe without holding some kind of lock.
  Mutex statusMutex_;

  // Triggered when the event completes.
  TOneshotEvent completionEvent_;
}

///
class TFutureException : TException {
  ///
  this(string msg = "", string file = __FILE__, size_t line = __LINE__,
    Throwable next = null)
  {
    super(msg, file, line, next);
  }
}

/**
 * Creates an interface that is similiar to a given one, but accepts an
 * additional, optional TCancellation parameter each method, and returns
 * TFutures instead of plain return values.
 *
 * For example, given the following declarations:
 * ---
 * interface Foo {
 *   void bar();
 *   string baz(int a);
 * }
 * alias TFutureInterface!Foo FutureFoo;
 * ---
 *
 * FutureFoo would be equivalent to:
 * ---
 * interface FutureFoo {
 *   TFuture!void bar(TCancellation cancellation = null);
 *   TFuture!string baz(int a, TCancellation cancellation = null);
 * }
 * ---
 */
template TFutureInterface(Interface) if (is(Interface _ == interface)) {
  mixin({
    string code = "interface TFutureInterface \n";

    static if (is(Interface Bases == super) && Bases.length > 0) {
      code ~= ": ";
      foreach (i; 0 .. Bases.length) {
        if (i > 0) code ~= ", ";
        code ~= "TFutureInterface!(BaseTypeTuple!Interface[" ~ to!string(i) ~ "]) ";
      }
    }

    code ~= "{\n";

    foreach (methodName; __traits(derivedMembers, Interface)) {
      enum qn = "Interface." ~ methodName;
      static if (isSomeFunction!(mixin(qn))) {
        code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
          "(ParameterTypeTuple!(" ~ qn ~ "), TCancellation cancellation = null);\n";
      }
    }

    code ~= "}\n";
    return code;
  }());
}

/**
 * An input range that aggregates results from multiple asynchronous operations,
 * returning them in the order they arrive.
 *
 * Additionally, a timeout can be set after which results from not yet finished
 * futures will no longer be waited for, e.g. to ensure the time it takes to
 * iterate over a set of results is limited.
 */
final class TFutureAggregatorRange(T) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   futures = The set of futures to collect results from.
   *   timeout = If positive, not yet finished futures will be cancelled and
   *     their results will not be taken into account.
   */
  this(TFuture!T[] futures, TCancellationOrigin childCancellation,
    Duration timeout = dur!"hnsecs"(0)
  ) {
    if (timeout > dur!"hnsecs"(0)) {
      timeoutSysTick_ = TickDuration.currSystemTick +
        TickDuration.from!"hnsecs"(timeout.total!"hnsecs");
    } else {
      timeoutSysTick_ = TickDuration(0);
    }

    queueMutex_ = new Mutex;
    queueNonEmptyCondition_ = new Condition(queueMutex_);
    futures_ = futures;
    childCancellation_ = childCancellation;

    foreach (future; futures_) {
      future.completion.addCallback({
        auto f = future;
        return {
          if (f.status == TFutureStatus.CANCELLED) return;
          assert(f.status != TFutureStatus.RUNNING);

          synchronized (queueMutex_) {
            completedQueue_ ~= f;

            if (completedQueue_.length == 1) {
              queueNonEmptyCondition_.notifyAll();
            }
          }
        };
      }());
    }
  }

  /**
   * Whether the range is empty.
   *
   * This is the case if the results from the completed futures not having
   * failed have already been popped and either all future have been finished
   * or the timeout has expired.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   */
  bool empty() @property {
    if (finished_) return true;
    if (bufferFilled_) return false;

    while (true) {
      TFuture!T future;
      synchronized (queueMutex_) {
        // The while loop is just being cautious about spurious wakeups, in
        // case they should be possible.
        while (completedQueue_.empty) {
          auto remaining = to!Duration(timeoutSysTick_ -
            TickDuration.currSystemTick);

          if (remaining <= dur!"hnsecs"(0)) {
            // No time left, but still no element received – we are empty now.
            finished_ = true;
            childCancellation_.trigger();
            return true;
          }

          queueNonEmptyCondition_.wait(remaining);
        }

        future = completedQueue_.front;
        completedQueue_.popFront();
      }

      ++completedCount_;
      if (completedCount_ == futures_.length) {
        // This was the last future in the list, there is no possiblity
        // another result could ever become available.
        finished_ = true;
      }

      if (future.status == TFutureStatus.FAILED) {
        // This one failed, loop again and try getting another item from
        // the queue.
        exceptions_ ~= future.getException();
      } else {
        resultBuffer_ = future.get();
        bufferFilled_ = true;
        return false;
      }
    }
  }

  /**
   * Returns the first element from the range.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   *
   * Throws: TException if the range is empty.
   */
  T front() {
    enforce(!empty, new TException(
      "Cannot get front of an empty future aggregator range."));
    return resultBuffer_;
  }

  /**
   * Removes the first element from the range.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   *
   * Throws: TException if the range is empty.
   */
  void popFront() {
    enforce(!empty, new TException(
      "Cannot pop front of an empty future aggregator range."));
    bufferFilled_ = false;
  }

  /**
   * The number of futures the result of which has been returned or which have
   * failed so far.
   */
  size_t completedCount() @property const {
    return completedCount_;
  }

  /**
   * The exceptions collected from failed TFutures so far.
   */
  Exception[] exceptions() @property {
    return exceptions_;
  }

private:
  TFuture!T[] futures_;
  TCancellationOrigin childCancellation_;

  // The system tick this operation will time out, or zero if no timeout has
  // been set.
  TickDuration timeoutSysTick_;

  bool finished_;

  bool bufferFilled_;
  T resultBuffer_;

  Exception[] exceptions_;
  size_t completedCount_;

  // The queue of completed futures. This (and the associated condition) are
  // the only parts of this class that are accessed by multiple threads.
  TFuture!T[] completedQueue_;
  Mutex queueMutex_;
  Condition queueNonEmptyCondition_;
}

/**
 * TFutureAggregatorRange construction helper to avoid having to explicitly
 * specify the value type, i.e. to allow the constructor being called using IFTI
 * (see $(DMDBUG 6082, D Bugzilla enhancement requet 6082)).
 */
TFutureAggregatorRange!T tFutureAggregatorRange(T)(TFuture!T[] futures,
  TCancellationOrigin childCancellation, Duration timeout = dur!"hnsecs"(0)
) {
  return new TFutureAggregatorRange!T(futures, childCancellation, timeout);
}