thrift.async.base

Defines the interface used for client-side handling of asynchronous I/O operations, based on coroutines.
The main piece of the »client side« of the API (e.g. for TAsyncClient users) is TFuture, which represents an asynchronously executed operation. A TFuture can have a return value, can throw exceptions, and can be waited upon.

On the »implementation side«, the idea is that by using a TAsyncTransport instead of a normal TTransport and executing the work through a TAsyncManager, the same code as for synchronous I/O can be used for asynchronous operation as well, for example:

auto socket = new TAsyncSocket(someTAsyncSocketManager(), host, port);
// …
socket.asyncManager.execute(socket, {
  SomeThriftStruct s;

  // Waiting for socket I/O will not block an entire thread but cause
  // the async manager to execute another task in the meantime, because
  // we are using TAsyncSocket instead of TSocket.
  s.read(socket);

  // Do something with s, e.g. set a TPromise result to it.
  writeln(s);
});
interface TAsyncManager
Manages one or more asynchronous transport resources (e.g. sockets in the case of TAsyncSocketManager) and allows work items to be submitted for them.
Implementations will typically run one or more background threads to execute the work, which is one reason to use a TAsyncManager. Each work item runs in its own fiber and is expected to yield() while waiting for time-consuming operations.

The second important purpose of TAsyncManager is to serialize access to the transport resources. Without this, issuing multiple RPC calls over the same connection in rapid succession, for example, would likely lead to more than one request being written at the same time, causing only garbage to arrive at the remote end.

All methods are thread-safe.
void execute(TAsyncTransport transport, void delegate() work, TCancellation cancellation = null)
Submits a work item to be executed asynchronously.
Access to async transports is serialized: if two work items associated with the same transport are submitted, the second delegate will not be invoked until the first has returned, even if the first context-switches away (because it is waiting for I/O) and the async manager is otherwise idle.

Optionally, a TCancellation instance can be specified. If present, triggering it is treated as a request to cancel the work item, provided the item is still waiting for the associated transport to become available. Delegates which are already being processed (i.e. waiting for I/O) are not affected, because cancelling them would bring the connection into an undefined state (a half-written request or a half-read response would probably be left behind).

Parameters:

transport: The TAsyncTransport the work delegate will operate on. Must be associated with this TAsyncManager instance.
work: The operations to execute on the given transport. Must never throw; errors should be handled in another way. nothrow semantics are difficult to enforce in combination with fibers though, so currently exceptions are just swallowed by TAsyncManager implementations.
cancellation: If set, can be used to request cancellation of this work item if it is still waiting to be executed.

Note:

The work item will likely be executed in a different thread, so make sure the code it relies on is thread-safe. The exception is the async transports themselves, to which access is serialized as noted above.
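
The requirements on the work delegate described above (never throw, optionally be cancellable while queued) can be sketched as a small wrapper. This is only an illustration under stated assumptions: runOn is a hypothetical helper, not part of Thrift; TPromise.fail and the thrift.util.future / thrift.util.cancellation module names are assumed from the Thrift D library and are not described on this page.

```d
import thrift.async.base;
import thrift.util.cancellation; // assumed home of TCancellation
import thrift.util.future;       // assumed home of TFuture and TPromise

/// Hypothetical helper (not part of Thrift): runs `compute` on the async
/// manager that owns `transport` and exposes the outcome as a TFuture.
TFuture!T runOn(T)(TAsyncTransport transport, T delegate() compute,
  TCancellation cancellation = null)
{
  auto promise = new TPromise!T;
  transport.asyncManager.execute(transport, {
    // The work delegate must never throw, so any exception is caught
    // here and forwarded to the promise instead of being swallowed.
    try {
      promise.succeed(compute());
    } catch (Exception e) {
      promise.fail(e); // assumption: TPromise offers fail(Exception)
    }
  }, cancellation);
  return promise;
}
```

If the cancellation is triggered while the item is still queued, the delegate never runs, so the promise in this sketch would stay pending; a caller that waits on the returned future would have to account for that separately.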
void delay(Duration duration, void delegate() work)
Submits a delegate to be executed after a certain amount of time has passed.
The actual amount of time elapsed can be higher if the async manager instance is busy, so the exact delay should not be relied on.

Parameters:

duration: The amount of time to wait before starting to execute the work delegate.
work: The code to execute after the specified amount of time has passed.

Example:

// A very basic example – usually, the actual work item would enqueue
// some async transport operation.
auto asyncManager = someAsyncManager();

TFuture!int calculate() {
  // Create a promise and asynchronously set its value after three
  // seconds have passed.
  auto promise = new TPromise!int;
  asyncManager.delay(dur!"seconds"(3), {
    promise.succeed(42);
  });

  // Immediately return it to the caller.
  return promise;
}

// This will wait until the result is available and then print it.
writeln(calculate().waitGet());
bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1))
Shuts down all background threads or other facilities that might have been started in order to execute work items. This function is typically called during program shutdown.
If there are still tasks to be executed when the timeout expires, work items that are currently executing will never receive any notifications for async transports managed by this instance, queued work items will be silently dropped, and implementations are allowed to leak resources.

Parameters:

waitFinishTimeout: If positive, waits for all work items to finish for at most the specified amount of time. If negative, waits for completion without ever timing out. If zero, immediately shuts down the background facilities.
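
A minimal sketch of calling stop() during shutdown, assuming only the signature shown above. shutDown is a hypothetical helper name; the returned bool is ignored because its meaning is not described here.

```d
import core.time : dur;
import thrift.async.base;

/// Hypothetical helper (not part of Thrift): shuts a manager down at
/// program exit, giving pending work a bounded amount of time.
void shutDown(TAsyncManager manager)
{
  // Positive timeout: wait up to five seconds for work items to finish.
  // The default, dur!"hnsecs"(-1), would wait without timing out, and
  // dur!"seconds"(0) would shut down immediately.
  manager.stop(dur!"seconds"(5));
}
```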
interface TAsyncTransport : TTransport
A TTransport which uses a TAsyncManager to schedule non-blocking operations.
The actual type of device is not specified; typically, implementations will depend on an interface derived from TAsyncManager to be notified of changes in the transport state.

The peeking, reading, writing and flushing methods must always be called from within the associated async manager.
TAsyncManager asyncManager() [@property]
The TAsyncManager associated with this transport.
interface TAsyncSocketManager : TAsyncManager
A TAsyncManager providing notifications for socket events.
void addOneshotListener(Socket socket, TAsyncEventType eventType, Duration timeout, TSocketEventListener listener)
void addOneshotListener(Socket socket, TAsyncEventType eventType, TSocketEventListener listener)
Adds a listener that is triggered once when an event of the specified type occurs, and removed afterwards.

Parameters:

socket: The socket to listen for events on.
eventType: The type of the event to listen for.
timeout: The period of time after which the listener will be called with TAsyncEventReason.TIMED_OUT if no event has occurred.
listener: The delegate to call when an event occurs.
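
As a rough illustration of the timeout overload, the sketch below registers a one-shot READ listener and distinguishes the two TAsyncEventReason values. describe and awaitReadable are hypothetical helper names, and the two-second timeout is an arbitrary choice.

```d
import core.time : dur;
import std.socket : Socket;
import thrift.async.base;

/// Hypothetical helper: turns the call reason into a readable label.
string describe(TAsyncEventReason reason)
{
  final switch (reason) {
    case TAsyncEventReason.NORMAL:    return "readable";
    case TAsyncEventReason.TIMED_OUT: return "timed out";
  }
}

/// Hypothetical helper: waits at most two seconds for `socket` to become
/// readable, then passes the outcome to `report`. The listener is
/// removed by the manager after it has been called once.
void awaitReadable(TAsyncSocketManager manager, Socket socket,
  void delegate(string) report)
{
  manager.addOneshotListener(socket, TAsyncEventType.READ,
    dur!"seconds"(2), (TAsyncEventReason reason) {
      report(describe(reason));
    });
}
```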
enum TAsyncEventType
Types of events that can happen for an asynchronous transport.
READ
New data became available to read.
WRITE
The transport became ready to be written to.
alias void delegate(TAsyncEventReason callReason) TSocketEventListener
The type of the delegates used to register socket event handlers.
enum TAsyncEventReason : byte
The reason a listener was called.
NORMAL
The event listened for was triggered normally.
TIMED_OUT
A timeout for the event was set, and it expired.