/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * Transports for reading from/writing to Thrift »log files«.
 *
 * These transports are not »stupid« sources and sinks just reading and
 * writing bytes from a file verbatim, but organize the contents in the form
 * of so-called »events«, which refers to the data written between two flush()
 * calls.
 *
 * Chunking is supported, events are guaranteed to never span chunk boundaries.
 * As a consequence, an event can never be larger than the chunk size. The
 * chunk size used is not saved with the file, so care has to be taken to make
 * sure the same chunk size is used for reading and writing.
 */
module thrift.transport.file;

import core.thread : Thread;
import std.array : empty;
import std.algorithm : min, max;
import std.concurrency;
import std.conv : to;
import std.datetime : AutoStart, dur, Duration, StopWatch;
import std.exception;
import std.stdio : File;
import thrift.base;
import thrift.transport.base;

/// The default chunk size, in bytes (16 MiB).
enum DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;

/// The type used to represent event sizes in the file. Every event is stored
/// as a length prefix of this type followed by the payload bytes.
alias uint EventSize;

// The length prefix is written and read by reinterpreting the raw bytes of an
// EventSize value, so the on-disk format follows the host byte order. Refuse
// to compile on big endian targets instead of producing incompatible files.
version (BigEndian) {
  static assert(false,
    "Little endian byte order is assumed in thrift.transport.file.");
}

/**
 * A transport used to read log files. It can never be written to, calling
 * write() throws.
 *
 * Contrary to the C++ design, explicitly opening the transport/file before
 * using is necessary to allow manually closing the file without relying on the
 * object lifetime. Otherwise, it's a straight port of the C++ implementation.
 */
final class TFileReaderTransport : TBaseTransport {
  /**
   * Creates a new file reader transport.
   *
   * All tunables are initialized to their DEFAULT_* values. The file itself
   * is not touched until open() is called.
   *
   * Params:
   *   path = Path of the file to operate on.
   */
  this(string path) {
    path_ = path;
    chunkSize_ = DEFAULT_CHUNK_SIZE;
    readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
    readTimeout_ = DEFAULT_READ_TIMEOUT;
    corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
    maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS;
    // Assigned through the property setter so that the value is validated
    // against the chunk size set above.
    maxEventSize = DEFAULT_MAX_EVENT_SIZE;
  }

  /// Whether open() has been called successfully and close() has not been
  /// called since.
  override bool isOpen() @property {
    return isOpen_;
  }

  /**
   * Checks whether there is event data available for reading.
   *
   * If no event is currently being consumed, this tries to fetch the next one
   * from the file, which may block according to the read timeout settings.
   *
   * Returns: true if at least one unread byte of the current event is left,
   *   false if the transport is closed or no event could be read.
   */
  override bool peek() {
    if (!isOpen) return false;

    // Lazily pull in the next event if none is being processed right now.
    if (!currentEvent_) currentEvent_ = readEvent();

    // Still nothing there, couldn't read a new event.
    if (!currentEvent_) return false;

    // Anything left between the read position and the end of the event?
    return currentEvent_.length != currentEventPos_;
  }

  /**
   * Opens the file at the configured path for binary reading. Does nothing
   * if the transport is already open.
   *
   * Throws: TTransportException (NOT_OPEN) wrapping the original exception
   *   if the file cannot be opened.
   */
  override void open() {
    if (!isOpen) {
      try {
        file_ = File(path_, "rb");
      } catch (Exception cause) {
        throw new TTransportException("Error on opening input file.",
          TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, cause);
      }
      isOpen_ = true;
    }
  }

  /**
   * Closes the underlying file and discards all read state. Does nothing if
   * the transport is not open.
   *
   * Besides the buffer state, the partially consumed event and the file
   * offset are cleared as well, so that a subsequent open() starts reading
   * at the beginning of the file with consistent chunk bookkeeping.
   */
  override void close() {
    if (!isOpen) return;

    file_.close();
    isOpen_ = false;
    readState_.resetAllValues();

    // A reopened file starts at position zero again; stale values here would
    // corrupt the chunk boundary calculations and could hand out data from
    // an event belonging to the previous session.
    currentEvent_ = null;
    currentEventPos_ = 0;
    offset_ = 0;
  }

  /**
   * Reads up to buf.length bytes of the current event into buf.
   *
   * A single call never crosses an event boundary: if the current event has
   * fewer unread bytes than requested, only those are returned.
   *
   * Params:
   *   buf = Destination slice.
   *
   * Returns: The number of bytes stored in buf, or zero if no event could be
   *   read from the file.
   *
   * Throws: TTransportException (NOT_OPEN) if the transport is not open.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));

    // Fetch the next event from the file if none is being consumed.
    if (!currentEvent_) currentEvent_ = readEvent();

    // Still nothing there, couldn't read a new event.
    if (!currentEvent_) return 0;

    immutable available = currentEvent_.length - currentEventPos_;

    if (buf.length < available) {
      // The event holds more than was asked for: fill the whole destination
      // and remember where to continue.
      immutable next = currentEventPos_ + buf.length;
      buf[] = currentEvent_[currentEventPos_ .. next];
      currentEventPos_ = next;
      return buf.length;
    }

    // The request drains the event: hand out the rest and drop the event.
    buf[0 .. available] = currentEvent_[currentEventPos_ .. $];
    currentEvent_ = null;
    currentEventPos_ = 0;
    return available;
  }

  /**
   * Computes the number of chunks from the current size of the file.
   *
   * Returns: Zero for an empty file, otherwise file size divided by chunk
   *   size (rounded down) plus one.
   *
   * Throws: TTransportException if the transport is not open or the file
   *   size cannot be determined.
   */
  ulong getNumChunks() {
    enforce(isOpen, new TTransportException(
      "Cannot get number of chunks if file not open.",
      TTransportException.Type.NOT_OPEN));

    ulong totalBytes;
    try {
      totalBytes = file_.size();
    } catch (Exception e) {
      throw new TTransportException("Error getting file size.", __FILE__,
        __LINE__, e);
    }

    // Empty files have no chunks.
    return totalBytes == 0 ? 0 : (totalBytes / chunkSize_) + 1;
  }

