thrift.codegen.async_client_pool

Utilities for asynchronously querying multiple servers, building on TAsyncClient.
Terminology note: The names of the artifacts defined in this module are derived from »client pool«, because they operate on a pool of TAsyncClients. However, from a architectural point of view, they often represent a pool of hosts a Thrift client application communicates with using RPC calls.
interface TAsyncClientPoolBase(Interface) : TFutureInterface!Interface
Represents a generic client pool which implements TFutureInterface!Interface using multiple TAsyncClients.
alias TAsyncClientBase!Interface Client
Shorthand for the client type this pool operates on.
void addClient(Client client)
Adds a client to the pool.
bool removeClient(Client client)
Removes a client from the pool.

Returns:

Whether the client was found in the pool.
bool delegate(Exception) rpcFaultFilter() [@property, const]
void rpcFaultFilter(bool delegate(Exception)) [@property]
Called to determine whether an exception comes from a client from the pool not working properly, or if it an exception thrown at the application level.
If the delegate returns true, the server/connection is considered to be at fault, if it returns false, the exception is just passed on to the caller.

By default, returns true for instances of TTransportException and TApplicationException, false otherwise.
bool reopenTransports() [@property, const]
void reopenTransports(bool) [@property]
Whether to open the underlying transports of a client before trying to execute a method if they are not open. This is usually desirable because it allows e.g. to automatically reconnect to a remote server if the network connection is dropped.
Defaults to true.
class TAsyncClientPool(Interface) : TAsyncClientPoolBase!Interface [final]
A TAsyncClientPoolBase implementation which queries multiple servers in a row until a request succeeds, the result of which is then returned.
The definition of »success« can be customized using the rpcFaultFilter() delegate property. If it is non-null and calling it for an exception set by a failed method invocation returns true, the error is considered to be caused by the RPC layer rather than the application layer, and the next server in the pool is tried. If there are no more clients to try, the operation is marked as failed with a TCompoundOperationException.

If a TAsyncClient in the pool fails with an RPC exception for a number of consecutive tries, it is temporarily disabled (not tried any longer) for a certain duration. Both the limit and the timeout can be configured. If all clients fail (and keepTrying is false), the operation fails with a TCompoundOperationException which contains the collected RPC exceptions.
this(Client[] clients)
bool keepTrying() [@property, const]
void keepTrying(bool value) [@property]
Whether to keep trying to find a working client if all have failed in a row.
Defaults to false.
bool permuteClients() [@property, const]
void permuteClients(bool value) [@property]
Whether to use a random permutation of the client pool on every call to execute(). This can be used e.g. as a simple form of load balancing.
Defaults to true.
ushort faultDisableCount() [@property, const]
void faultDisableCount(ushort value) [@property]
The number of consecutive faults after which a client is disabled until faultDisableDuration has passed. 0 to never disable clients.
Defaults to 0.
Duration faultDisableDuration() [@property, const]
void faultDisableDuration(Duration value) [@property]
The duration for which a client is no longer considered after it has failed too often.
Defaults to one second.
TAsyncClientPool!Interface tAsyncClientPool(Interface)(TAsyncClientBase!Interface[] clients)
TAsyncClientPool construction helper to avoid having to explicitly specify the interface type, i.e. to allow the constructor being called using IFTI (see D Bugzilla enhancement request 6082).
class TAsyncFastestClientPool(Interface) : TAsyncClientPoolBase!Interface [final]
A TAsyncClientPoolBase implementation which queries multiple servers at the same time and returns the first success response.
The definition of »success« can be customized using the rpcFaultFilter() delegate property. If it is non-null and calling it for an exception set by a failed method invocation returns true, the error is considered to be caused by the RPC layer rather than the application layer, and the next server in the pool is tried. If all clients fail, the operation is marked as failed with a TCompoundOperationException.
this(Client[] clients)
TAsyncFastestClientPool!Interface tAsyncFastestClientPool(Interface)(TAsyncClientBase!Interface[] clients)
TAsyncFastestClientPool construction helper to avoid having to explicitly specify the interface type, i.e. to allow the constructor being called using IFTI (see D Bugzilla enhancement request 6082).
class TAsyncAggregator(Interface)
class TAsyncAggregator(Interface) : TAsyncAggregator!(BaseService!Interface)
Allows easily aggregating results from a number of TAsyncClients.
Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not simply implement TFutureInterface!Interface. It manages a pool of clients, but allows the user to specify a custom accumulator function to use or to iterate over the results using a TFutureAggregatorRange.

