/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * Utilities for asynchronously querying multiple servers, building on
 * TAsyncClient.
 *
 * Terminology note: The names of the artifacts defined in this module are
 * derived from »client pool«, because they operate on a pool of
 * TAsyncClients. However, from an architectural point of view, they often
 * represent a pool of hosts a Thrift client application communicates with
 * using RPC calls.
 */
module thrift.codegen.async_client_pool;

import core.sync.mutex;
import core.time : Duration, dur;
import std.algorithm : map;
import std.array : array, empty;
import std.exception : enforce;
import std.traits : ParameterTypeTuple, ReturnType;
import thrift.base;
import thrift.codegen.base;
import thrift.codegen.async_client;
import thrift.internal.algorithm;
import thrift.internal.codegen;
import thrift.util.awaitable;
import thrift.util.cancellation;
import thrift.util.future;
import thrift.internal.resource_pool;

/**
 * Represents a generic client pool which implements TFutureInterface!Interface
 * using multiple TAsyncClients.
 */
interface TAsyncClientPoolBase(Interface) if (isService!Interface) :
  TFutureInterface!Interface
{
  /// Shorthand for the client type this pool operates on.
  alias TAsyncClientBase!Interface Client;

  /**
   * Adds a client to the pool.
   */
  void addClient(Client client);

  /**
   * Removes a client from the pool.
   *
   * Returns: Whether the client was found in the pool.
   */
  bool removeClient(Client client);

  /**
   * Called to determine whether an exception comes from a client from the
   * pool not working properly, or if it is an exception thrown at the
   * application level.
   *
   * If the delegate returns true, the server/connection is considered to be
   * at fault, if it returns false, the exception is just passed on to the
   * caller.
   *
   * By default, returns true for instances of TTransportException and
   * TApplicationException, false otherwise.
   */
  bool delegate(Exception) rpcFaultFilter() const @property;
  void rpcFaultFilter(bool delegate(Exception)) @property; /// Ditto

  /**
   * Whether to open the underlying transports of a client before trying to
   * execute a method if they are not open. This is usually desirable
   * because it allows e.g. to automatically reconnect to a remote server
   * if the network connection is dropped.
   *
   * Defaults to true.
   */
  bool reopenTransports() const @property;
  void reopenTransports(bool) @property; /// Ditto
}

/**
 * The fault filter installed by the pool constructors in this module: it
 * returns true if the exception is a TTransportException or a
 * TApplicationException, false otherwise.
 */
immutable bool delegate(Exception) defaultRpcFaultFilter;

// Immutable globals are shared between all threads, so they have to be set
// up exactly once from a shared module constructor. A plain (thread-local)
// `static this()` would re-assign the immutable value on every thread start.
shared static this() {
  defaultRpcFaultFilter = (Exception e) {
    import thrift.protocol.base;
    import thrift.transport.base;
    return (
      (cast(TTransportException)e !is null) ||
      (cast(TApplicationException)e !is null)
    );
  };
}

/**
 * A TAsyncClientPoolBase implementation which queries multiple servers in a
 * row until a request succeeds, the result of which is then returned.
 *
 * The definition of »success« can be customized using the rpcFaultFilter()
 * delegate property. If it is non-null and calling it for an exception set by
 * a failed method invocation returns true, the error is considered to be
 * caused by the RPC layer rather than the application layer, and the next
 * server in the pool is tried.
* If there are no more clients to try, the
 * operation is marked as failed with a TCompoundOperationException.
 *
 * If a TAsyncClient in the pool fails with an RPC exception for a number of
 * consecutive tries, it is temporarily disabled (not tried any longer) for
 * a certain duration. Both the limit and the timeout can be configured. If all
 * clients fail (and keepTrying is false), the operation fails with a
 * TCompoundOperationException which contains the collected RPC exceptions.
 */