  /**
   * Returns the index of the chunk that offset_ falls into.
   *
   * offset_ is the file offset at which the current read buffer starts (it
   * is advanced by the buffer length on every refill and set on seeking), so
   * the position inside the buffer is not taken into account here.
   */
  ulong getCurChunk() {
    return offset_ / chunkSize_;
  }

  /**
   * Positions the reader at the beginning of the given chunk.
   *
   * Negative indices count from the end of the file. Indices that are out of
   * range are logged and clamped: too small ones seek to the beginning, too
   * large ones seek to the end of the file, i.e. past the last event that
   * was present when this method was called.
   *
   * Any partially consumed event and all buffered data are discarded.
   *
   * Params:
   *   chunk = Index of the chunk to seek to.
   *
   * Throws: TTransportException if the transport is not open or seeking
   *   fails.
   */
  void seekToChunk(long chunk) {
    enforce(isOpen, new TTransportException(
      "Cannot seek to chunk if file not open.",
      TTransportException.Type.NOT_OPEN));

    auto numChunks = getNumChunks();

    if (chunk < 0) {
      // Count negative indices from the end.
      chunk += numChunks;
    }

    if (chunk < 0) {
      logError("Incorrect chunk number for reverse seek, seeking to " ~
       "beginning instead: %s", chunk);
      chunk = 0;
    }

    bool seekToEnd;
    long minEndOffset;
    if (chunk >= numChunks) {
      logError("Trying to seek to non-existing chunk, seeking to " ~
       "end of file instead: %s", chunk);
      seekToEnd = true;
      // An empty file has no chunks at all. Stay at chunk 0 in that case
      // instead of letting the unsigned subtraction wrap around, which would
      // result in a seek to a negative offset.
      chunk = (numChunks == 0) ? 0 : numChunks - 1;
      // this is the min offset to process events till
      minEndOffset = file_.size();
    }

    readState_.resetAllValues();
    // Drop the event being consumed together with its read position. A stale
    // position would be applied to the next event fetched.
    currentEvent_ = null;
    currentEventPos_ = 0;

    try {
      file_.seek(chunk * chunkSize_);
      offset_ = chunk * chunkSize_;
    } catch (Exception e) {
      throw new TTransportException("Error seeking to chunk", __FILE__,
        __LINE__, e);
    }

    if (seekToEnd) {
      // Never wait on the end of the file for new content, we just want to
      // find the last one.
      auto oldReadTimeout = readTimeout_;
      scope (exit) readTimeout_ = oldReadTimeout;
      readTimeout_ = dur!"hnsecs"(0);

      // Keep on reading until the last event at point of seekToChunk call.
      while ((offset_ + readState_.bufferPos_) < minEndOffset) {
        if (readEvent() is null) {
          break;
        }
      }
    }
  }

  /**
   * Seeks to the end of the file by requesting the chunk index one past the
   * last existing chunk, which seekToChunk() handles by skipping over all
   * events currently in the last chunk.
   */
  void seekToEnd() {
    seekToChunk(getNumChunks());
  }

  /**
   * The size of the chunks the file is divided into, in bytes.
   *
   * Can only be changed while the transport is closed and must leave room
   * for the event size prefix plus at least one payload byte.
   */
  ulong chunkSize() @property const {
    return chunkSize_;
  }

  /// ditto
  void chunkSize(ulong value) @property {
    if (isOpen) {
      throw new TTransportException(
        "Cannot set chunk size after TFileReaderTransport has been opened.");
    }
    if (value <= EventSize.sizeof) {
      throw new TTransportException("Chunks must be large enough to " ~
        "accomodate at least a single byte of payload data.");
    }
    chunkSize_ = value;
  }

  /**
   * If positive, wait the specified duration for new data when arriving at
   * end of file. If negative, wait forever (tailing mode), waking up to check
   * in the specified interval. If zero, do not wait at all.
   *
   * In the positive case, the reader sleeps once for the given duration and
   * gives up if the following read attempt still yields no data.
   *
   * Defaults to 500 ms.
   */
  Duration readTimeout() @property const {
    return readTimeout_;
  }

  /// ditto
  void readTimeout(Duration value) @property {
    readTimeout_ = value;
  }

