| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550 | /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ module thrift.util.future; import core.atomic; import core.sync.condition; import core.sync.mutex; import core.time : Duration; import std.array : empty, front, popFront; import std.conv : to; import std.exception : enforce; import std.traits : BaseTypeTuple, isSomeFunction, ParameterTypeTuple, ReturnType; import thrift.base; import thrift.util.awaitable; import thrift.util.cancellation; /** * Represents an operation which is executed asynchronously and the result of * which will become available at some point in the future. * * Once a operation is completed, the result of the operation can be fetched * via the get() family of methods. There are three possible cases: Either the * operation succeeded, then its return value is returned, or it failed by * throwing, in which case the exception is rethrown, or it was cancelled * before, then a TCancelledException is thrown. There might be TFuture * implementations which never possibly enter the cancelled state. * * All methods are thread-safe, but keep in mind that any exception object or * result (if it is a reference type, of course) is shared between all * get()-family invocations. */ interface TFuture(ResultType) { /** * The status the operation is currently in. * * An operation starts out in RUNNING status, and changes state to one of the * others at most once afterwards. */ TFutureStatus status() @property; /** * A TAwaitable triggered when the operation leaves the RUNNING status. */ TAwaitable completion() @property; /** * Convenience shorthand for waiting until the result is available and then * get()ing it. * * If the operation has already completed, the result is immediately * returned. * * The result of this method is »alias this«'d to the interface, so that * TFuture can be used as a drop-in replacement for a simple value in * synchronous code. */ final ResultType waitGet() { completion.wait(); return get(); } final @property auto waitGetProperty() { return waitGet(); } alias waitGetProperty this; /** * Convenience shorthand for waiting until the result is available and then * get()ing it. * * If the operation completes in time, returns its result (resp. throws an * exception for the failed/cancelled cases). If not, throws a * TFutureException. */ final ResultType waitGet(Duration timeout) { enforce(completion.wait(timeout), new TFutureException( "Operation did not complete in time.")); return get(); } /** * Returns the result of the operation. * * Throws: TFutureException if the operation has been cancelled, * TCancelledException if it is not yet done; the set exception if it * failed. */ ResultType get(); /** * Returns the captured exception if the operation failed, or null otherwise. * * Throws: TFutureException if not yet done, TCancelledException if the * operation has been cancelled. */ Exception getException(); } /** * The states the operation offering a future interface can be in. */ enum TFutureStatus : byte { RUNNING, /// The operation is still running. SUCCEEDED, /// The operation completed without throwing an exception. FAILED, /// The operation completed by throwing an exception. CANCELLED /// The operation was cancelled. } /** * A TFuture covering the simple but common case where the result is simply * set by a call to succeed()/fail(). * * All methods are thread-safe, but usually, succeed()/fail() are only called * from a single thread (different from the thread(s) waiting for the result * using the TFuture interface, though). */ class TPromise(ResultType) : TFuture!ResultType { this() { statusMutex_ = new Mutex; completionEvent_ = new TOneshotEvent; } override S status() const @property { return atomicLoad(status_); } override TAwaitable completion() @property { return completionEvent_; } override ResultType get() { auto s = atomicLoad(status_); enforce(s != S.RUNNING, new TFutureException("Operation not yet completed.")); if (s == S.CANCELLED) throw new TCancelledException; if (s == S.FAILED) throw exception_; static if (!is(ResultType == void)) { return result_; } } override Exception getException() { auto s = atomicLoad(status_); enforce(s != S.RUNNING, new TFutureException("Operation not yet completed.")); if (s == S.CANCELLED) throw new TCancelledException; if (s == S.SUCCEEDED) return null; return exception_; } static if (!is(ResultType == void)) { /** * Sets the result of the operation, marks it as done, and notifies any * waiters. * * If the operation has been cancelled before, nothing happens. * * Throws: TFutureException if the operation is already completed. */ void succeed(ResultType result) { synchronized (statusMutex_) { auto s = atomicLoad(status_); if (s == S.CANCELLED) return; enforce(s == S.RUNNING, new TFutureException("Operation already completed.")); result_ = result; atomicStore(status_, S.SUCCEEDED); } completionEvent_.trigger(); } } else { void succeed() { synchronized (statusMutex_) { auto s = atomicLoad(status_); if (s == S.CANCELLED) return; enforce(s == S.RUNNING, new TFutureException("Operation already completed.")); atomicStore(status_, S.SUCCEEDED); } completionEvent_.trigger(); } } /** * Marks the operation as failed with the specified exception and notifies * any waiters. * * If the operation was already cancelled, nothing happens. * * Throws: TFutureException if the operation is already completed. */ void fail(Exception exception) { synchronized (statusMutex_) { auto status = atomicLoad(status_); if (status == S.CANCELLED) return; enforce(status == S.RUNNING, new TFutureException("Operation already completed.")); exception_ = exception; atomicStore(status_, S.FAILED); } completionEvent_.trigger(); } /** * Marks this operation as completed and takes over the outcome of another * TFuture of the same type. * * If this operation was already cancelled, nothing happens. If the other * operation was cancelled, this operation is marked as failed with a * TCancelledException. * * Throws: TFutureException if the passed in future was not completed or * this operation is already completed. */ void complete(TFuture!ResultType future) { synchronized (statusMutex_) { auto status = atomicLoad(status_); if (status == S.CANCELLED) return; enforce(status == S.RUNNING, new TFutureException("Operation already completed.")); enforce(future.status != S.RUNNING, new TFutureException( "The passed TFuture is not yet completed.")); status = future.status; if (status == S.CANCELLED) { status = S.FAILED; exception_ = new TCancelledException; } else if (status == S.FAILED) { exception_ = future.getException(); } else static if (!is(ResultType == void)) { result_ = future.get(); } atomicStore(status_, status); } completionEvent_.trigger(); } /** * Marks this operation as cancelled and notifies any waiters. * * If the operation is already completed, nothing happens. */ void cancel() { synchronized (statusMutex_) { auto status = atomicLoad(status_); if (status == S.RUNNING) atomicStore(status_, S.CANCELLED); } completionEvent_.trigger(); } private: // Convenience alias because TFutureStatus is ubiquitous in this class. alias TFutureStatus S; // The status the promise is currently in. shared S status_; union { static if (!is(ResultType == void)) { // Set if status_ is SUCCEEDED. ResultType result_; } // Set if status_ is FAILED. Exception exception_; } // Protects status_. // As for result_ and exception_: They are only set once, while status_ is // still RUNNING, so given that the operation has already completed, reading // them is safe without holding some kind of lock. Mutex statusMutex_; // Triggered when the event completes. TOneshotEvent completionEvent_; } /// class TFutureException : TException { /// this(string msg = "", string file = __FILE__, size_t line = __LINE__, Throwable next = null) { super(msg, file, line, next); } } /** * Creates an interface that is similiar to a given one, but accepts an * additional, optional TCancellation parameter each method, and returns * TFutures instead of plain return values. * * For example, given the following declarations: * --- * interface Foo { * void bar(); * string baz(int a); * } * alias TFutureInterface!Foo FutureFoo; * --- * * FutureFoo would be equivalent to: * --- * interface FutureFoo { * TFuture!void bar(TCancellation cancellation = null); * TFuture!string baz(int a, TCancellation cancellation = null); * } * --- */ template TFutureInterface(Interface) if (is(Interface _ == interface)) { mixin({ string code = "interface TFutureInterface \n"; static if (is(Interface Bases == super) && Bases.length > 0) { code ~= ": "; foreach (i; 0 .. Bases.length) { if (i > 0) code ~= ", "; code ~= "TFutureInterface!(BaseTypeTuple!Interface[" ~ to!string(i) ~ "]) "; } } code ~= "{\n"; foreach (methodName; __traits(derivedMembers, Interface)) { enum qn = "Interface." ~ methodName; static if (isSomeFunction!(mixin(qn))) { code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~ "(ParameterTypeTuple!(" ~ qn ~ "), TCancellation cancellation = null);\n"; } } code ~= "}\n"; return code; }()); } /** * An input range that aggregates results from multiple asynchronous operations, * returning them in the order they arrive. * * Additionally, a timeout can be set after which results from not yet finished * futures will no longer be waited for, e.g. to ensure the time it takes to * iterate over a set of results is limited. */ final class TFutureAggregatorRange(T) { /** * Constructs a new instance. * * Params: * futures = The set of futures to collect results from. * timeout = If positive, not yet finished futures will be cancelled and * their results will not be taken into account. */ this(TFuture!T[] futures, TCancellationOrigin childCancellation, Duration timeout = dur!"hnsecs"(0) ) { if (timeout > dur!"hnsecs"(0)) { timeoutSysTick_ = TickDuration.currSystemTick + TickDuration.from!"hnsecs"(timeout.total!"hnsecs"); } else { timeoutSysTick_ = TickDuration(0); } queueMutex_ = new Mutex; queueNonEmptyCondition_ = new Condition(queueMutex_); futures_ = futures; childCancellation_ = childCancellation; foreach (future; futures_) { future.completion.addCallback({ auto f = future; return { if (f.status == TFutureStatus.CANCELLED) return; assert(f.status != TFutureStatus.RUNNING); synchronized (queueMutex_) { completedQueue_ ~= f; if (completedQueue_.length == 1) { queueNonEmptyCondition_.notifyAll(); } } }; }()); } } /** * Whether the range is empty. * * This is the case if the results from the completed futures not having * failed have already been popped and either all future have been finished * or the timeout has expired. * * Potentially blocks until a new result is available or the timeout has * expired. */ bool empty() @property { if (finished_) return true; if (bufferFilled_) return false; while (true) { TFuture!T future; synchronized (queueMutex_) { // The while loop is just being cautious about spurious wakeups, in // case they should be possible. while (completedQueue_.empty) { auto remaining = to!Duration(timeoutSysTick_ - TickDuration.currSystemTick); if (remaining <= dur!"hnsecs"(0)) { // No time left, but still no element received – we are empty now. finished_ = true; childCancellation_.trigger(); return true; } queueNonEmptyCondition_.wait(remaining); } future = completedQueue_.front; completedQueue_.popFront(); } ++completedCount_; if (completedCount_ == futures_.length) { // This was the last future in the list, there is no possiblity // another result could ever become available. finished_ = true; } if (future.status == TFutureStatus.FAILED) { // This one failed, loop again and try getting another item from // the queue. exceptions_ ~= future.getException(); } else { resultBuffer_ = future.get(); bufferFilled_ = true; return false; } } } /** * Returns the first element from the range. * * Potentially blocks until a new result is available or the timeout has * expired. * * Throws: TException if the range is empty. */ T front() { enforce(!empty, new TException( "Cannot get front of an empty future aggregator range.")); return resultBuffer_; } /** * Removes the first element from the range. * * Potentially blocks until a new result is available or the timeout has * expired. * * Throws: TException if the range is empty. */ void popFront() { enforce(!empty, new TException( "Cannot pop front of an empty future aggregator range.")); bufferFilled_ = false; } /** * The number of futures the result of which has been returned or which have * failed so far. */ size_t completedCount() @property const { return completedCount_; } /** * The exceptions collected from failed TFutures so far. */ Exception[] exceptions() @property { return exceptions_; } private: TFuture!T[] futures_; TCancellationOrigin childCancellation_; // The system tick this operation will time out, or zero if no timeout has // been set. TickDuration timeoutSysTick_; bool finished_; bool bufferFilled_; T resultBuffer_; Exception[] exceptions_; size_t completedCount_; // The queue of completed futures. This (and the associated condition) are // the only parts of this class that are accessed by multiple threads. TFuture!T[] completedQueue_; Mutex queueMutex_; Condition queueNonEmptyCondition_; } /** * TFutureAggregatorRange construction helper to avoid having to explicitly * specify the value type, i.e. to allow the constructor being called using IFTI * (see $(DMDBUG 6082, D Bugzilla enhancement requet 6082)). */ TFutureAggregatorRange!T tFutureAggregatorRange(T)(TFuture!T[] futures, TCancellationOrigin childCancellation, Duration timeout = dur!"hnsecs"(0) ) { return new TFutureAggregatorRange!T(futures, childCancellation, timeout); } |