DataFlow

class tfsnippet.dataflows.DataFlow

Bases: object

Data flows are objects for constructing mini-batch iterators.

There are two major types of DataFlow classes: data sources and data transformers. Data sources, such as ArrayFlow, produce mini-batches from underlying data (for example, in-memory arrays). Data transformers, such as MapperFlow, produce mini-batches by transforming the arrays emitted by another data flow.

All DataFlow subclasses shipped by tfsnippet.dataflows can be constructed by factory methods of this base class. For example:

# :class:`ArrayFlow` from arrays
array_flow = DataFlow.arrays([x, y], batch_size=256, shuffle=True)

# :class:`MapperFlow` by adding the two arrays from `array_flow`
mapper_flow = array_flow.map(lambda x, y: (x + y,))

Attributes Summary

current_batch Get the result of the current batch, i.e. the result of the last call to next_batch(), provided that the implicit iterator has been opened and that call did not raise StopIteration.

Methods Summary

arrays(arrays, batch_size[, shuffle, …]) Construct an ArrayFlow.
gather(flows) Gather multiple data flows into a single flow.
get_arrays() Iterate through the data-flow, collecting mini-batches into arrays.
iterator_factory(factory) Construct an IteratorFactoryFlow.
map(mapper[, array_indices]) Construct a MapperFlow.
next_batch() Get the arrays of next mini-batch from the implicit iterator.
select(indices) Construct a DataFlow, which selects and rearranges arrays in each mini-batch.
seq(start, stop[, step, batch_size, …]) Construct a SeqFlow.
threaded(prefetch) Construct a ThreadingFlow from this flow.
to_arrays_flow(batch_size[, shuffle, …]) Convert this data-flow to an ArrayFlow.

Attributes Documentation

current_batch

Get the result of the current batch, i.e. the result of the last call to next_batch(), provided that the implicit iterator has been opened and that call did not raise StopIteration.

Returns:The arrays of current batch.
Return type:tuple[np.ndarray] or None

Methods Documentation

static arrays(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)

Construct an ArrayFlow.

Parameters:
  • arrays – List of numpy-like arrays, to be iterated through in mini-batches. These arrays should be at least 1-d, with identical first dimension.
  • batch_size (int) – Size of each mini-batch.
  • shuffle (bool) – Whether to shuffle the data before iterating. (default False)
  • skip_incomplete (bool) – Whether to exclude the last mini-batch if it is incomplete. (default False)
  • random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, construct a new RandomState).
Returns:

The data flow from arrays.

Return type:

tfsnippet.dataflow.ArrayFlow
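
The batching behaviour described by batch_size, shuffle and skip_incomplete can be sketched in plain Python. This is only an illustration under stated assumptions, not the ArrayFlow implementation: iter_minibatches is a hypothetical helper, Python lists stand in for numpy arrays, and rng (a random.Random) stands in for the random_state argument.

```python
import random


def iter_minibatches(arrays, batch_size, shuffle=False,
                     skip_incomplete=False, rng=None):
    """Yield tuples of mini-batches sliced from equally long sequences.

    Hypothetical helper, not tfsnippet code; lists stand in for arrays.
    """
    length = len(arrays[0])
    if any(len(a) != length for a in arrays):
        raise ValueError('all arrays must share the same first dimension')
    indices = list(range(length))
    if shuffle:
        # One permutation is shared by all arrays, so rows stay aligned.
        (rng or random.Random()).shuffle(indices)
    for start in range(0, length, batch_size):
        batch_idx = indices[start:start + batch_size]
        if skip_incomplete and len(batch_idx) < batch_size:
            break  # drop the trailing, shorter mini-batch
        yield tuple([a[i] for i in batch_idx] for a in arrays)


x = [0, 1, 2, 3, 4]
y = [10, 11, 12, 13, 14]
batches = list(iter_minibatches([x, y], batch_size=2))
trimmed = list(iter_minibatches([x, y], batch_size=2, skip_incomplete=True))
```

With five rows and a batch size of two, batches holds three mini-batches (the last one with a single row), while trimmed holds only the two complete ones.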

static gather(flows)

Gather multiple data flows into a single flow.
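
How the mini-batches of the individual flows are combined is not spelled out here. The sketch below assumes that the flows are iterated in lockstep and that the tuples they yield are concatenated; gather_batches is a hypothetical helper and plain lists of tuples stand in for data flows.

```python
def gather_batches(flows):
    """Zip several mini-batch iterables and concatenate their tuples.

    Hypothetical helper, not tfsnippet code; each flow is any iterable
    of tuples. Lockstep iteration and concatenation are assumptions.
    """
    flows = list(flows)
    if not flows:
        raise ValueError('at least one flow must be specified')
    for batches in zip(*flows):
        # (x,) and (y, z) become (x, y, z)
        yield tuple(arr for batch in batches for arr in batch)


flow_a = [([0, 1],), ([2, 3],)]
flow_b = [(['a', 'b'], [True, False]), (['c', 'd'], [False, True])]
gathered = list(gather_batches([flow_a, flow_b]))
```

Each element of gathered is a three-array tuple: one array from flow_a followed by the two arrays from flow_b.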

Parameters:

flows (Iterable[DataFlow]) – The data flows to gather. At least one data flow should be specified, otherwise a ValueError will be raised.

Returns:

The gathered data flow.

Return type:

tfsnippet.dataflow.GatherFlow

Raises:

ValueError – If no data flow is specified.

get_arrays()

Iterate through the data-flow, collecting mini-batches into arrays.

Returns:The collected arrays.
Return type:tuple[np.ndarray]
Raises:ValueError – If this data-flow is empty.
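
The collection step can be pictured as concatenating the mini-batches along their first dimension. The following is a minimal sketch, assuming plain lists in place of numpy arrays; collect_arrays is a hypothetical helper, not the library's code.

```python
def collect_arrays(flow):
    """Concatenate every mini-batch of `flow` into full-length sequences.

    Hypothetical helper; `flow` is any iterable of tuples of lists.
    """
    collected = None
    for batch in flow:
        if collected is None:
            # First mini-batch: copy so the inputs are not mutated.
            collected = [list(arr) for arr in batch]
        else:
            for target, arr in zip(collected, batch):
                target.extend(arr)
    if collected is None:
        # Mirrors the documented ValueError for an empty data-flow.
        raise ValueError('empty data flow: no mini-batch to collect')
    return tuple(collected)


flow = [([0, 1], [10, 11]), ([2], [12])]
xs, ys = collect_arrays(flow)
```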

static iterator_factory(factory)

Construct an IteratorFactoryFlow.

Parameters:factory (() -> Iterator or Iterable) – A factory method for constructing the mini-batch iterators for each epoch.
Returns:The data flow.
Return type:tfsnippet.dataflow.IteratorFactoryFlow
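
A factory is needed, rather than a single iterator, because an iterator is exhausted after one epoch. The sketch below illustrates the idea with a hypothetical FactoryFlow class; it is a stand-in for illustration and not the library's IteratorFactoryFlow.

```python
class FactoryFlow:
    """Re-iterable wrapper: each `iter()` call asks the factory anew.

    Hypothetical stand-in, not the library's IteratorFactoryFlow.
    """

    def __init__(self, factory):
        self._factory = factory

    def __iter__(self):
        # The factory may return an iterator or any iterable.
        return iter(self._factory())


def make_batches():
    # A generator is exhausted after one pass, hence the factory.
    for start in range(0, 6, 2):
        yield ([start, start + 1],)


flow = FactoryFlow(make_batches)
epoch_1 = list(flow)
epoch_2 = list(flow)
```

Both epochs see the same three mini-batches, because make_batches is called again for the second pass.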

map(mapper, array_indices=None)

Construct a MapperFlow.

Parameters:
  • mapper ((*np.ndarray) -> tuple[np.ndarray]) – The mapper function, which transforms numpy arrays into a tuple of other numpy arrays.
  • array_indices (int or Iterable[int]) –

    The indices of the arrays to be processed within a mini-batch.

    If specified, will apply the mapper only on these selected arrays. This will require the mapper to produce exactly the same number of output arrays as the inputs.

    If not specified, apply the mapper on all arrays, and do not require the number of output arrays to match the inputs.

Returns:

The data flow with mapper applied.

Return type:

tfsnippet.dataflow.MapperFlow
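
The two modes of array_indices can be sketched on a single mini-batch. This is an illustration under assumptions: apply_mapper is a hypothetical helper, lists stand in for numpy arrays, and writing the mapped arrays back to their original positions (as well as the ValueError on a count mismatch) is an assumption rather than documented behaviour.

```python
def apply_mapper(batch, mapper, array_indices=None):
    """Apply `mapper` to a mini-batch (a tuple of arrays).

    Hypothetical helper, not tfsnippet code.
    """
    if array_indices is None:
        # All arrays go in; any number of arrays may come out.
        return tuple(mapper(*batch))
    if isinstance(array_indices, int):
        array_indices = [array_indices]
    array_indices = list(array_indices)
    outputs = tuple(mapper(*[batch[i] for i in array_indices]))
    if len(outputs) != len(array_indices):
        raise ValueError('mapper must return one output per selected array')
    result = list(batch)
    for i, out in zip(array_indices, outputs):
        result[i] = out  # assumption: outputs replace the selected inputs
    return tuple(result)


batch = ([1, 2], [10, 20])
summed = apply_mapper(batch, lambda x, y: ([a + b for a, b in zip(x, y)],))
scaled = apply_mapper(batch, lambda y: ([v * 2 for v in y],), array_indices=1)
```

Here summed reduces two arrays to one, which is only allowed without array_indices, while scaled transforms the second array and leaves the first untouched.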

next_batch()

Get the arrays of next mini-batch from the implicit iterator.

Returns:The arrays of mini-batch.
Return type:tuple[np.ndarray]
Raises:StopIteration – If the implicit iterator is exhausted. Note that this error is raised only once, at the end of an epoch; the next call to this method opens a new epoch.
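
The epoch behaviour of the implicit iterator can be sketched as follows. ImplicitIterator is a hypothetical class written for illustration; in particular, resetting current_batch to None once the epoch ends is an assumption based on the documented return type of current_batch.

```python
class ImplicitIterator:
    """Hypothetical sketch of next_batch() / current_batch semantics."""

    def __init__(self, flow):
        self._flow = flow          # any re-iterable of mini-batches
        self._iterator = None      # None means no epoch is open
        self.current_batch = None

    def next_batch(self):
        if self._iterator is None:
            self._iterator = iter(self._flow)  # open a new epoch
        try:
            self.current_batch = next(self._iterator)
        except StopIteration:
            # Raised once per epoch; the next call starts over.
            self._iterator = None
            self.current_batch = None  # assumption, see lead-in
            raise
        return self.current_batch


it = ImplicitIterator([([0, 1],), ([2],)])
seen = []
for _ in range(2):  # two epochs
    while True:
        try:
            seen.append(it.next_batch())
        except StopIteration:
            break
```

After the loops, seen contains the two mini-batches twice, once per epoch.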

select(indices)

Construct a DataFlow, which selects and rearranges arrays in each mini-batch. For example:

flow = DataFlow.arrays([x, y, z], batch_size=64)
flow.select([0, 2, 0])  # selects ``(x, z, x)`` in each mini-batch

Parameters:indices (Iterable[int]) – The indices of arrays to select.
Returns:The data flow with selected arrays in each mini-batch.
Return type:DataFlow

static seq(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=np.int32, random_state=None)

Construct a SeqFlow.

Parameters:
  • start – The starting number of the sequence.
  • stop – The ending number of the sequence.
  • step – The step of the sequence. (default 1)
  • batch_size – Batch size of the data flow. Required.
  • shuffle (bool) – Whether to shuffle the numbers before iterating. (default False)
  • skip_incomplete (bool) – Whether to exclude the last mini-batch if it is incomplete. (default False)
  • dtype – Data type of the numbers. (default np.int32)
  • random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, construct a new RandomState).
Returns:

The data flow from number sequence.

Return type:

tfsnippet.dataflow.SeqFlow
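
A number-sequence flow can be pictured as mini-batches cut from a range. The sketch below rests on two assumptions that the text above does not confirm: that stop is exclusive, as in Python's range, and that omitting batch_size raises a ValueError. iter_seq_batches is a hypothetical helper, and shuffling is left out for brevity.

```python
def iter_seq_batches(start, stop, step=1, batch_size=None,
                     skip_incomplete=False):
    """Yield one-array mini-batches of numbers from a sequence.

    Hypothetical helper, not tfsnippet code. Assumes an exclusive
    `stop`, like range().
    """
    if batch_size is None:
        # Assumed error type; the docs only say the argument is required.
        raise ValueError('`batch_size` is required')
    numbers = list(range(start, stop, step))
    for offset in range(0, len(numbers), batch_size):
        batch = numbers[offset:offset + batch_size]
        if skip_incomplete and len(batch) < batch_size:
            break  # drop the trailing, shorter mini-batch
        yield (batch,)


seq_batches = list(iter_seq_batches(0, 10, step=3, batch_size=3))
```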

threaded(prefetch)

Construct a ThreadingFlow from this flow.

Parameters:prefetch (int) – Number of mini-batches to prefetch ahead. It should be at least 1.
Returns:The background threaded data flow to prefetch mini-batches from this flow.
Return type:tfsnippet.dataflow.ThreadingFlow
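
Prefetching in a background thread is commonly built on a bounded queue, whose capacity plays the role of prefetch. The following is a minimal sketch of that general technique, not the ThreadingFlow implementation; prefetch_batches is a hypothetical helper, and error propagation from the worker thread is deliberately left out.

```python
import queue
import threading


def prefetch_batches(flow, prefetch):
    """Yield mini-batches of `flow`, produced ahead by a worker thread.

    Hypothetical helper, not tfsnippet code.
    """
    if prefetch < 1:
        raise ValueError('`prefetch` must be at least 1')
    buffer = queue.Queue(maxsize=prefetch)  # holds at most `prefetch` batches
    end_marker = object()

    def worker():
        try:
            for batch in flow:
                buffer.put(batch)  # blocks while the buffer is full
        finally:
            buffer.put(end_marker)  # always signal the end of the epoch

    threading.Thread(target=worker, daemon=True).start()
    while True:
        batch = buffer.get()
        if batch is end_marker:
            return
        yield batch


source = [([0, 1],), ([2, 3],), ([4],)]
prefetched = list(prefetch_batches(source, prefetch=2))
```

The consumer sees the mini-batches in their original order; only the timing changes, since the worker can run up to prefetch batches ahead.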

to_arrays_flow(batch_size, shuffle=False, skip_incomplete=False, random_state=None)

Convert this data-flow to an ArrayFlow.

This method will iterate through the data-flow, collecting mini-batches into arrays, and then construct an ArrayFlow.

Parameters:
  • batch_size (int) – Size of each mini-batch.
  • shuffle (bool) – Whether to shuffle the data before iterating. (default False)
  • skip_incomplete (bool) – Whether to exclude the last mini-batch if it is incomplete. (default False)
  • random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, construct a new RandomState).
Returns:

The constructed ArrayFlow.

Return type:

tfsnippet.dataflow.ArrayFlow