  /// ditto
  enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);

  /**
   * Read buffer size, in bytes.
   *
   * The buffer is allocated lazily on the first read. Once it exists, it can
   * only be enlarged: it may still hold unprocessed data, which shrinking
   * could cut off.
   *
   * Defaults to 1 MiB.
   */
  size_t readBufferSize() @property const {
    return readBufferSize_;
  }

  /// ditto
  void readBufferSize(size_t value) @property {
    if (readBuffer_) {
      // Growing keeps the buffered bytes intact, shrinking might not.
      enforce(value >= readBufferSize_,
        "Cannot shrink read buffer after first read.");
      readBuffer_.length = value;
    }
    readBufferSize_ = value;
  }

  /// ditto
  enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;

  /**
   * Upper bound for the size of a single event, in bytes. Events announcing
   * a larger size are treated as corrupted. Cannot exceed the chunk size
   * minus the size of the event length prefix.
   *
   * Defaults to zero (no limit).
   */
  size_t maxEventSize() @property const {
    return maxEventSize_;
  }

  /// ditto
  void maxEventSize(size_t value) @property {
    // Largest payload that still fits into a chunk next to its size prefix.
    immutable largestPayload = chunkSize_ - EventSize.sizeof;
    enforce(value <= largestPayload, "Events cannot span mutiple chunks, " ~
      "maxEventSize must be smaller than chunk size.");
    maxEventSize_ = value;
  }

  /// ditto
  enum DEFAULT_MAX_EVENT_SIZE = 0;

  /**
   * The interval at which the thread wakes up to check for the next chunk
   * in tailing mode, after a corrupted event has been found in the last
   * chunk of the file.
   *
   * Defaults to one second.
   */
  Duration corruptedEventSleepDuration() @property const {
    return corruptedEventSleepDuration_;
  }

  /// ditto
  void corruptedEventSleepDuration(Duration value) @property {
    corruptedEventSleepDuration_ = value;
  }

  /// ditto
  enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);

  /**
   * The maximum number of corrupted events tolerated before the whole chunk
   * is skipped.
   *
   * While the number of corrupted events seen in a chunk is below this
   * value, recovery re-reads the chunk from its beginning.
   *
   * Defaults to zero.
   */
  uint maxCorruptedEvents() @property const {
    return maxCorruptedEvents_;
  }

  /// ditto
  void maxCorruptedEvents(uint value) @property {
    maxCorruptedEvents_ = value;
  }

  /// ditto
  enum DEFAULT_MAX_CORRUPTED_EVENTS = 0;

