tfsnippet.dataflow

class tfsnippet.dataflow.ArrayFlow(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)

Bases: tfsnippet.dataflow.base.ExtraInfoDataFlow

A data flow that uses numpy-like arrays as its data source.

Usage:

array_flow = DataFlow.arrays([x, y], batch_size=256, shuffle=True,
                             skip_incomplete=True)
for batch_x, batch_y in array_flow:
    ...
__init__(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)

Construct an ArrayFlow.

Parameters:
  • arrays – List of numpy-like arrays, to be iterated through in mini-batches. These arrays should be at least 1-d, with identical first dimension.
  • batch_size (int) – Size of each mini-batch.
  • shuffle (bool) – Whether or not to shuffle data before iterating. (default False)
  • skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
  • random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, which uses the global RandomState)
the_arrays

Get the tuple of arrays accessed by this ArrayFlow.

class tfsnippet.dataflow.DataFlow

Bases: object

Data flows are objects for constructing mini-batch iterators.

There are two major types of DataFlow classes: data sources and data transformers. Data sources, like the ArrayFlow, produce mini-batches from underlying data sources. Data transformers, like MapperFlow, produce mini-batches by transforming arrays from the source.

All DataFlow subclasses shipped by tfsnippet.dataflow can be constructed by factory methods of this base class. For example:

# construct an ArrayFlow from arrays
array_flow = DataFlow.arrays([x, y], batch_size=256, shuffle=True)

# construct a MapperFlow by adding the two arrays from `array_flow`
mapper_flow = array_flow.map(lambda x, y: (x + y,))
__iter__()

Iterate through the mini-batches. Not reentrant.

Some subclasses may also inherit from NoReentrantContext, in which case a context must first be entered before using such data flows as iterators, for example:

with DataFlow.threaded(...) as df:
    for epoch in epochs:
        for batch_x, batch_y in df:
            ...
Yields:

tuple[np.ndarray]

Mini-batches, each a tuple of numpy arrays.

The arrays might be read-only.

_minibatch_iterator()

Get the mini-batch iterator. Subclasses should override this to implement the data flow.

Yields:

tuple[np.ndarray]

Mini-batches, each a tuple of numpy arrays.

The arrays might be read-only.

static arrays(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)

Construct an ArrayFlow.

Parameters:
  • arrays – List of numpy-like arrays, to be iterated through in mini-batches. These arrays should be at least 1-d, with identical first dimension.
  • batch_size (int) – Size of each mini-batch.
  • shuffle (bool) – Whether or not to shuffle data before iterating. (default False)
  • skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
  • random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, which uses the global RandomState)
Returns:

The data flow from arrays.

Return type:

tfsnippet.dataflow.ArrayFlow

current_batch

Get the result of the current batch, i.e., of the last call to next_batch(), provided that the implicit iterator has been opened and the last call to next_batch() did not raise a StopIteration.

Returns: The arrays of the current batch.
Return type: tuple[np.ndarray] or None
static gather(flows)

Gather multiple data flows into a single flow.

Parameters:

flows (Iterable[DataFlow]) – The data flows to gather. At least one data flow should be specified, otherwise a ValueError will be raised.

Returns:

The gathered data flow.

Return type:

tfsnippet.dataflow.GatherFlow

Raises:

ValueError – If no data flow is specified.

get_arrays()

Iterate through the data-flow, collecting mini-batches into arrays.

Returns: The collected arrays.
Return type: tuple[np.ndarray]
Raises: ValueError – If this data-flow is empty.
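
For example, a minimal sketch of collecting a transformed flow back into full arrays (x and y here are assumed to be pre-existing numpy arrays):

flow = DataFlow.arrays([x, y], batch_size=256).map(lambda x, y: (x + y,))
# concatenate all mini-batches back along the batch dimension
(x_plus_y,) = flow.get_arrays()
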
static iterator_factory(factory)

Construct an IteratorFactoryFlow.

Parameters: factory (() -> Iterator or Iterable) – A factory method for constructing the mini-batch iterators for each epoch.
Returns: The data flow.
Return type: tfsnippet.dataflow.IteratorFactoryFlow
map(mapper, array_indices=None)

Construct a MapperFlow.

Parameters:
  • mapper ((*np.ndarray) -> tuple[np.ndarray]) – The mapper function, which transforms numpy arrays into a tuple of other numpy arrays.
  • array_indices (int or Iterable[int]) –

    The indices of the arrays to be processed within a mini-batch.

    If specified, the mapper is applied only to these selected arrays, and it must produce exactly the same number of output arrays as it receives.

    If not specified, the mapper is applied to all arrays, and the number of output arrays need not match the inputs (see the sketch after this entry).

Returns:

The data flow with mapper applied.

Return type:

tfsnippet.dataflow.MapperFlow
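
A small sketch contrasting the two modes (x and y are assumed to be numpy arrays; the per-batch normalization is purely illustrative):

flow = DataFlow.arrays([x, y], batch_size=256)

# mapper applied to all arrays; the output count may differ from the input
sum_flow = flow.map(lambda x, y: (x + y,))

# mapper applied only to array 0; it must return exactly one array, which
# replaces x in each mini-batch, while y passes through untouched
norm_flow = flow.map(lambda x: ((x - x.mean()) / x.std(),),
                     array_indices=[0])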

next_batch()

Get the arrays of the next mini-batch from the implicit iterator.

Returns: The arrays of the mini-batch.
Return type: tuple[np.ndarray]
Raises: StopIteration – If the implicit iterator is exhausted. Note that this error is triggered only once at the end of an epoch; the next call to this method will open a new epoch.
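
A minimal sketch of driving the implicit iterator by hand (x and y are assumed to be numpy arrays):

flow = DataFlow.arrays([x, y], batch_size=256)
for epoch in range(3):
    try:
        while True:
            batch_x, batch_y = flow.next_batch()
            # flow.current_batch now equals (batch_x, batch_y)
    except StopIteration:
        pass  # end of epoch; the next call opens a new epoch
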
select(indices)

Construct a DataFlow, which selects and rearranges arrays in each mini-batch. For example:

flow = DataFlow.arrays([x, y, z], batch_size=64)
flow.select([0, 2, 0])  # selects (x, z, x) in each mini-batch
Parameters: indices (Iterable[int]) – The indices of arrays to select.
Returns: The data flow with selected arrays in each mini-batch.
Return type: DataFlow
static seq(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=np.int32, random_state=None)

Construct a SeqFlow.

Parameters:
  • start – The starting number of the sequence.
  • stop – The ending number of the sequence (exclusive).
  • step – The step of the sequence. (default 1)
  • batch_size – Batch size of the data flow. Required.
  • shuffle (bool) – Whether or not to shuffle the numbers before iterating. (default False)
  • skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
  • dtype – Data type of the numbers. (default np.int32)
  • random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, which uses the global RandomState)
Returns:

The data flow from number sequence.

Return type:

tfsnippet.dataflow.SeqFlow

threaded(prefetch)

Construct a ThreadingFlow from this flow.

Parameters: prefetch (int) – Number of mini-batches to prefetch ahead. It should be at least 1.
Returns: The background threaded data flow, which prefetches mini-batches from this flow.
Return type: tfsnippet.dataflow.ThreadingFlow
to_arrays_flow(batch_size, shuffle=False, skip_incomplete=False, random_state=None)

Convert this data-flow to an ArrayFlow.

This method will iterate through the data-flow, collecting mini-batches into arrays, and then construct an ArrayFlow.

Parameters:
  • batch_size (int) – Size of each mini-batch.
  • shuffle (bool) – Whether or not to shuffle data before iterating. (default False)
  • skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
  • random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, which uses the global RandomState)
Returns:

The constructed ArrayFlow.

Return type:

tfsnippet.dataflow.ArrayFlow
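
For instance, an expensive mapper can be evaluated once and its results re-batched (a sketch; x and y are assumed numpy arrays, and expensive_op is a hypothetical function):

mapped = DataFlow.arrays([x, y], batch_size=256).map(
    lambda x, y: (expensive_op(x), y))
# materialize the mapped results, then iterate them with a new batch size
cached = mapped.to_arrays_flow(batch_size=64, shuffle=True)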

class tfsnippet.dataflow.ExtraInfoDataFlow(array_count, data_length, data_shapes, batch_size, skip_incomplete, is_shuffled)

Bases: tfsnippet.dataflow.base.DataFlow

Base class for DataFlow subclasses with auxiliary information about the mini-batches.

__init__(array_count, data_length, data_shapes, batch_size, skip_incomplete, is_shuffled)

Construct an ExtraInfoDataFlow.

Parameters:
  • array_count (int) – The count of arrays in each mini-batch.
  • data_length (int) – The total length of the data.
  • data_shapes (tuple[tuple[int]]) – The shapes of data in a mini-batch. The batch dimension is not included.
  • batch_size (int) – Size of each mini-batch.
  • skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete.
  • is_shuffled (bool) – Whether or not the data are shuffled before being iterated through mini-batches.
array_count

Get the count of arrays in each mini-batch.

Returns: The count of arrays in each mini-batch.
Return type: int
batch_size

Get the size of each mini-batch.

Returns: The size of each mini-batch.
Return type: int
data_length

Get the total length of the data.

Returns: The total length of the data.
Return type: int
data_shapes

Get the shapes of the data in each mini-batch.

Returns: The shapes of data in a mini-batch. The batch dimension is not included.
Return type: tuple[tuple[int]]
is_shuffled

Whether or not the data are shuffled before being iterated through mini-batches.

skip_incomplete

Whether or not to exclude the last mini-batch if it is incomplete.
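
As an illustration, these properties can be inspected on any ArrayFlow (a sketch; x and y are assumed to be numpy arrays with 1000 rows):

flow = DataFlow.arrays([x, y], batch_size=256, skip_incomplete=True)
print(flow.array_count)      # 2
print(flow.data_length)      # 1000
print(flow.data_shapes)      # per-array shapes, batch dimension excluded
print(flow.batch_size)       # 256
print(flow.skip_incomplete)  # True
print(flow.is_shuffled)      # False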

class tfsnippet.dataflow.DataMapper

Bases: object

Base class for all data mappers.

A DataMapper is a callable object which maps input arrays into output arrays. Instances of DataMapper are usually used as the mapper of a tfsnippet.dataflow.MapperFlow.

__call__(*arrays)

Transform the input arrays into outputs.

Parameters: *arrays – Arrays to be transformed.
Returns: The output arrays.
Return type: tuple[np.ndarray]
_transform(*args)

Subclasses should override this to implement the transformation.
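
A minimal sketch of a custom DataMapper (the ScaleMapper class here is illustrative, not part of the library):

import numpy as np
from tfsnippet.dataflow import DataFlow, DataMapper

class ScaleMapper(DataMapper):
    """Illustrative mapper that scales every input array by a constant."""

    def __init__(self, scale):
        self.scale = scale

    def _transform(self, *arrays):
        # must return a tuple of numpy arrays
        return tuple(np.asarray(a) * self.scale for a in arrays)

# x is assumed to be a pre-existing numpy array
scaled_flow = DataFlow.arrays([x], batch_size=256).map(ScaleMapper(2.0))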

class tfsnippet.dataflow.SlidingWindow(data_array, window_size)

Bases: tfsnippet.dataflow.data_mappers.DataMapper

DataMapper for producing sliding windows according to indices.

Usage:

data = np.arange(1000)
sw = SlidingWindow(data, window_size=100)

# construct a DataFlow from this SlidingWindow
sw_flow = sw.as_flow(batch_size=64)
# or equivalently
sw_flow = DataFlow.seq(
    0, len(data) - sw.window_size + 1, batch_size=64).map(sw)
__init__(data_array, window_size)

Construct a SlidingWindow.

Parameters:
  • data_array (np.ndarray) – The array from which to extract sliding windows.
  • window_size (int) – Size of each window.
_transform(indices)

Transform the given window-start indices into the corresponding sliding windows extracted from data_array.

as_flow(batch_size, shuffle=False, skip_incomplete=False)

Get a DataFlow which iterates through mini-batches of sliding windows upon data_array.

Parameters:
  • batch_size (int) – Batch size of the data flow. Required.
  • shuffle (bool) – Whether or not to shuffle the numbers before iterating. (default False)
  • skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
Returns:

The data flow for sliding windows.

Return type:

DataFlow

data_array

Get the data array.

window_size

Get the window size.
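
As a concrete illustration of the produced batches (a sketch following the usage above; each mini-batch is assumed to be a one-tuple holding the stacked windows):

data = np.arange(1000)
sw = SlidingWindow(data, window_size=100)
for (windows,) in sw.as_flow(batch_size=64):
    # each mini-batch stacks up to batch_size windows of length window_size
    assert windows.shape[1:] == (100,)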

class tfsnippet.dataflow.GatherFlow(flows)

Bases: tfsnippet.dataflow.base.DataFlow

A data flow that gathers multiple data flows into a single flow.

Usage:

x_flow = DataFlow.arrays([x], batch_size=256)
y_flow = DataFlow.arrays([y], batch_size=256)
xy_flow = DataFlow.gather([x_flow, y_flow])
__init__(flows)

Construct a GatherFlow.

Parameters:

flows (Iterable[DataFlow]) – The data flows to gather. At least one data flow should be specified, otherwise a ValueError will be raised.

Raises:

ValueError – If no data flow is specified.

flows

Get the data flows to be gathered.

Returns: The data flows to be gathered.
Return type: tuple[DataFlow]
class tfsnippet.dataflow.IteratorFactoryFlow(factory)

Bases: tfsnippet.dataflow.base.DataFlow

Data flow constructed from an iterator factory.

Usage:

x_flow = DataFlow.arrays([x], batch_size=256)
y_flow = DataFlow.arrays([y], batch_size=256)
xy_flow = DataFlow.iterator_factory(lambda: (
    (x, y) for (x,), (y,) in zip(x_flow, y_flow)
))
__init__(factory)

Construct an IteratorFactoryFlow.

Parameters: factory (() -> Iterator or Iterable) – A factory method for constructing the mini-batch iterators for each epoch.
class tfsnippet.dataflow.MapperFlow(source, mapper, array_indices=None)

Bases: tfsnippet.dataflow.base.DataFlow

Data flow which transforms the mini-batch arrays from the source flow by a specified mapper function.

Usage:

source_flow = DataFlow.arrays([x, y], batch_size=256)
mapper_flow = source_flow.map(lambda x, y: (x + y,))
__init__(source, mapper, array_indices=None)

Construct a MapperFlow.

Parameters:
  • source (DataFlow) – The source data flow.
  • mapper ((*np.ndarray) -> tuple[np.ndarray]) – The mapper function, which transforms numpy arrays into a tuple of other numpy arrays.
  • array_indices (int or Iterable[int]) –

    The indices of the arrays to be processed within a mini-batch.

    If specified, the mapper is applied only to these selected arrays, and it must produce exactly the same number of output arrays as it receives.

    If not specified, the mapper is applied to all arrays, and the number of output arrays need not match the inputs.

array_indices

Get the indices of the arrays to be processed.

source

Get the source data flow.

class tfsnippet.dataflow.SeqFlow(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=np.int32, random_state=None)

Bases: tfsnippet.dataflow.array_flow.ArrayFlow

A data flow that uses a number sequence as its data source.

A SeqFlow is particularly useful for generating index numbers, with the actual data then fetched by a MapperFlow according to those indices.

Usage:

seq_flow = DataFlow.seq(0, len(x), batch_size=256)
mapper_flow = seq_flow.map(lambda idx: np.stack(
    [fetch_data_by_index(i) for i in idx]
))
__init__(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=np.int32, random_state=None)

Construct a SeqFlow.

Parameters:
  • start – The starting number of the sequence.
  • stop – The ending number of the sequence (exclusive).
  • step – The step of the sequence. (default 1)
  • batch_size – Batch size of the data flow. Required.
  • shuffle (bool) – Whether or not to shuffle the numbers before iterating. (default False)
  • skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
  • dtype – Data type of the numbers. (default np.int32)
  • random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, which uses the global RandomState)
start

Get the starting number of the sequence.

step

Get the step of the sequence.

stop

Get the ending number of the sequence.

class tfsnippet.dataflow.ThreadingFlow(source, prefetch)

Bases: tfsnippet.dataflow.base.DataFlow, tfsnippet.utils.concepts.AutoInitAndCloseable

Data flow to prefetch from the source data flow in a background thread.

Usage:

array_flow = DataFlow.arrays([x, y], batch_size=256)
with array_flow.threaded(prefetch=5) as df:
    for epoch in epochs:
        for batch_x, batch_y in df:
            ...
EPOCH_END = <object object>

Object to mark an ending position of an epoch.

__init__(source, prefetch)

Construct a ThreadingFlow.

Parameters:
  • source (DataFlow) – The source data flow.
  • prefetch (int) – Number of mini-batches to prefetch ahead. It should be at least 1.
prefetch_num

Get the number of batches to prefetch.

source

Get the source data flow.