final class TAsyncClientPool(Interface) if (isService!Interface) :
  TAsyncClientPoolBase!Interface
{
  /**
   * Creates a pool operating on the given clients, using
   * defaultRpcFaultFilter and with reopenTransports enabled.
   */
  this(Client[] clients) {
    pool_ = new TResourcePool!Client(clients);
    rpcFaultFilter_ = defaultRpcFaultFilter;
    reopenTransports_ = true;
  }

  /+override+/ void addClient(Client client) {
    pool_.add(client);
  }

  /+override+/ bool removeClient(Client client) {
    return pool_.remove(client);
  }

  /**
   * Whether to keep trying to find a working client if all have failed in a
   * row.
   *
   * Defaults to false.
   */
  bool keepTrying() const @property {
    return pool_.cycle;
  }

  /// Ditto
  void keepTrying(bool value) @property {
    pool_.cycle = value;
  }

  /**
   * Whether to use a random permutation of the client pool on every call to
   * execute(). This can be used e.g. as a simple form of load balancing.
   *
   * Defaults to true.
   */
  bool permuteClients() const @property {
    return pool_.permute;
  }

  /// Ditto
  void permuteClients(bool value) @property {
    pool_.permute = value;
  }

  /**
   * The number of consecutive faults after which a client is disabled until
   * faultDisableDuration has passed. 0 to never disable clients.
   *
   * Defaults to 0.
   */
  ushort faultDisableCount() const @property {
    return pool_.faultDisableCount;
  }

  /// Ditto
  void faultDisableCount(ushort value) @property {
    pool_.faultDisableCount = value;
  }

  /**
   * The duration for which a client is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration() const @property {
    return pool_.faultDisableDuration;
  }

  /// Ditto
  void faultDisableDuration(Duration value) @property {
    pool_.faultDisableDuration = value;
  }

  /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
    return rpcFaultFilter_;
  }

  /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
    rpcFaultFilter_ = value;
  }

  /+override+/ bool reopenTransports() const @property {
    return reopenTransports_;
  }

  /+override+/ void reopenTransports(bool value) @property {
    reopenTransports_ = value;
  }

  // Generates one forwarding method per service method; each of them calls
  // executeOnPool() with the method name as template argument.
  mixin(fallbackPoolForwardCode!Interface());

protected:
  /**
   * The actual worker implementation to which RPC method calls are forwarded.
   *
   * Tries the clients handed out by the resource pool one after another
   * until one of them completes the call without an RPC-level fault (as
   * determined by rpcFaultFilter_).
   *
   * Params:
   *   method = Name of the service method to invoke on the clients.
   *   args = The arguments to pass to the service method.
   *   cancellation = Passed on to every client invocation.
   *
   * Returns: A promise which is completed with the first acceptable result,
   *   cancelled if a client invocation was cancelled, or failed with a
   *   TCompoundOperationException once no more clients are left to try.
   *
   * Throws: TException if the pool does not offer any client to start with.
   */
  auto executeOnPool(string method, Args...)(Args args,
    TCancellation cancellation
  ) {
    auto clients = pool_[];
    if (clients.empty) {
      throw new TException("No clients available to try.");
    }

    auto promise = new TPromise!(ReturnType!(MemberType!(Interface, method)));

    // All the faults encountered so far, reported to the caller if the
    // operation fails as a whole.
    Exception[] rpcExceptions;

    void tryNext() {
      while (clients.empty) {
        Client next;
        Duration waitTime;
        if (clients.willBecomeNonempty(next, waitTime)) {
          if (waitTime > dur!"hnsecs"(0)) {
            if (waitTime < dur!"usecs"(10)) {
              // Very short waits are slept through in place.
              import core.thread;
              Thread.sleep(waitTime);
            } else {
              // Longer waits are scheduled on the async manager of the next
              // client so that the calling thread is not blocked.
              next.transport.asyncManager.delay(waitTime, { tryNext(); });
              return;
            }
          }
        } else {
          promise.fail(new TCompoundOperationException("All clients failed.",
            rpcExceptions));
          return;
        }
      }

      auto client = clients.front;
      clients.popFront;

      if (reopenTransports) {
        if (!client.transport.isOpen) {
          try {
            client.transport.open();
          } catch (Exception e) {
            pool_.recordFault(client);
            // Keep the connection error so that it shows up in the
            // TCompoundOperationException if all clients fail; previously
            // it was silently dropped.
            rpcExceptions ~= e;
            tryNext();
            return;
          }
        }
      }

      auto future = mixin("client." ~ method)(args, cancellation);
      future.completion.addCallback({
        if (future.status == TFutureStatus.CANCELLED) {
          promise.cancel();
          return;
        }

        auto e = future.getException();
        if (e) {
          if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
            // RPC-level fault: blame the client and move on to the next one.
            pool_.recordFault(client);
            rpcExceptions ~= e;
            tryNext();
            return;
          }
        }

        // Either a result or an exception the filter did not accept as an
        // RPC fault: hand it to the caller as is.
        pool_.recordSuccess(client);
        promise.complete(future);
      });
    }

    tryNext();
    return promise;
  }