private:
  /**
   * Reads the next complete event from the file.
   *
   * The file is consumed through the read buffer. Every event consists of an
   * EventSize length prefix followed by the payload. A prefix never
   * straddles a chunk boundary, and a zero prefix denotes chunk padding.
   * Both cases are skipped transparently.
   *
   * At end of file the behavior depends on the read timeout: negative values
   * keep polling forever, zero returns immediately, and positive values
   * sleep once before giving up.
   *
   * Returns: The payload of the next event, or null if the end of the file
   *   was reached without a complete event becoming available.
   *
   * Throws: TTransportException on read errors, or if a corrupted event was
   *   found and recovery was not possible.
   */
  ubyte[] readEvent() {
    if (!readBuffer_) {
      readBuffer_ = new ubyte[readBufferSize_];
    }

    bool timeoutExpired;
    while (true) {
      // read from the file if read buffer is exhausted
      if (readState_.bufferPos_ == readState_.bufferLen_) {
        // advance the offset pointer
        offset_ += readState_.bufferLen_;

        try {
          // Need to clear eof flag before reading, otherwise tailing a file
          // does not work.
          file_.clearerr();

          auto usedBuf = file_.rawRead(readBuffer_);
          readState_.bufferLen_ = usedBuf.length;
        } catch (Exception e) {
          readState_.resetAllValues();
          throw new TTransportException("Error while reading from file",
            __FILE__, __LINE__, e);
        }

        readState_.bufferPos_ = 0;
        readState_.lastDispatchPos_ = 0;

        if (readState_.bufferLen_ == 0) {
          // Reached end of file.
          if (readTimeout_ < dur!"hnsecs"(0)) {
            // Tailing mode, sleep for the specified duration and try again.
            Thread.sleep(-readTimeout_);
            continue;
          } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
            // Either no timeout set, or it has already expired.
            readState_.resetState(0);
            return null;
          } else {
            // Timeout mode, sleep for the specified amount of time and retry.
            Thread.sleep(readTimeout_);
            timeoutExpired = true;
            continue;
          }
        }
      }

      // Attempt to read an event from the buffer.
      while (readState_.bufferPos_ < readState_.bufferLen_) {
        if (readState_.readingSize_) {
          if (readState_.eventSizeBuffPos_ == 0) {
            // A size prefix never crosses a chunk boundary. If it would not
            // fit into the rest of the chunk, this byte is padding.
            if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
              ((offset_ + readState_.bufferPos_ + EventSize.sizeof - 1)/chunkSize_))
            {
              readState_.bufferPos_++;
              continue;
            }
          }

          readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
            readBuffer_[readState_.bufferPos_++];

          if (readState_.eventSizeBuffPos_ == EventSize.sizeof) {
            // All bytes of the prefix collected, reinterpret them as the
            // event size (host byte order).
            auto size = (cast(EventSize[])readState_.eventSizeBuff_)[0];

            if (size == 0) {
              // This is part of the zero padding between chunks.
              readState_.resetState(readState_.lastDispatchPos_);
              continue;
            }

            // got a valid event
            readState_.readingSize_ = false;
            readState_.eventLen_ = size;
            readState_.eventPos_ = 0;
            // Make sure a payload buffer of the right size is allocated for
            // this event. A buffer left over from an event that was
            // abandoned at end of file would have the wrong length.
            readState_.event_ = null;

            // check if the event is corrupted and perform recovery if required
            if (isEventCorrupted()) {
              performRecovery();
              // start from the top
              break;
            }
          }
        } else {
          if (!readState_.event_) {
            readState_.event_ = new ubyte[readState_.eventLen_];
          }

          // take either the entire event or the remaining bytes in the buffer
          auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
            readState_.eventLen_ - readState_.eventPos_);

          // copy data from read buffer into event buffer
          readState_.event_[
            readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
          ] = readBuffer_[
            readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
          ];

          // increment position ptrs
          readState_.eventPos_ += reclaimBuffer;
          readState_.bufferPos_ += reclaimBuffer;

          // check if the event has been read in full
          if (readState_.eventPos_ == readState_.eventLen_) {
            // Reset the read state and return the completed event.
            auto completeEvent = readState_.event_;
            readState_.event_ = null;
            readState_.resetState(readState_.bufferPos_);
            return completeEvent;
          }
        }
      }
    }
  }

  /**
   * Performs sanity checks on the event whose size prefix has just been
   * read (readState_.eventLen_), logging the reason if a check fails.
   *
   * An event is considered corrupted if it exceeds the configured maximum
   * event size, if it is larger than a chunk, or if its extent would cross a
   * chunk boundary.
   *
   * Returns: true if the event must be treated as corrupted.
   */
  bool isEventCorrupted() {
    if ((maxEventSize_ > 0) && (readState_.eventLen_ > maxEventSize_)) {
      // Event size is larger than user-specified max-event size
      logError("Corrupt event read: Event size (%s) greater than max " ~
        "event size (%s)", readState_.eventLen_, maxEventSize_);
      return true;
    } else if (readState_.eventLen_ > chunkSize_) {
      // Event size is larger than chunk size
      logError("Corrupt event read: Event size (%s) greater than chunk " ~
        "size (%s)", readState_.eventLen_, chunkSize_);
      return true;
    }

    // The buffer position is just past the size prefix at this point. The
    // event thus spans from the first byte of the prefix up to and including
    // the last payload byte, which mirrors the computation the writer uses
    // to decide whether padding is needed.
    auto payloadStart = offset_ + readState_.bufferPos_;
    auto firstByte = payloadStart - EventSize.sizeof;
    auto lastByte = payloadStart + readState_.eventLen_ - 1;

    if ((firstByte / chunkSize_) != (lastByte / chunkSize_)) {
      // Size indicates that event crosses chunk boundary
      logError("Read corrupt event. Event crosses chunk boundary. " ~
        "Event size: %s. Offset: %s", readState_.eventLen_,
        (offset_ + readState_.bufferPos_ + EventSize.sizeof)
      );

      return true;
    }

    return false;
  }

  /**
   * Tries to get the reader back on track after a corrupted event.
   *
   * The strategies, in order:
   *  1. While fewer than maxCorruptedEvents corrupted events have been seen
   *     in the current chunk, re-read the chunk from its beginning.
   *  2. Otherwise skip to the next chunk if there is one.
   *  3. In tailing mode, wait until the next chunk appears and skip to it.
   *  4. Otherwise give up.
   *
   * Throws: TTransportException (CORRUPTED_DATA) if no strategy applies.
   */
  void performRecovery() {
    immutable badChunk = getCurChunk();

    // Keep a per-chunk count of the corrupted events encountered.
    if (lastBadChunk_ != badChunk) {
      lastBadChunk_ = badChunk;
      numCorruptedEventsInChunk_ = 1;
    } else {
      numCorruptedEventsInChunk_++;
    }

    if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
      // maybe there was an error in reading the file from disk
      // seek to the beginning of chunk and try again
      seekToChunk(badChunk);
      return;
    }

    // Just skip ahead to the next chunk if we are not already at the last
    // chunk.
    if (badChunk != (getNumChunks() - 1)) {
      seekToChunk(badChunk + 1);
      return;
    }

    if (readTimeout_ < dur!"hnsecs"(0)) {
      // We are in tailing mode, wait until there is enough data to start
      // the next chunk.
      while (badChunk == (getNumChunks() - 1)) {
        Thread.sleep(corruptedEventSleepDuration_);
      }
      seekToChunk(badChunk + 1);
      return;
    }

    // Pretty hosed at this stage, rewind the file back to the last
    // successful point and punt on the error.
    readState_.resetState(readState_.lastDispatchPos_);
    currentEvent_ = null;
    currentEventPos_ = 0;

    throw new TTransportException("File corrupted at offset: " ~
      to!string(offset_ + readState_.lastDispatchPos_),
      TTransportException.Type.CORRUPTED_DATA);
  }

  string path_;
  File file_;
  bool isOpen_;
  long offset_;
  ubyte[] currentEvent_;
  size_t currentEventPos_;
  ulong chunkSize_;
  Duration readTimeout_;
  size_t maxEventSize_;

  // Read buffer – lazily allocated on the first read().
  ubyte[] readBuffer_;
  size_t readBufferSize_;

  /// Bookkeeping for the incremental event parser in readEvent().
  static struct ReadState {
    /// Payload buffer of the event being assembled, null if none.
    ubyte[] event_;
    /// Announced payload length of the event being assembled.
    size_t eventLen_;
    /// Number of payload bytes already copied into event_.
    size_t eventPos_;

    /// Collects the bytes of the size prefix.
    ubyte[4] eventSizeBuff_;
    /// Number of size prefix bytes collected so far.
    ubyte eventSizeBuffPos_;
    /// True while the parser expects size prefix bytes, false while it
    /// expects payload bytes.
    bool readingSize_ = true;

    /// Position of the next unprocessed byte in the read buffer.
    size_t bufferPos_;
    /// Number of valid bytes in the read buffer.
    size_t bufferLen_;

    /// Buffer position right after the last successfully dispatched event.
    size_t lastDispatchPos_;

    /// Makes the parser expect a fresh size prefix and records the given
    /// dispatch position. Buffer contents and event fields are left alone.
    void resetState(size_t lastDispatchPos) {
      lastDispatchPos_ = lastDispatchPos;
      eventSizeBuffPos_ = 0;
      readingSize_ = true;
    }

    /// Like resetState(0), but also marks the read buffer as empty and
    /// releases the event buffer.
    void resetAllValues() {
      event_ = null;
      bufferLen_ = 0;
      bufferPos_ = 0;
      resetState(0);
    }
  }
  ReadState readState_;

  ulong lastBadChunk_;
  uint maxCorruptedEvents_;
  uint numCorruptedEventsInChunk_;
  Duration corruptedEventSleepDuration_;
}

