Returns:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | /++ + Returns a thrift.util.future.TFutureAggregatorRange for the results of + the client pool method invocations. + + The [] (slicing) operator can also be used to obtain the range. + + Params: + timeout = A timeout to pass to the TFutureAggregatorRange constructor, + defaults to zero (no timeout). +/ TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0)); auto opSlice() { return range(); } /// Ditto /++ + Returns a future that gathers the results from the clients in the pool + and invokes a user-supplied accumulator function on them, returning its + return value to the client. + + In addition to the TFuture!AccumulatedType interface (where + AccumulatedType is the return type of the accumulator function), the + returned object also offers two additional methods, finish() and + finishGet(): By default, the accumulator functions is called after all + the results from the pool clients have become available. Calling finish() + causes the accumulator future to stop waiting for other results and + immediately invoking the accumulator function on the results currently + available. If all results are already available, finish() is a no-op. + finishGet() is a convenience shortcut for combining it with + a call to get() immediately afterwards, like waitGet() is for wait(). + + The acc alias can point to any callable accepting either an array of + return values or an array of return values and an array of exceptions; + see isAccumulator!() for details. The default accumulator concatenates + return values that can be concatenated with each others (e.g. arrays), + and simply returns an array of values otherwise, failing with a + TCompoundOperationException no values were returned. + + The accumulator function is not executed in any of the async manager + worker threads associated with the async clients, but instead it is + invoked when the actual result is requested for the first time after the + operation has been completed. This also includes checking the status + of the operation once it is no longer running, since the accumulator + has to be run to determine whether the operation succeeded or failed. +/ auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc); |
Example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | // Some Thrift service. interface Foo { int foo(string name); byte[] bar(); } // Create the aggregator pool – client0, client1, client2 are some // TAsyncClient!Foo instances, but in theory could also be other // TFutureInterface!Foo implementations (e.g. some async client pool). auto pool = new TAsyncAggregator!Foo([client0, client1, client2]); foreach (val; pool.foo("baz").range(dur!"seconds"(1))) { // Process all the results that are available before a second has passed, // in the order they arrive. writeln(val); } auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){ if (vals.empty) { throw new TCompoundOperationException("All clients failed", exs); } // Just to illustrate that the type of the values can change, convert the // numbers to double and sum up their roots. double result = 0; foreach (v; vals) result += sqrt(cast(double)v); return result; })(); // Wait up to three seconds for the result, and then accumulate what has // arrived so far. sumRoots.completion.wait(dur!"seconds"(3)); writeln(sumRoots.finishGet()); // For scalars, the default accumulator returns an array of the values. pragma(msg, typeof(pool.foo("").accumulate().get()); // int[]. // For lists, etc., it concatenates the results together. pragma(msg, typeof(pool.bar().accumulate().get())); // byte[]. |
Note:
1 2 | fun(ValueType[] values); fun(ValueType[] values, Exception[] exceptions); |