private:
  TResourcePool!Client pool_;
  bool delegate(Exception) rpcFaultFilter_;
  bool reopenTransports_;
}

/**
 * TAsyncClientPool construction helper to avoid having to explicitly
 * specify the interface type, i.e. to allow the constructor being called
 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 */
TAsyncClientPool!Interface tAsyncClientPool(Interface)(
  TAsyncClientBase!Interface[] clients
) if (isService!Interface) {
  return new typeof(return)(clients);
}

private {
  // Builds the source code of the forwarding methods mixed into
  // TAsyncClientPool, one per method of Interface.
  //
  // Cannot use an anonymous delegate literal for this because they aren't
  // allowed in class scope.
  string fallbackPoolForwardCode(Interface)() {
    string code = "";

    foreach (methodName; AllMemberMethodNames!Interface) {
      enum qn = "Interface." ~ methodName;
      code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
        "(ParameterTypeTuple!(" ~ qn ~ ") args, TCancellation cancellation = null) {\n";
      code ~= "return executeOnPool!(`" ~ methodName ~ "`)(args, cancellation);\n";
      code ~= "}\n";
    }

    return code;
  }
}

/**
 * A TAsyncClientPoolBase implementation which queries multiple servers at
 * the same time and returns the first success response.
 *
 * The definition of »success« can be customized using the rpcFaultFilter()
 * delegate property. If it is non-null and calling it for an exception set by
 * a failed method invocation returns true, the error is considered to be
 * caused by the RPC layer rather than the application layer, and the
 * response is ignored in favor of the other servers. If all clients fail, the
 * operation is marked as failed with a TCompoundOperationException.
*/
final class TAsyncFastestClientPool(Interface) if (isService!Interface) :
  TAsyncClientPoolBase!Interface
{
  /**
   * Creates a pool operating on the given clients, using
   * defaultRpcFaultFilter and with reopenTransports enabled.
   */
  this(Client[] clients) {
    clients_ = clients;
    rpcFaultFilter_ = defaultRpcFaultFilter;
    reopenTransports_ = true;
  }

  /+override+/ void addClient(Client client) {
    clients_ ~= client;
  }

  /+override+/ bool removeClient(Client client) {
    auto oldLength = clients_.length;
    clients_ = removeEqual(clients_, client);
    return clients_.length < oldLength;
  }

  /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
    return rpcFaultFilter_;
  }

  /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
    rpcFaultFilter_ = value;
  }

  /+override+/ bool reopenTransports() const @property {
    return reopenTransports_;
  }

  /+override+/ void reopenTransports(bool value) @property {
    reopenTransports_ = value;
  }

  // Generates one method per service method; each of them starts the call on
  // every client and wraps the resulting futures in a FastestPoolJob.
  mixin(fastestPoolForwardCode!Interface());

private:
  Client[] clients_;
  bool delegate(Exception) rpcFaultFilter_;
  bool reopenTransports_;
}