/**
 * A transport used to write log files. It can never be read from, calling
 * read() throws.
 *
 * Contrary to the C++ design, explicitly opening the transport/file before
 * using is necessary to allow manually closing the file without relying on the
 * object lifetime.
 */
final class TFileWriterTransport : TBaseTransport {
  /**
   * Creates a new file writer transport.
   *
   * All tunables are initialized to their DEFAULT_* values. No file is
   * opened and no writer thread is started until open() is called.
   *
   * Params:
   *   path = Path of the file to operate on.
   */
  this(string path) {
    path_ = path;

    chunkSize_ = DEFAULT_CHUNK_SIZE;
    eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
    ioErrorSleepDuration = DEFAULT_IO_ERROR_SLEEP_DURATION;
    maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES;
    maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL;
  }

  /// Whether open() has been called and close() has not been called since.
  override bool isOpen() @property {
    return isOpen_;
  }

  /**
   * A file writer transport can never be read from, so there is never any
   * data to peek at.
   *
   * Returns: Always false.
   */
  override bool peek() {
    return false;
  }

  /**
   * Starts the writer thread, handing it the current configuration, and
   * marks the transport as open. Does nothing if it is already open.
   *
   * The mailbox of the writer thread is limited to eventBufferSize messages.
   * Senders block while it is full.
   */
  override void open() {
    if (!isOpen) {
      writerThread_ = spawn(&writerThread, path_, chunkSize_, maxFlushBytes_,
        maxFlushInterval_, ioErrorSleepDuration_);
      setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block);
      isOpen_ = true;
    }
  }

  /**
   * Closes the transport, i.e. the underlying file and the writer thread.
   * Does nothing if the transport is not open.
   *
   * A ShutdownMessage is sent to the writer thread, and the call blocks
   * until the thread has echoed it back as confirmation.
   */
  override void close() {
    if (!isOpen) return;

    // The request is sent with priority, i.e. it is not queued behind
    // messages already in the writer thread's mailbox.
    prioritySend(writerThread_, ShutdownMessage(), thisTid); // FIXME: Should use normal send here.
    // Wait for the writer thread to acknowledge the shutdown.
    receive((ShutdownMessage msg, Tid tid){});
    isOpen_ = false;
  }

  /**
   * Enqueues the passed slice of data for writing and immediately returns.
   * write() only blocks if the event buffer has been exhausted.
   *
   * Every call results in exactly one event in the file. Empty slices are
   * logged and skipped, as a zero size denotes padding in the file format.
   *
   * The transport must be open when calling this.
   *
   * Params:
   *   buf = Slice of data to write. It is copied before being handed to the
   *     writer thread.
   *
   * Throws: TTransportException if the transport is not open or buf does not
   *   fit into a single chunk.
   */
  override void write(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN));

    if (buf.empty) {
      logError("Cannot write empty event, skipping.");
      return;
    }

    // An event has to fit into a chunk together with its size prefix.
    auto maxSize = chunkSize - EventSize.sizeof;
    enforce(buf.length <= maxSize, new TTransportException(
      "Cannot write more than " ~ to!string(maxSize) ~
      " bytes at once due to chunk size."));

    send(writerThread_, buf.idup);
  }

  /**
   * Flushes any pending data to be written.
   *
   * A FlushMessage is sent to the writer thread, and the call blocks until
   * the thread has echoed it back as confirmation.
   *
   * The transport must be open when calling this.
   *
   * Throws: TTransportException if the transport is not open.
   */
  override void flush() {
    enforce(isOpen, new TTransportException(
      "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN));

    send(writerThread_, FlushMessage(), thisTid);
    // Wait for the writer thread to confirm the flush.
    receive((FlushMessage msg, Tid tid){});
  }

  /**
   * The size of the chunks the file is divided into, in bytes.
   *
   * A single event (write call) never spans multiple chunks – this
   * effectively limits the event size to chunkSize - EventSize.sizeof.
   *
   * Can only be changed while the transport is closed and must leave room
   * for the event size prefix plus at least one payload byte.
   */
  ulong chunkSize() @property {
    return chunkSize_;
  }

  /// ditto
  void chunkSize(ulong value) @property {
    enforce(!isOpen, new TTransportException(
      "Cannot set chunk size after TFileWriterTransport has been opened."));
    // Smaller values would make the maximum event size computed in write()
    // wrap around, and a value of zero would lead to a division by zero in
    // the writer thread.
    enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
      "be large enough to accommodate at least a single byte of payload " ~
      "data."));
    chunkSize_ = value;
  }

  /**
   * The maximum number of write() calls buffered, or zero for no limit.
   *
   * If the buffer is exhausted, write() will block until space becomes
   * available.
   *
   * If the transport is open, the new limit is applied to the mailbox of the
   * writer thread right away.
   */
  size_t eventBufferSize() @property {
    return eventBufferSize_;
  }

  /// ditto
  void eventBufferSize(size_t value) @property {
    eventBufferSize_ = value;
    if (isOpen) {
      // Use the same crowding policy as open() does, so that changing the
      // limit does not turn blocking writes into throwing ones.
      setMaxMailboxSize(writerThread_, value, OnCrowding.block);
    }
  }

  /// ditto
  enum DEFAULT_EVENT_BUFFER_SIZE = 10_000;

  /**
   * Maximum number of bytes buffered before writing and flushing the file
   * to disk.
   *
   * If the transport is open, a new value is forwarded to the writer thread
   * as a FlushBytesMessage.
   *
   * NOTE(review): This comment used to state that the value cannot be set
   * after the first call to write(), which does not match the forwarding
   * done by the setter – confirm which one is intended.
   */
  size_t maxFlushBytes() @property {
    return maxFlushBytes_;
  }

  /// ditto
  void maxFlushBytes(size_t value) @property {
    maxFlushBytes_ = value;
    if (isOpen) {
      send(writerThread_, FlushBytesMessage(value));
    }
  }

  /// ditto
  enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024;

  /**
   * Maximum interval between flushing the file to disk.
   *
   * If the transport is open, a new value is forwarded to the writer thread
   * as a FlushIntervalMessage.
   *
   * NOTE(review): This comment used to state that the value cannot be set
   * after the first call to write(), which does not match the forwarding
   * done by the setter – confirm which one is intended.
   */
  Duration maxFlushInterval() @property {
    return maxFlushInterval_;
  }

  /// ditto
  void maxFlushInterval(Duration value) @property {
    maxFlushInterval_ = value;
    if (isOpen) {
      send(writerThread_, FlushIntervalMessage(value));
    }
  }

  /// ditto
  enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3);

  /**
   * When the writer thread encounters an I/O error, it pauses for a short
   * time before trying to reopen the output file. This controls the sleep
   * duration.
   *
   * If the transport is open, a new value is forwarded to the writer thread.
   */
  Duration ioErrorSleepDuration() @property {
    return ioErrorSleepDuration_;
  }

  /// ditto
  void ioErrorSleepDuration(Duration value) @property {
    ioErrorSleepDuration_ = value;
    if (isOpen) {
      // Must be the dedicated message type. A FlushIntervalMessage would
      // change the flush interval of the writer thread instead.
      send(writerThread_, IoErrorSleepDurationMessage(value));
    }
  }

  /// ditto
  enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500);