For each service method, TAsyncAggregator offers a method accepting the same arguments, and an optional TCancellation instance, just like with TFutureInterface. The return type, however, is a proxy object that offers the following methods:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/++
 + Returns a thrift.util.future.TFutureAggregatorRange for the results of
 + the client pool method invocations.
 +
 + The [] (slicing) operator can also be used to obtain the range.
 +
 + Params:
 +   timeout = A timeout to pass to the TFutureAggregatorRange constructor,
 +     defaults to zero (no timeout).
 +/
TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0));
auto opSlice() { return range(); } /// Ditto

/++
 + Returns a future that gathers the results from the clients in the pool
 + and invokes a user-supplied accumulator function on them, returning its
 + return value to the client.
 +
 + In addition to the TFuture!AccumulatedType interface (where
 + AccumulatedType is the return type of the accumulator function), the
 + returned object also offers two additional methods, finish() and
 + finishGet(): By default, the accumulator functions is called after all
 + the results from the pool clients have become available. Calling finish()
 + causes the accumulator future to stop waiting for other results and
 + immediately invoking the accumulator function on the results currently
 + available. If all results are already available, finish() is a no-op.
 + finishGet() is a convenience shortcut for combining it with
 + a call to get() immediately afterwards, like waitGet() is for wait().
 +
 + The acc alias can point to any callable accepting either an array of
 + return values or an array of return values and an array of exceptions;
 + see isAccumulator!() for details. The default accumulator concatenates
 + return values that can be concatenated with each others (e.g. arrays),
 + and simply returns an array of values otherwise, failing with a
 + TCompoundOperationException no values were returned.
 +
 + The accumulator function is not executed in any of the async manager
 + worker threads associated with the async clients, but instead it is
 + invoked when the actual result is requested for the first time after the
 + operation has been completed. This also includes checking the status
 + of the operation once it is no longer running, since the accumulator
 + has to be run to determine whether the operation succeeded or failed.
 +/
auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc);

Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Some Thrift service.
interface Foo {
  int foo(string name);
  byte[] bar();
}

// Create the aggregator pool – client0, client1, client2 are some
// TAsyncClient!Foo instances, but in theory could also be other
// TFutureInterface!Foo implementations (e.g. some async client pool).
auto pool = new TAsyncAggregator!Foo([client0, client1, client2]);

foreach (val; pool.foo("baz").range(dur!"seconds"(1))) {
  // Process all the results that are available before a second has passed,
  // in the order they arrive.
  writeln(val);
}

auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){
  if (vals.empty) {
    throw new TCompoundOperationException("All clients failed", exs);
  }

  // Just to illustrate that the type of the values can change, convert the
  // numbers to double and sum up their roots.
  double result = 0;
  foreach (v; vals) result += sqrt(cast(double)v);
  return result;
})();

// Wait up to three seconds for the result, and then accumulate what has
// arrived so far.
sumRoots.completion.wait(dur!"seconds"(3));
writeln(sumRoots.finishGet());

// For scalars, the default accumulator returns an array of the values.
pragma(msg, typeof(pool.foo("").accumulate().get()); // int[].

// For lists, etc., it concatenates the results together.
pragma(msg, typeof(pool.bar().accumulate().get())); // byte[].

Note:

For the accumulate!() interface, you might currently hit a »cannot use local '…' as parameter to non-global template accumulate«-error, see DMD issue 5710. If your accumulator function does not need to access the surrounding scope, you might want to use a function literal instead of a delegate to avoid the issue.
alias TAsyncClientBase!Interface Client
Shorthand for the client type this instance operates on.
this(Client[] clients)
bool reopenTransports
Whether to open the underlying transports of a client before trying to execute a method if they are not open. This is usually desirable because it allows e.g. to automatically reconnect to a remote server if the network connection is dropped.
Defaults to true.
template isAccumulator(ValueType, alias fun)
Whether fun is a valid accumulator function for values of type ValueType.
For this to be true, fun must be a callable matching one of the following argument lists:
1
2
fun(ValueType[] values);
fun(ValueType[] values, Exception[] exceptions);

The second version is passed the collected array exceptions from all the clients in the pool.

The return value of the accumulator function is passed to the client (via the result future). If it throws an exception, the operation is marked as failed with the given exception instead.
TAsyncAggregator!Interface tAsyncAggregator(Interface)(TAsyncClientBase!Interface[] clients)
TAsyncAggregator construction helper to avoid having to explicitly specify the interface type, i.e. to allow the constructor being called using IFTI (see D Bugzilla enhancement request 6082).