/**
 * TAsyncFastestClientPool construction helper to avoid having to explicitly
 * specify the interface type, i.e. to allow the constructor being called
 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 */
TAsyncFastestClientPool!Interface tAsyncFastestClientPool(Interface)(
  TAsyncClientBase!Interface[] clients
) if (isService!Interface) {
  return new typeof(return)(clients);
}

private {
  // Builds the source code of the methods mixed into TAsyncFastestClientPool,
  // one per method of Interface.
  //
  // Cannot use an anonymous delegate literal for this because they aren't
  // allowed in class scope.
  string fastestPoolForwardCode(Interface)() {
    string code = "";

    foreach (methodName; AllMemberMethodNames!Interface) {
      enum qn = "Interface." ~ methodName;
      code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
        "(ParameterTypeTuple!(" ~ qn ~ ") args, " ~
        "TCancellation cancellation = null) {\n";
      code ~= "enum methodName = `" ~ methodName ~ "`;\n";
      code ~= q{
        alias ReturnType!(MemberType!(Interface, methodName)) ResultType;

        auto childCancellation = new TCancellationOrigin;

        TFuture!ResultType[] futures;
        futures.reserve(clients_.length);

        foreach (c; clients_) {
          if (reopenTransports) {
            if (!c.transport.isOpen) {
              try {
                c.transport.open();
              } catch (Exception e) {
                continue;
              }
            }
          }
          futures ~= mixin("c." ~ methodName)(args, childCancellation);
        }

        return new FastestPoolJob!(ResultType)(
          futures, rpcFaultFilter, cancellation, childCancellation);
      };
      code ~= "}\n";
    }

    return code;
  }

  /**
   * Future combining the child futures of a TAsyncFastestClientPool call:
   * it takes over the outcome of the first child which completes without an
   * RPC-level fault and then cancels the remaining ones.
   */
  final class FastestPoolJob(Result) : TFuture!Result {
    /**
     * Params:
     *   poolFutures = The futures of the invocations started on the clients.
     *   rpcFaultFilter = Decides whether a child exception is an RPC-level
     *     fault (ignored unless all children fail) or a result to pass on.
     *   cancellation = If non-null, triggering it cancels this job and all
     *     the child operations.
     *   childCancellation = The cancellation the child operations were
     *     started with.
     */
    this(TFuture!Result[] poolFutures, bool delegate(Exception) rpcFaultFilter,
      TCancellation cancellation, TCancellationOrigin childCancellation
    ) {
      resultPromise_ = new TPromise!Result;
      poolFutures_ = poolFutures;
      rpcFaultFilter_ = rpcFaultFilter;
      childCancellation_ = childCancellation;

      if (poolFutures.length == 0) {
        // No invocation could be started (e.g. because none of the
        // transports could be opened). There will never be a completion
        // callback, so fail right away instead of staying RUNNING forever.
        resultPromise_.fail(new TCompoundOperationException(
          "No child operations could be started, unable to retrieve a result.",
          rpcExceptions_
        ));
        return;
      }

      foreach (future; poolFutures) {
        // The extra delegate gives every callback its own copy of the loop
        // variable.
        future.completion.addCallback({
          auto f = future;
          return { completionCallback(f); };
        }());
        if (resultPromise_.status != TFutureStatus.RUNNING) {
          // The overall result has already been determined, so we are done:
          // don't bother adding callbacks for the other futures (they would
          // just return immediately after acquiring the lock).
          //
          // Note that this deliberately looks at the result promise instead
          // of the child future: a child which has already failed with an
          // RPC-level fault does not complete the job, so the remaining
          // children still need their callbacks.
          return;
        }
      }

      if (cancellation) {
        cancellation.triggering.addCallback({
          resultPromise_.cancel();
          childCancellation.trigger();
        });
      }
    }

    TFutureStatus status() const @property {
      return resultPromise_.status;
    }

    TAwaitable completion() @property {
      return resultPromise_.completion;
    }

    Result get() {
      return resultPromise_.get();
    }

    Exception getException() {
      return resultPromise_.getException();
    }

  private:
    // Invoked once for every child future that has completed.
    void completionCallback(TFuture!Result future) {
      // A synchronized statement without an argument uses a mutex specific to
      // this statement, i.e. one shared by all FastestPoolJob instances.
      synchronized {
        if (future.status == TFutureStatus.CANCELLED) {
          assert(resultPromise_.status != TFutureStatus.RUNNING);
          return;
        }

        if (resultPromise_.status != TFutureStatus.RUNNING) {
          // The operation has already been completed. This can happen if
          // another client completed first, but this callback was already
          // waiting for the lock when it called cancel().
          return;
        }

        if (future.status == TFutureStatus.FAILED) {
          auto e = future.getException();
          if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
            rpcExceptions_ ~= e;

            if (rpcExceptions_.length == poolFutures_.length) {
              resultPromise_.fail(new TCompoundOperationException(
                "All child operations failed, unable to retrieve a result.",
                rpcExceptions_
              ));
            }

            return;
          }
        }

        // Store the result to the target promise.
        resultPromise_.complete(future);

        // Cancel the other futures, we would just discard their results.
        // Note: We do this after we have stored the results to our promise,
        // see the assert at the top of the function.
        childCancellation_.trigger();
      }
    }

    TPromise!Result resultPromise_;
    TFuture!Result[] poolFutures_;
    Exception[] rpcExceptions_;
    bool delegate(Exception) rpcFaultFilter_;
    TCancellationOrigin childCancellation_;
  }
}