private:
  string path_;
  ulong chunkSize_;
  size_t eventBufferSize_;
  Duration ioErrorSleepDuration_;
  size_t maxFlushBytes_;
  Duration maxFlushInterval_;
  bool isOpen_;
  Tid writerThread_;
}

private {
  // Signals that the file should be flushed on disk. Sent to the writer
  // thread and sent back along with the tid for confirmation.
  struct FlushMessage {}

  // Signals that the writer thread should close the file and shut down. Sent
  // to the writer thread and sent back along with the tid for confirmation.
  struct ShutdownMessage {}

  // Sent to the writer thread to replace its maxFlushBytes setting.
  struct FlushBytesMessage {
    size_t value;
  }

  // Sent to the writer thread to replace its maxFlushInterval setting.
  struct FlushIntervalMessage {
    Duration value;
  }

  // Sent to the writer thread to replace its ioErrorSleepDuration setting.
  struct IoErrorSleepDurationMessage {
    Duration value;
  }

  /*
   * Entry point of the thread performing the actual file output for
   * TFileWriterTransport.
   *
   * The thread appends to the file at path and processes messages from its
   * mailbox until a ShutdownMessage arrives or the owner thread terminates:
   *  - immutable(ubyte)[]: An event, written as an EventSize length prefix
   *    followed by the payload. Zero padding is inserted first if the event
   *    would otherwise cross a chunk boundary.
   *  - FlushBytesMessage, FlushIntervalMessage, IoErrorSleepDurationMessage:
   *    Replace the respective setting.
   *  - FlushMessage: Forces a flush and is echoed back to the sender.
   *  - ShutdownMessage: Flushes and closes the file, is echoed back to the
   *    sender once done.
   *
   * If the file cannot be opened, events trigger periodic reopen attempts,
   * which can be interrupted by a shutdown request.
   *
   * Params:
   *   path = Path of the output file.
   *   chunkSize = Chunk size in bytes.
   *   maxFlushBytes = Number of unflushed bytes after which to flush.
   *   maxFlushInterval = Time after which unflushed data is flushed.
   *   ioErrorSleepDuration = Pause between attempts to reopen the file.
   */
  void writerThread(
    string path,
    ulong chunkSize,
    size_t maxFlushBytes,
    Duration maxFlushInterval,
    Duration ioErrorSleepDuration
  ) {
    bool errorOpening;
    File file;
    ulong offset;
    try {
      // Open file in appending and binary mode.
      file = File(path, "ab");
      offset = file.tell();
    } catch (Exception e) {
      logError("Error on opening output file in writer thread: %s", e);
      errorOpening = true;
    }

    auto flushTimer = StopWatch(AutoStart.yes);
    size_t unflushedByteCount;

    Tid shutdownRequestTid;
    bool shutdownRequested;
    while (true) {
      if (shutdownRequested) break;

      bool forceFlush;
      Tid flushRequestTid;
      receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()),
        (immutable(ubyte)[] data) {
          while (errorOpening) {
            logError("Writer thread going to sleep for %s µs due to IO errors",
              ioErrorSleepDuration.total!"usecs"());

            // Sleep for ioErrorSleepDuration, being ready to be interrupted
            // by shutdown requests. receiveTimeout() returns true if a
            // message was handled, false if the timeout elapsed.
            auto gotShutdownRequest = receiveTimeout(ioErrorSleepDuration,
              (ShutdownMessage msg, Tid tid){ shutdownRequestTid = tid; });
            if (gotShutdownRequest) {
              // We got a shutdown request, just drop all events and exit the
              // main loop as to not block application shutdown with our tries
              // which we must assume to fail. Leave the handler right away,
              // there is no file to write the event to.
              shutdownRequested = true;
              return;
            }

            try {
              file = File(path, "ab");
              // The chunk bookkeeping below depends on the real position in
              // the reopened file.
              offset = file.tell();
              unflushedByteCount = 0;
              errorOpening = false;
              logError("Output file %s reopened during writer thread error " ~
                "recovery", path);
            } catch (Exception e) {
              logError("Unable to reopen output file %s during writer " ~
                "thread error recovery", path);
            }
          }

          // Make sure the event does not cross the chunk boundary by writing
          // a padding consisting of zeroes if it would.
          auto chunk1 = offset / chunkSize;
          auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize;

          if (chunk1 != chunk2) {
            // TODO: The C++ implementation refetches the offset here to »keep
            // in sync« – why would this be needed?
            auto padding = cast(size_t)
              ((((offset / chunkSize) + 1) * chunkSize) - offset);
            auto zeroes = new ubyte[padding];
            file.rawWrite(zeroes);
            unflushedByteCount += padding;
            offset += padding;
          }

          // TODO: 2 syscalls here, is this a problem performance-wise?
          // Probably abysmal performance on Windows due to rawWrite
          // implementation.
          uint len = cast(uint)data.length;
          file.rawWrite(cast(ubyte[])(&len)[0..1]);
          file.rawWrite(data);

          auto bytesWritten = EventSize.sizeof + data.length;
          unflushedByteCount += bytesWritten;
          offset += bytesWritten;
        }, (FlushBytesMessage msg) {
          maxFlushBytes = msg.value;
        }, (FlushIntervalMessage msg) {
          maxFlushInterval = msg.value;
        }, (IoErrorSleepDurationMessage msg) {
          ioErrorSleepDuration = msg.value;
        }, (FlushMessage msg, Tid tid) {
          forceFlush = true;
          flushRequestTid = tid;
        }, (OwnerTerminated msg) {
          shutdownRequested = true;
        }, (ShutdownMessage msg, Tid tid) {
          shutdownRequested = true;
          shutdownRequestTid = tid;
        }
      );

      if (errorOpening) {
        // Without a file there is nothing to flush, but the thread blocked
        // in flush() must still be released.
        if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
        continue;
      }

      bool flush;
      if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) {
        flush = true;
      } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) {
        if (unflushedByteCount == 0) {
          // If the flush timer is due, but no data has been written, don't
          // needlessly fsync, but do reset the timer.
          flushTimer.reset();
        } else {
          flush = true;
        }
      }

      if (flush) {
        file.flush();
        flushTimer.reset();
        unflushedByteCount = 0;
        if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
      }
    }

    file.close();

    // Confirm the shutdown, unless it was caused by the owner terminating.
    if (shutdownRequestTid != Tid.init) {
      send(shutdownRequestTid, ShutdownMessage(), thisTid);
    }
  }
}

