/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Defines the interface used for client-side handling of asynchronous
* I/O operations, based on coroutines.
*
* The main piece of the »client side« (e.g. for TAsyncClient users) of the
* API is TFuture, which represents an asynchronously executed operation,
* which can have a return value, throw exceptions, and which can be waited
* upon.
*
* On the »implementation side«, the idea is that by using a TAsyncTransport
* instead of a normal TTransport and executing the work through a
* TAsyncManager, the same code as for synchronous I/O can be used for
* asynchronous operation as well, for example:
*
* ---
* auto socket = new TAsyncSocket(someTAsyncSocketManager(), host, port);
* // …
* socket.asyncManager.execute(socket, {
* SomeThriftStruct s;
*
* // Waiting for socket I/O will not block an entire thread but cause
* // the async manager to execute another task in the meantime, because
* // we are using TAsyncSocket instead of TSocket.
* s.read(socket);
*
* // Do something with s, e.g. set a TPromise result to it.
* writeln(s);
* });
* ---
*/
module thrift.async.base;
import core.time : Duration, dur;
import std.socket/+ : Socket+/; // DMD @@BUG314@@
import thrift.base;
import thrift.transport.base;
import thrift.util.cancellation;
/**
* Manages one or more asynchronous transport resources (e.g. sockets in the
* case of TAsyncSocketManager) and allows work items to be submitted for them.
*
* Implementations will typically run one or more background threads for
* executing the work, which is one of the reasons for a TAsyncManager to be
* used. Each work item is run in its own fiber and is expected to yield() away
* while waiting for time-consuming operations.
*
* The second important purpose of TAsyncManager is to serialize access to
* the transport resources – without taking care of that, e.g. issuing multiple
* RPC calls over the same connection in rapid succession would likely lead to
* more than one request being written at the same time, causing only garbage
* to arrive at the remote end.
*
* All methods are thread-safe.
*/
interface TAsyncManager {
/**
* Submits a work item to be executed asynchronously.
*
* Access to async transports is serialized – if two work items associated
* with the same transport are submitted, the second delegate will not be
* invoked until the first has returned, even if the first context-switches
* away (because it is waiting for I/O) and the async manager is otherwise
* idle.
*
* Optionally, a TCancellation instance can be specified. If present,
* triggering it will be considered a request to cancel the work item, if it
* is still waiting for the associated transport to become available.
* Delegates which are already being processed (i.e. waiting for I/O) are not
* affected because this would bring the connection into an undefined state
* (as a half-written request or a half-read response would probably be left
* behind).
*
* Params:
* transport = The TAsyncTransport the work delegate will operate on. Must
* be associated with this TAsyncManager instance.
* work = The operations to execute on the given transport. Must never
* throw; errors should be handled in another way. nothrow semantics are
* difficult to enforce in combination with fibers though, so currently
* exceptions are just swallowed by TAsyncManager implementations.
* cancellation = If set, can be used to request cancellation of this work
* item if it is still waiting to be executed.
*
* Note: The work item will likely be executed in a different thread, so make
* sure the code it relies on is thread-safe. The async transports themselves
* are an exception, as access to them is serialized as noted above.
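*
* Example (a minimal sketch of submitting a cancellable work item; it
* assumes TCancellationOrigin from thrift.util.cancellation, and
* someAsyncSocket() is a hypothetical helper, not part of this module):
* ---
* auto socket = someAsyncSocket(); // Hypothetical: returns a TAsyncTransport.
* auto cancellation = new TCancellationOrigin;
*
* socket.asyncManager.execute(socket, {
* // Runs once the transport is free; must not throw.
* }, cancellation);
*
* // Requests cancellation. As described above, this only has an effect
* // while the work item is still waiting for the transport.
* cancellation.trigger();
* ---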
*/
void execute(TAsyncTransport transport, void delegate() work,
TCancellation cancellation = null
) in {
assert(transport.asyncManager is this,
"The given transport must be associated with this TAsyncManager.");
}
/**
* Submits a delegate to be executed after a certain amount of time has
* passed.
*
* The actual amount of time elapsed can be higher if the async manager
* instance is busy and thus should not be relied on.
*
* Params:
* duration = The amount of time to wait before starting to execute the
* work delegate.
* work = The code to execute after the specified amount of time has passed.
*
* Example:
* ---
* // A very basic example – usually, the actual work item would enqueue
* // some async transport operation.
* auto asyncManager = someAsyncManager();
*
* TFuture!int calculate() {
* // Create a promise and asynchronously set its value after three
* // seconds have passed.
* auto promise = new TPromise!int;
* asyncManager.delay(dur!"seconds"(3), {
* promise.succeed(42);
* });
*
* // Immediately return it to the caller.
* return promise;
* }
*
* // This will wait until the result is available and then print it.
* writeln(calculate().waitGet());
* ---
*/
void delay(Duration duration, void delegate() work);
/**
* Shuts down all background threads or other facilities that might have
* been started in order to execute work items. This function is typically
* called during program shutdown.
*
* If there are still tasks to be executed when the timeout expires, any
* currently executed work items will never receive any notifications
* for async transports managed by this instance, queued work items will
* be silently dropped, and implementations are allowed to leak resources.
*
* Params:
* waitFinishTimeout = If positive, waits for all work items to be
* finished for the specified amount of time; if negative, waits for
* completion without ever timing out; if zero, immediately shuts down
* the background facilities.
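*
* Example (a minimal sketch; someAsyncManager() is a hypothetical helper,
* and the five-second timeout is an arbitrary choice):
* ---
* auto asyncManager = someAsyncManager();
*
* // Give pending work items up to five seconds to finish when leaving
* // the current scope, then shut down the background facilities.
* scope (exit) asyncManager.stop(dur!"seconds"(5));
* ---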
*/
bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1));
}
/**
* A TTransport which uses a TAsyncManager to schedule non-blocking operations.
*
* The actual type of device is not specified; typically, implementations will
* depend on an interface derived from TAsyncManager to be notified of changes
* in the transport state.
*
* The peeking, reading, writing and flushing methods must always be called
* from within the associated async manager.
*/
interface TAsyncTransport : TTransport {
/**
* The TAsyncManager associated with this transport.
*/
TAsyncManager asyncManager() @property;
}
/**
* A TAsyncManager providing notifications for socket events.
*/
interface TAsyncSocketManager : TAsyncManager {
/**
* Adds a listener that is triggered once when an event of the specified type
* occurs, and removed afterwards.
*
* Params:
* socket = The socket to listen for events on.
* eventType = The type of the event to listen for.
* timeout = The period of time after which the listener will be called
* with TAsyncEventReason.TIMED_OUT if no event happened.
* listener = The delegate to call when an event happened.
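*
* Example (a minimal sketch of waiting for readable data with a timeout;
* someAsyncSocketManager() and someSocket() are hypothetical helpers):
* ---
* auto manager = someAsyncSocketManager();
*
* manager.addOneshotListener(someSocket(), TAsyncEventType.READ,
* dur!"seconds"(1), (TAsyncEventReason reason) {
* if (reason == TAsyncEventReason.TIMED_OUT) {
* // No data arrived within one second.
* } else {
* // The socket has data available to read.
* }
* });
* ---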
*/
void addOneshotListener(Socket socket, TAsyncEventType eventType,
Duration timeout, TSocketEventListener listener);
/// Ditto
void addOneshotListener(Socket socket, TAsyncEventType eventType,
TSocketEventListener listener);
}
/**
* Types of events that can happen for an asynchronous transport.
*/
enum TAsyncEventType {
READ, /// New data became available to read.
WRITE /// The transport became ready to be written to.
}
/**
* The type of the delegates used to register socket event handlers.
*/
alias void delegate(TAsyncEventReason callReason) TSocketEventListener;
/**
* The reason a listener was called.
*/
enum TAsyncEventReason : byte {
NORMAL, /// The event listened for was triggered normally.
TIMED_OUT /// A timeout for the event was set, and it expired.
}