/**
 * Allows easily aggregating results from a number of TAsyncClients.
 *
 * Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not
 * simply implement TFutureInterface!Interface. It manages a pool of clients,
 * but allows the user to specify a custom accumulator function to use or to
 * iterate over the results using a TFutureAggregatorRange.
*
 * For each service method, TAsyncAggregator offers a method
 * accepting the same arguments, and an optional TCancellation instance, just
 * like with TFutureInterface. The return type, however, is a proxy object
 * that offers the following methods:
 * ---
 * /++
 *  + Returns a thrift.util.future.TFutureAggregatorRange for the results of
 *  + the client pool method invocations.
 *  +
 *  + The [] (slicing) operator can also be used to obtain the range.
 *  +
 *  + Params:
 *  +   timeout = A timeout to pass to the TFutureAggregatorRange constructor,
 *  +     defaults to zero (no timeout).
 *  +/
 * TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0));
 * auto opSlice() { return range(); } /// Ditto
 *
 * /++
 *  + Returns a future that gathers the results from the clients in the pool
 *  + and invokes a user-supplied accumulator function on them, returning its
 *  + return value to the client.
 *  +
 *  + In addition to the TFuture!AccumulatedType interface (where
 *  + AccumulatedType is the return type of the accumulator function), the
 *  + returned object also offers two additional methods, finish() and
 *  + finishGet(): By default, the accumulator function is called after all
 *  + the results from the pool clients have become available. Calling finish()
 *  + causes the accumulator future to stop waiting for other results and
 *  + immediately invoke the accumulator function on the results currently
 *  + available. If all results are already available, finish() is a no-op.
 *  + finishGet() is a convenience shortcut for combining it with
 *  + a call to get() immediately afterwards, like waitGet() is for wait().
 *  +
 *  + The acc alias can point to any callable accepting either an array of
 *  + return values or an array of return values and an array of exceptions;
 *  + see isAccumulator!() for details. The default accumulator concatenates
 *  + return values that can be concatenated with each other (e.g. arrays),
 *  + and simply returns an array of values otherwise, failing with a
 *  + TCompoundOperationException if no values were returned.
 *  +
 *  + The accumulator function is not executed in any of the async manager
 *  + worker threads associated with the async clients, but instead it is
 *  + invoked when the actual result is requested for the first time after the
 *  + operation has been completed. This also includes checking the status
 *  + of the operation once it is no longer running, since the accumulator
 *  + has to be run to determine whether the operation succeeded or failed.
 *  +/
 * auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc);
 * ---
 *
 * Example:
 * ---
 * // Some Thrift service.
 * interface Foo {
 *   int foo(string name);
 *   byte[] bar();
 * }
 *
 * // Create the aggregator pool – client0, client1, client2 are some
 * // TAsyncClient!Foo instances, but in theory could also be other
 * // TFutureInterface!Foo implementations (e.g. some async client pool).
 * auto pool = new TAsyncAggregator!Foo([client0, client1, client2]);
 *
 * foreach (val; pool.foo("baz").range(dur!"seconds"(1))) {
 *   // Process all the results that are available before a second has passed,
 *   // in the order they arrive.
 *   writeln(val);
 * }
 *
 * auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exception[] exs){
 *   if (vals.empty) {
 *     throw new TCompoundOperationException("All clients failed", exs);
 *   }
 *
 *   // Just to illustrate that the type of the values can change, convert the
 *   // numbers to double and sum up their roots.
 *   double result = 0;
 *   foreach (v; vals) result += sqrt(cast(double)v);
 *   return result;
 * })();
 *
 * // Wait up to three seconds for the result, and then accumulate what has
 * // arrived so far.
 * sumRoots.completion.wait(dur!"seconds"(3));
 * writeln(sumRoots.finishGet());
 *
 * // For scalars, the default accumulator returns an array of the values.
 * pragma(msg, typeof(pool.foo("").accumulate().get())); // int[].
*
 * // For lists, etc., it concatenates the results together.
 * pragma(msg, typeof(pool.bar().accumulate().get())); // byte[].
 * ---
 *
 * Note: For the accumulate!() interface, you might currently hit a »cannot use
 * local '…' as parameter to non-global template accumulate«-error, see
 * $(DMDBUG 5710, DMD issue 5710). If your accumulator function does not need
 * to access the surrounding scope, you might want to use a function literal
 * instead of a delegate to avoid the issue.
 */