version (unittest) {
  import core.memory : GC;
  import std.file;
}

unittest {
  // Best-effort cleanup helper: removes the given file, deliberately
  // ignoring any exception thrown while doing so.
  void tryRemove(string fileName) {
    try {
      remove(fileName);
    } catch (Exception) {}
  }

  immutable fileName = "unittest.dat.tmp";
  enforce(!exists(fileName), "Unit test output file " ~ fileName ~
    " already exists.");

  /*
   * Check the most basic reading/writing operations: three events written
   * with default settings must be read back as one contiguous byte sequence.
   */
  {
    scope (exit) tryRemove(fileName);

    auto writer = new TFileWriterTransport(fileName);
    writer.open();
    scope (exit) writer.close();

    writer.write([1, 2]);
    writer.write([3, 4]);
    writer.write([5, 6, 7]);
    // Wait for the writer thread to confirm the flush before reading.
    writer.flush();

    auto reader = new TFileReaderTransport(fileName);
    reader.open();
    scope (exit) reader.close();

    // readAll() has to cross event boundaries to fill the buffer.
    auto buf = new ubyte[7];
    reader.readAll(buf);
    enforce(buf == [1, 2, 3, 4, 5, 6, 7]);
  }

  /*
   * Check that chunking works as expected: events must not cross chunk
   * boundaries, zero padding is inserted where needed, oversized events are
   * rejected, and the reader skips the padding again.
   */
  {
    scope (exit) tryRemove(fileName);

    // The expected file contents below assume a four byte size prefix.
    static assert(EventSize.sizeof == 4);
    enum CHUNK_SIZE = 10;

    // Write some contents to the file.
    {
      auto writer = new TFileWriterTransport(fileName);
      writer.chunkSize = CHUNK_SIZE;
      writer.open();
      scope (exit) writer.close();

      writer.write([0xde]);
      writer.write([0xad]);
      // Chunk boundary here.
      writer.write([0xbe]);
      // The next write doesn't fit in the five bytes remaining, so we expect
      // padding zero bytes to be written.
      writer.write([0xef, 0x12]);

      // An event as large as a whole chunk leaves no room for the size
      // prefix and must be rejected.
      try {
        writer.write(new ubyte[CHUNK_SIZE]);
        enforce(false, "Could write event not fitting in a single chunk.");
      } catch (TTransportException e) {}

      writer.flush();
    }

    // Check the raw contents of the file to see if chunk padding was written
    // as expected.
    auto file = File(fileName, "r");
    enforce(file.size == 26);
    auto written = new ubyte[26];
    file.rawRead(written);
    enforce(written == [
      1, 0, 0, 0, 0xde,
      1, 0, 0, 0, 0xad,
      1, 0, 0, 0, 0xbe,
      0, 0, 0, 0, 0,
      2, 0, 0, 0, 0xef, 0x12
    ]);

    // Read the data back in, getting all the events at once.
    {
      auto reader = new TFileReaderTransport(fileName);
      reader.chunkSize = CHUNK_SIZE;
      reader.open();
      scope (exit) reader.close();

      auto buf = new ubyte[5];
      reader.readAll(buf);
      enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]);
    }
  }

  /*
   * Make sure that close() exits "quickly", i.e. that there is no problem
   * with the worker thread waking up.
   */
  {
    import std.conv : text;
    enum NUM_ITERATIONS = 1000;
    // Runs taking longer than this are counted as slow.
    enum SLOW_THRESHOLD_MSECS = 5;

    uint numOver = 0;
    foreach (n; 0 .. NUM_ITERATIONS) {
      scope (exit) tryRemove(fileName);

      auto transport = new TFileWriterTransport(fileName);
      transport.open();

      // Write something so that the writer thread gets started.
      transport.write(cast(ubyte[])"foo");

      // Every other iteration, also call flush(), just in case that potentially
      // has any effect on how the writer thread wakes up.
      if (n & 0x1) {
        transport.flush();
      }

      // Time the call to close().
      auto sw = StopWatch(AutoStart.yes);
      transport.close();
      sw.stop();

      // If any attempt takes more than 500ms, treat that as a fatal failure to
      // avoid looping over a potentially very slow operation.
      enforce(sw.peek().msecs < 500,
        text("close() took ", sw.peek().msecs, "ms."));

      // Normally, it takes less than 5ms on my dev box.
      // However, if the box is heavily loaded, some of the test runs can take
      // longer. Additionally, on a Windows Server 2008 instance running in
      // a VirtualBox VM, it has been observed that about a quarter of the runs
      // takes (217 ± 1) ms, for reasons not yet known.
      if (sw.peek().msecs > SLOW_THRESHOLD_MSECS) {
        ++numOver;
      }

      // Force garbage collection runs every now and then to make sure we
      // don't run out of OS thread handles.
      if (!(n % 100)) GC.collect();
    }

    // Make sure fewer than a third of the runs took longer than the
    // threshold. The message reports the threshold actually applied.
    enforce(numOver < NUM_ITERATIONS / 3,
      text(numOver, " iterations took more than ", SLOW_THRESHOLD_MSECS,
        " ms."));
  }
}