class TAsyncAggregator(Interface) if (isBaseService!Interface) {
  /// The type of the clients an instance of this class works with.
  alias Client = TAsyncClientBase!Interface;

  /// Creates an aggregator dispatching its calls to the given clients.
  this(Client[] clients) {
    this.clients_ = clients;
  }

  /**
   * If set, the transports of a client which are not open are opened before
   * a method is executed on it. This is usually what you want, as it allows
   * e.g. to transparently reconnect to a remote server after the network
   * connection has been dropped.
   *
   * Defaults to true.
   */
  bool reopenTransports = true;

  // Provides the opDispatch() turning service method calls into aggregated
  // calls on all the clients.
  mixin AggregatorOpDispatch!();

private:
  Client[] clients_;
}

/// Ditto
class TAsyncAggregator(Interface) if (isDerivedService!Interface) :
  TAsyncAggregator!(BaseService!Interface)
{
  /// The type of the clients an instance of this class works with.
  alias Client = TAsyncClientBase!Interface;

  /// Creates an aggregator dispatching its calls to the given clients.
  this(Client[] clients) {
    // The client list is stored by the base service aggregator, which
    // expects an array of base service clients.
    alias BaseClient = TAsyncClientBase!(BaseService!Interface);
    super(cast(BaseClient[])clients);
  }

  mixin AggregatorOpDispatch!();
}

/**
 * Whether fun is a valid accumulator function for values of type ValueType.
 *
 * For this to be true, fun must be a callable matching one of the following
 * argument lists:
 * ---
 * fun(ValueType[] values);
 * fun(ValueType[] values, Exception[] exceptions);
 * ---
 *
 * The second version is passed the collected array of exceptions from all
 * the clients in the pool.
 *
 * The return value of the accumulator function is passed to the client (via
 * the result future). If it throws an exception, the operation is marked as
 * failed with the given exception instead.
*/
template isAccumulator(ValueType, alias fun) {
  enum isAccumulator = is(typeof(fun(cast(ValueType[])[]))) ||
    is(typeof(fun(cast(ValueType[])[], cast(Exception[])[])));
}

/**
 * TAsyncAggregator construction helper to avoid having to explicitly
 * specify the interface type, i.e. to allow the constructor being called
 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 */
TAsyncAggregator!Interface tAsyncAggregator(Interface)(
  TAsyncClientBase!Interface[] clients
) if (isService!Interface) {
  return new typeof(return)(clients);
}

private {
  // Mixed into the TAsyncAggregator classes: forwards a call of any method
  // of Interface to all the clients and bundles the resulting futures into an
  // AggregationResult.
  mixin template AggregatorOpDispatch() {
    auto opDispatch(string name, Args...)(Args args) if (
      is(typeof(mixin("Interface.init." ~ name)(args)))
    ) {
      alias ReturnType!(MemberType!(Interface, name)) ResultType;

      auto childCancellation = new TCancellationOrigin;

      TFuture!ResultType[] futures;
      futures.reserve(clients_.length);

      foreach (c; cast(Client[])clients_) {
        if (reopenTransports) {
          if (!c.transport.isOpen) {
            try {
              c.transport.open();
            } catch (Exception e) {
              // Clients which cannot be (re)connected are skipped.
              continue;
            }
          }
        }
        futures ~= mixin("c." ~ name)(args, childCancellation);
      }

      return AggregationResult!ResultType(futures, childCancellation);
    }
  }

  // The proxy object returned by the aggregator methods; offers the results
  // either as a range or as an accumulating future.
  struct AggregationResult(T) {
    auto opSlice() { return range(); }

    auto range(Duration timeout = dur!"hnsecs"(0)) {
      return tFutureAggregatorRange(futures_, childCancellation_, timeout);
    }

    auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!(T, acc)) {
      return new AccumulatorJob!(T, acc)(futures_, childCancellation_);
    }

  private:
    TFuture!T[] futures_;
    TCancellationOrigin childCancellation_;
  }

  // Concatenates the values if they can be concatenated with each other,
  // returns them as an array otherwise. Throws a TCompoundOperationException
  // carrying the passed exceptions if there are no values at all.
  auto defaultAccumulator(T)(T[] values, Exception[] exceptions) {
    if (values.empty) {
      throw new TCompoundOperationException("All clients failed", exceptions);
    }

    static if (is(typeof(T.init ~ T.init))) {
      import std.algorithm;
      return reduce!"a ~ b"(values);
    } else {
      return values;
    }
  }

  /**
   * Future collecting the outcomes of a number of child futures and passing
   * them to the accumulator function once the status or the result is
   * requested after the job has finished.
   */
  final class AccumulatorJob(T, alias accumulator) if (
    isAccumulator!(T, accumulator)
  ) : TFuture!(AccumulatorResult!(T, accumulator)) {
    /**
     * Params:
     *   futures = The child futures to collect the results from.
     *   childCancellation = The cancellation the child operations were
     *     started with, triggered by finish().
     */
    this(TFuture!T[] futures, TCancellationOrigin childCancellation) {
      futures_ = futures;
      childCancellation_ = childCancellation;
      resultMutex_ = new Mutex;
      completionEvent_ = new TOneshotEvent;

      if (futures_.length == 0) {
        // Without any child future no callback will ever run, so the job is
        // finished right from the start. The accumulator is then invoked
        // with empty arrays when the status is queried.
        finished_ = true;
        completionEvent_.trigger();
      }

      foreach (future; futures) {
        // The extra delegate gives every callback its own copy of the loop
        // variable.
        future.completion.addCallback({
          auto f = future;
          return {
            synchronized (resultMutex_) {
              if (f.status == TFutureStatus.CANCELLED) {
                if (!finished_) {
                  // The children have been cancelled from the outside (not
                  // by finish(), which sets finished_ first).
                  status_ = TFutureStatus.CANCELLED;
                  finished_ = true;
                  // Wake up the clients waiting for the completion, the job
                  // will not make any further progress.
                  completionEvent_.trigger();
                }
                return;
              }

              if (f.status == TFutureStatus.FAILED) {
                exceptions_ ~= f.getException();
              } else {
                results_ ~= f.get();
              }

              // Do not signal the completion a second time if the job has
              // already been finished, e.g. by finish().
              if (!finished_ &&
                results_.length + exceptions_.length == futures_.length
              ) {
                finished_ = true;
                completionEvent_.trigger();
              }
            }
          };
        }());
      }
    }

    /**
     * The status of the job. The first call after the job has finished runs
     * the accumulator to determine whether the operation succeeded or failed.
     */
    TFutureStatus status() @property {
      synchronized (resultMutex_) {
        if (!finished_) return TFutureStatus.RUNNING;
        if (status_ != TFutureStatus.RUNNING) return status_;

        try {
          result_ = invokeAccumulator!accumulator(results_, exceptions_);
          status_ = TFutureStatus.SUCCEEDED;
        } catch (Exception e) {
          exception_ = e;
          status_ = TFutureStatus.FAILED;
        }

        return status_;
      }
    }

    TAwaitable completion() @property {
      return completionEvent_;
    }

    /**
     * Returns the accumulated result.
     *
     * Throws: TFutureException if the job is still running,
     *   TCancelledException if it was cancelled, or the exception thrown by
     *   the accumulator if it failed.
     */
    AccumulatorResult!(T, accumulator) get() {
      auto s = status;

      enforce(s != TFutureStatus.RUNNING,
        new TFutureException("Operation not yet completed."));

      if (s == TFutureStatus.CANCELLED) throw new TCancelledException;
      if (s == TFutureStatus.FAILED) throw exception_;
      return result_;
    }

    /**
     * Returns the exception thrown by the accumulator, or null if it
     * succeeded.
     *
     * Throws: TFutureException if the job is still running,
     *   TCancelledException if it was cancelled.
     */
    Exception getException() {
      auto s = status;

      enforce(s != TFutureStatus.RUNNING,
        new TFutureException("Operation not yet completed."));

      if (s == TFutureStatus.CANCELLED) throw new TCancelledException;

      if (s == TFutureStatus.SUCCEEDED) {
        return null;
      }
      return exception_;
    }

    /**
     * Stops waiting for further results: cancels the child operations and
     * marks the job as finished. No-op if the job has already finished.
     */
    void finish() {
      synchronized (resultMutex_) {
        if (!finished_) {
          finished_ = true;
          childCancellation_.trigger();
          completionEvent_.trigger();
        }
      }
    }

    /// Shortcut for calling finish() and then get().
    auto finishGet() {
      finish();
      return get();
    }

  private:
    TFuture!T[] futures_;
    TCancellationOrigin childCancellation_;

    bool finished_;
    T[] results_;
    Exception[] exceptions_;

    TFutureStatus status_;
    Mutex resultMutex_;
    // Only one of the two is ever set, depending on status_.
    union {
      AccumulatorResult!(T, accumulator) result_;
      Exception exception_;
    }
    TOneshotEvent completionEvent_;
  }

  // Calls the accumulator with the exceptions array if it accepts one, and
  // only with the values otherwise.
  auto invokeAccumulator(alias accumulator, T)(
    T[] values, Exception[] exceptions
  ) if (
    isAccumulator!(T, accumulator)
  ) {
    static if (is(typeof(accumulator(values, exceptions)))) {
      return accumulator(values, exceptions);
    } else {
      return accumulator(values);
    }
  }

  // The return type of the accumulator acc for values of type T.
  template AccumulatorResult(T, alias acc) {
    alias typeof(
      invokeAccumulator!acc(cast(T[])[], cast(Exception[])[])
    ) AccumulatorResult;
  }
}