tfsnippet.dataflow¶
-
class
tfsnippet.dataflow.ArrayFlow(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)¶
Bases: tfsnippet.dataflow.base.ExtraInfoDataFlow
Using numpy-like arrays as data source flow.
Usage:
    array_flow = DataFlow.arrays([x, y], batch_size=256, shuffle=True, skip_incomplete=True)
    for batch_x, batch_y in array_flow:
        ...
-
__init__(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)¶ Construct an ArrayFlow.
Parameters:
- arrays – List of numpy-like arrays, to be iterated through in mini-batches. These arrays should be at least 1-d, with identical first dimension.
- batch_size (int) – Size of each mini-batch.
- shuffle (bool) – Whether or not to shuffle data before iterating. (default False)
- skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
- random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, use the global RandomState)
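The interaction between batch_size, shuffle and skip_incomplete can be sketched without tfsnippet. The snippet below is only an illustration of the documented semantics, not the library's implementation: `iter_minibatches` is a hypothetical helper, plain Python lists stand in for numpy arrays, and `random.Random` stands in for numpy's RandomState.

```python
import random


def iter_minibatches(arrays, batch_size, shuffle=False,
                     skip_incomplete=False, rng=None):
    """Yield tuples of mini-batch slices, one slice per input sequence."""
    length = len(arrays[0])
    if any(len(a) != length for a in arrays):
        raise ValueError("all arrays must share the same first dimension")
    indices = list(range(length))
    if shuffle:
        # Fall back to the module-level generator when no rng is supplied.
        (rng or random).shuffle(indices)
    for start in range(0, length, batch_size):
        batch_idx = indices[start:start + batch_size]
        if skip_incomplete and len(batch_idx) < batch_size:
            break  # drop the trailing short batch
        # The same indices are applied to every array, so rows stay aligned.
        yield tuple([a[i] for i in batch_idx] for a in arrays)


x = list(range(10))
y = [v * v for v in x]
full = list(iter_minibatches([x, y], batch_size=4))
trimmed = list(iter_minibatches([x, y], batch_size=4, skip_incomplete=True))
print(len(full), len(trimmed))  # 3 2
```

With 10 rows and a batch size of 4, the last batch holds only 2 rows, so skip_incomplete=True yields one batch fewer.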
-
-
class
tfsnippet.dataflow.DataFlow¶
Bases: object
Data flows are objects for constructing mini-batch iterators.
There are two major types of DataFlow classes: data sources and data transformers. Data sources, like ArrayFlow, produce mini-batches from underlying data sources. Data transformers, like MapperFlow, produce mini-batches by transforming arrays from the source.
All DataFlow subclasses shipped by tfsnippet.dataflow can be constructed by factory methods of this base class. For example:
    # ArrayFlow from arrays
    array_flow = DataFlow.arrays([x, y], batch_size=256, shuffle=True)
    # MapperFlow by adding the two arrays from `array_flow`
    mapper_flow = array_flow.map(lambda x, y: (x + y,))
-
__iter__()¶ Iterate through the mini-batches. Not reentrant.
Some subclasses may also inherit from NoReentrantContext, in which case a context must first be entered before such data flows are used as iterators, for example:
    with DataFlow.threaded(...) as df:
        for epoch in epochs:
            for batch_x, batch_y in df:
                ...
Yields: tuple[np.ndarray] – Mini-batches of tuples of numpy arrays. The arrays might be read-only.
-
_minibatch_iterator()¶ Get the mini-batch iterator. Subclasses should override this to implement the data flow.
Yields: tuple[np.ndarray] – Mini-batches of tuples of numpy arrays. The arrays might be read-only.
-
static
arrays(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)¶ Construct an ArrayFlow.
Parameters:
- arrays – List of numpy-like arrays, to be iterated through in mini-batches. These arrays should be at least 1-d, with identical first dimension.
- batch_size (int) – Size of each mini-batch.
- shuffle (bool) – Whether or not to shuffle data before iterating. (default False)
- skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
- random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, use the global RandomState)
Returns: The data flow from arrays.
Return type:
-
current_batch¶ Get the result of the current batch, i.e. the value returned by the last call to next_batch(), provided that the implicit iterator has been opened and that call did not raise a StopIteration.
Returns: The arrays of the current batch.
Return type: tuple[np.ndarray] or None
-
static
gather(flows)¶ Gather multiple data flows into a single flow.
Parameters: flows (Iterable[DataFlow]) – The data flows to gather. At least one data flow should be specified, otherwise a ValueError will be raised.
Returns: The gathered data flow.
Return type:
Raises:
- ValueError – If not even one data flow is specified.
- TypeError – If a specified flow is not a DataFlow.
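One plausible reading of gathering is sketched below, assuming that the flows are iterated in lockstep and that the arrays of each flow's mini-batch are concatenated into one flat tuple. The helper `gather` here is a hypothetical stand-in operating on plain iterables of tuples; stopping at the shortest flow (the behaviour of `zip`) is an assumption, and the TypeError check is omitted.

```python
def gather(flows):
    """Combine several iterables of batch tuples into one flat stream."""
    flows = tuple(flows)
    if not flows:
        raise ValueError("at least one data flow must be specified")
    return (
        # Flatten ((x,), (y,)) into (x, y) for each lockstep position.
        tuple(arr for batch in batches for arr in batch)
        for batches in zip(*flows)
    )


x_flow = [([1, 2],), ([3],)]
y_flow = [([10, 20],), ([30],)]
gathered = list(gather([x_flow, y_flow]))
print(gathered)  # [([1, 2], [10, 20]), ([3], [30])]
```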
-
get_arrays()¶ Iterate through the data-flow, collecting mini-batches into arrays.
Returns: The collected arrays.
Return type: tuple[np.ndarray]
Raises: ValueError – If this data-flow is empty.
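Collecting mini-batches back into whole arrays amounts to concatenating each position of the batch tuples along the first dimension. A minimal sketch with lists in place of numpy arrays follows; `collect_arrays` is a hypothetical name, not part of tfsnippet.

```python
def collect_arrays(batches):
    """Concatenate an iterable of batch tuples into one tuple of lists."""
    columns = None
    for batch in batches:
        if columns is None:
            # One accumulator per array in the mini-batch.
            columns = [[] for _ in batch]
        for column, values in zip(columns, batch):
            column.extend(values)
    if columns is None:
        # Nothing was yielded, so no array can be built.
        raise ValueError("the data flow is empty")
    return tuple(columns)


collected = collect_arrays([([1, 2], [10, 20]), ([3], [30])])
print(collected)  # ([1, 2, 3], [10, 20, 30])
```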
-
static
iterator_factory(factory)¶ Construct an IteratorFactoryFlow.
Parameters: factory (() -> Iterator or Iterable) – A factory method for constructing the mini-batch iterators for each epoch.
Returns: The data flow.
Return type: tfsnippet.dataflow.IteratorFactoryFlow
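The reason for passing a factory rather than an iterator is that a generator is exhausted after one pass, while a factory can be called again for every epoch. The class below is a hypothetical stand-in written to show that point; it is not the library's IteratorFactoryFlow.

```python
class FactoryFlow:
    """Calls `factory` at the start of every epoch to get a fresh iterator."""

    def __init__(self, factory):
        self._factory = factory

    def __iter__(self):
        return iter(self._factory())


x_batches = [([1, 2],), ([3],)]
y_batches = [([10, 20],), ([30],)]
xy_flow = FactoryFlow(lambda: (
    (x, y) for (x,), (y,) in zip(x_batches, y_batches)
))

first_epoch = list(xy_flow)
second_epoch = list(xy_flow)  # works because the generator is rebuilt
print(first_epoch == second_epoch)  # True
```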
-
map(mapper, array_indices=None)¶ Construct a MapperFlow.
Parameters:
- mapper ((*np.ndarray) -> tuple[np.ndarray]) – The mapper function, which transforms numpy arrays into a tuple of other numpy arrays.
- array_indices (int or Iterable[int]) – The indices of the arrays to be processed within a mini-batch.
  If specified, the mapper is applied only to these selected arrays. This requires the mapper to produce exactly as many output arrays as it receives inputs.
  If not specified, the mapper is applied to all arrays, and the number of output arrays need not match the number of inputs.
Returns: The data flow with mapper applied.
Return type:
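The two modes of array_indices can be illustrated on a single mini-batch. The sketch assumes that, when indices are given, the mapper outputs are written back to the same positions and the remaining arrays pass through unchanged; `apply_mapper` is a hypothetical helper and lists stand in for numpy arrays.

```python
def apply_mapper(batch, mapper, array_indices=None):
    """Apply `mapper` to a batch tuple, optionally on selected positions."""
    if array_indices is None:
        # All arrays go in; any number of arrays may come out.
        return tuple(mapper(*batch))
    if isinstance(array_indices, int):
        array_indices = [array_indices]
    array_indices = list(array_indices)
    outputs = tuple(mapper(*[batch[i] for i in array_indices]))
    if len(outputs) != len(array_indices):
        raise ValueError("mapper must return as many arrays as it received")
    result = list(batch)
    for i, out in zip(array_indices, outputs):
        result[i] = out  # assumed: outputs replace the selected inputs
    return tuple(result)


batch = ([1, 2], [10, 20])
summed = apply_mapper(batch, lambda x, y: ([a + b for a, b in zip(x, y)],))
doubled_y = apply_mapper(batch, lambda y: ([v * 2 for v in y],), array_indices=1)
print(summed)     # ([11, 22],)
print(doubled_y)  # ([1, 2], [20, 40])
```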
-
next_batch()¶ Get the arrays of the next mini-batch from the implicit iterator.
Returns: The arrays of the mini-batch.
Return type: tuple[np.ndarray]
Raises: StopIteration – If the implicit iterator is exhausted. Note that this error will only be triggered once at the end of an epoch. The next call to this method opens a new epoch.
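The "raise once, then start a new epoch" behaviour of next_batch(), together with current_batch, can be sketched as a small state machine. `ImplicitIterator` is a hypothetical stand-in written from the description above, not the library's code.

```python
class ImplicitIterator:
    """Hypothetical stand-in for the next_batch() / current_batch pair."""

    def __init__(self, make_iterator):
        self._make_iterator = make_iterator  # called once per epoch
        self._iterator = None
        self.current_batch = None

    def next_batch(self):
        if self._iterator is None:
            self._iterator = iter(self._make_iterator())  # open a new epoch
        try:
            self.current_batch = next(self._iterator)
        except StopIteration:
            # Signal the end of the epoch once, then reset so that the
            # following call starts a fresh epoch.
            self._iterator = None
            self.current_batch = None
            raise
        return self.current_batch


flow = ImplicitIterator(lambda: [(1,), (2,)])
seen = []
for _ in range(2):  # two epochs
    while True:
        try:
            seen.append(flow.next_batch())
        except StopIteration:
            break
print(seen)  # [(1,), (2,), (1,), (2,)]
```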
-
select(indices)¶ Construct a DataFlow, which selects and rearranges arrays in each mini-batch. For example:
    flow = DataFlow.arrays([x, y, z], batch_size=64)
    flow.select([0, 2, 0])  # selects (x, z, x) in each mini-batch
Parameters: indices (Iterable[int]) – The indices of arrays to select.
Returns: The data flow with selected arrays in each mini-batch.
Return type: DataFlow
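Selection is a per-batch re-indexing of the tuple, and an index may appear more than once. A minimal sketch over plain tuples, with `select_flow` as a hypothetical helper name:

```python
def select_flow(batches, indices):
    """Yield batches whose arrays are picked and reordered by `indices`."""
    indices = tuple(indices)
    for batch in batches:
        yield tuple(batch[i] for i in indices)


batches = [("x0", "y0", "z0"), ("x1", "y1", "z1")]
selected = list(select_flow(batches, [0, 2, 0]))
print(selected)  # [('x0', 'z0', 'x0'), ('x1', 'z1', 'x1')]
```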
-
static
seq(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=np.int32, random_state=None)¶ Construct a SeqFlow.
Parameters:
- start – The starting number of the sequence.
- stop – The ending number of the sequence.
- step – The step of the sequence. (default 1)
- batch_size – Batch size of the data flow. Required.
- shuffle (bool) – Whether or not to shuffle the numbers before iterating. (default False)
- skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
- dtype – Data type of the numbers. (default np.int32)
- random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, use the global RandomState)
Returns: The data flow from number sequence.
Return type:
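A sequence flow is typically used to produce batches of indices that a mapper then turns into data. The sketch below assumes that stop is exclusive, as in Python's `range`, which matches the `DataFlow.seq(0, len(x), ...)` usage elsewhere in this reference; `seq_batches` and `records` are hypothetical names.

```python
def seq_batches(start, stop, step=1, batch_size=None):
    """Yield one-element tuples holding consecutive chunks of the sequence."""
    if batch_size is None:
        raise ValueError("batch_size is required")
    numbers = list(range(start, stop, step))  # assumed: stop is exclusive
    return (
        (numbers[i:i + batch_size],)
        for i in range(0, len(numbers), batch_size)
    )


records = ["a", "b", "c", "d", "e"]
index_batches = seq_batches(0, len(records), batch_size=2)
# Fetch the actual data for each batch of indices.
fetched = [[records[i] for i in idx] for (idx,) in index_batches]
print(fetched)  # [['a', 'b'], ['c', 'd'], ['e']]
```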
-
threaded(prefetch)¶ Construct a ThreadingFlow from this flow.
Parameters: prefetch (int) – Number of mini-batches to prefetch ahead. It should be at least 1.
Returns: The background threaded data flow to prefetch mini-batches from this flow.
Return type: tfsnippet.dataflow.ThreadingFlow
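Prefetching in a background thread can be sketched with a bounded queue whose capacity plays the role of prefetch, and a sentinel object marking the end of an epoch. This is a simplified single-epoch illustration under those assumptions, not the ThreadingFlow implementation; `prefetch_epoch` is a hypothetical helper.

```python
import queue
import threading

EPOCH_END = object()  # sentinel marking the end of an epoch


def prefetch_epoch(source, prefetch):
    """Iterate `source` in a worker thread, buffering up to `prefetch` batches."""
    if prefetch < 1:
        raise ValueError("prefetch must be at least 1")
    buffer = queue.Queue(maxsize=prefetch)

    def worker():
        try:
            for batch in source:
                buffer.put(batch)  # blocks while the buffer is full
        finally:
            buffer.put(EPOCH_END)  # always unblock the consumer

    # Daemon thread: an abandoned epoch will not keep the process alive.
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    while True:
        item = buffer.get()
        if item is EPOCH_END:
            break
        yield item
    thread.join()


batches = list(prefetch_epoch([(1,), (2,), (3,)], prefetch=2))
print(batches)  # [(1,), (2,), (3,)]
```

A single FIFO queue with one producer preserves batch order, which is why the consumer sees the batches in the order the source produced them.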
-
to_arrays_flow(batch_size, shuffle=False, skip_incomplete=False, random_state=None)¶ Convert this data-flow to an ArrayFlow.
This method will iterate through the data-flow, collecting mini-batches into arrays, and then construct an ArrayFlow.
Parameters:
- batch_size (int) – Size of each mini-batch.
- shuffle (bool) – Whether or not to shuffle data before iterating. (default False)
- skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
- random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, use the global RandomState)
Returns: The constructed ArrayFlow.
Return type:
-
-
class
tfsnippet.dataflow.ExtraInfoDataFlow(array_count, data_length, data_shapes, batch_size, skip_incomplete, is_shuffled)¶
Bases: tfsnippet.dataflow.base.DataFlow
Base class for DataFlow subclasses with auxiliary information about the mini-batches.
-
__init__(array_count, data_length, data_shapes, batch_size, skip_incomplete, is_shuffled)¶ Construct an ExtraInfoDataFlow.
Parameters:
- array_count (int) – The count of arrays in each mini-batch.
- data_length (int) – The total length of the data.
- data_shapes (tuple[tuple[int]]) – The shapes of data in a mini-batch. The batch dimension is not included.
- batch_size (int) – Size of each mini-batch.
- skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete.
- is_shuffled (bool) – Whether or not the data are shuffled before being iterated through in mini-batches.
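The auxiliary fields are enough to derive quantities such as the number of mini-batches per epoch. The arithmetic below is an illustration derived from the parameter descriptions; `batch_count` and `shapes_without_batch_dim` are hypothetical helpers, not attributes documented for this class.

```python
def batch_count(data_length, batch_size, skip_incomplete):
    """Number of mini-batches in one epoch."""
    if skip_incomplete:
        return data_length // batch_size           # floor: drop the short batch
    return (data_length + batch_size - 1) // batch_size  # ceiling division


def shapes_without_batch_dim(array_shapes):
    """Strip the leading (batch) dimension from each full array shape."""
    return tuple(tuple(shape[1:]) for shape in array_shapes)


print(batch_count(1000, 256, skip_incomplete=False))  # 4
print(batch_count(1000, 256, skip_incomplete=True))   # 3
print(shapes_without_batch_dim([(1000, 28, 28), (1000,)]))  # ((28, 28), ())
```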
-
array_count¶ Get the count of arrays in each mini-batch.
Returns: The count of arrays in each mini-batch.
Return type: int
-
data_length¶ Get the total length of the data.
Returns: The total length of the data.
Return type: int
-
data_shapes¶ Get the shapes of the data in each mini-batch.
Returns: The shapes of data in a mini-batch. The batch dimension is not included.
Return type: tuple[tuple[int]]
-
is_shuffled¶ Whether or not the data are shuffled before being iterated through in mini-batches.
-
skip_incomplete¶ Whether or not to exclude the last mini-batch if it is incomplete?
-
-
class
tfsnippet.dataflow.DataMapper¶
Bases: object
Base class for all data mappers.
A DataMapper is a callable object, which maps input arrays into output arrays. Instances of DataMapper are usually used as the mapper of a tfsnippet.dataflow.MapperFlow.
-
__call__(*arrays)¶ Transform the input arrays into outputs.
Parameters: *arrays – Arrays to be transformed.
Returns: The output arrays.
Return type: tuple[np.ndarray]
-
_transform(*args)¶ Subclasses should override this to implement the transformation.
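The split between __call__ and _transform follows the template-method pattern: the base class owns the public entry point and subclasses supply the transformation. The classes below are hypothetical stand-ins that mirror that structure; the output type check in `__call__` is an assumption, and lists stand in for numpy arrays.

```python
class Mapper:
    """Hypothetical stand-in for a DataMapper-style base class."""

    def __call__(self, *arrays):
        outputs = self._transform(*arrays)
        # Assumed validation: the result must be a tuple or list of arrays.
        if not isinstance(outputs, (tuple, list)):
            raise TypeError("_transform must return a tuple or list of arrays")
        return tuple(outputs)

    def _transform(self, *arrays):
        raise NotImplementedError()


class Scale(Mapper):
    """Multiply every element of a single input array by a constant."""

    def __init__(self, factor):
        self.factor = factor

    def _transform(self, x):
        return ([v * self.factor for v in x],)


scaled = Scale(3)([1, 2])
print(scaled)  # ([3, 6],)
```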
-
-
class
tfsnippet.dataflow.SlidingWindow(data_array, window_size)¶
Bases: tfsnippet.dataflow.data_mappers.DataMapper
DataMapper for producing sliding windows according to indices.
Usage:
    data = np.arange(1000)
    sw = SlidingWindow(data, window_size=100)
    # construct a DataFlow from this SlidingWindow
    sw_flow = sw.as_flow(batch_size=64)
    # or equivalently
    sw_flow = DataFlow.seq(
        0, len(data) - sw.window_size + 1, batch_size=64).map(sw)
-
__init__(data_array, window_size)¶ Construct a SlidingWindow.
Parameters:
- data_array (np.ndarray) – The array from which to extract sliding windows.
- window_size (int) – Size of each window.
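The mapping from start indices to windows, and the reason the index range stops at `len(data) - window_size + 1`, can be shown with plain slicing. `sliding_windows` is a hypothetical helper and a list stands in for the numpy array.

```python
def sliding_windows(data, window_size, indices):
    """Return a one-element tuple with the window starting at each index."""
    return ([data[i:i + window_size] for i in indices],)


data = list(range(10))
window_size = 3
# The last valid start index is len(data) - window_size, so the exclusive
# upper bound of the index sequence is len(data) - window_size + 1.
starts = list(range(0, len(data) - window_size + 1))
(windows,) = sliding_windows(data, window_size, starts)
print(len(windows))  # 8
print(windows[0], windows[-1])  # [0, 1, 2] [7, 8, 9]
```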
-
_transform(indices)¶ Subclasses should override this to implement the transformation.
-
as_flow(batch_size, shuffle=False, skip_incomplete=False)¶ Get a DataFlow which iterates through mini-batches of sliding windows upon data_array.
Parameters:
Returns: The data flow for sliding windows.
Return type:
-
data_array¶ Get the data array.
-
window_size¶ Get the window size.
-
-
class
tfsnippet.dataflow.GatherFlow(flows)¶
Bases: tfsnippet.dataflow.base.DataFlow
Gathering multiple data flows into a single flow.
Usage:
    x_flow = DataFlow.arrays([x], batch_size=256)
    y_flow = DataFlow.arrays([y], batch_size=256)
    xy_flow = DataFlow.gather([x_flow, y_flow])
-
__init__(flows)¶ Construct a GatherFlow.
Parameters: flows (Iterable[DataFlow]) – The data flows to gather. At least one data flow should be specified, otherwise a ValueError will be raised.
Raises:
- ValueError – If not even one data flow is specified.
- TypeError – If a specified flow is not a DataFlow.
-
-
class
tfsnippet.dataflow.IteratorFactoryFlow(factory)¶
Bases: tfsnippet.dataflow.base.DataFlow
Data flow constructed from an iterator factory.
Usage:
    x_flow = DataFlow.arrays([x], batch_size=256)
    y_flow = DataFlow.arrays([y], batch_size=256)
    xy_flow = DataFlow.iterator_factory(lambda: (
        (x, y) for (x,), (y,) in zip(x_flow, y_flow)
    ))
-
__init__(factory)¶ Construct an IteratorFactoryFlow.
Parameters: factory (() -> Iterator or Iterable) – A factory method for constructing the mini-batch iterators for each epoch.
-
-
class
tfsnippet.dataflow.MapperFlow(source, mapper, array_indices=None)¶
Bases: tfsnippet.dataflow.base.DataFlow
Data flow which transforms the mini-batch arrays from source flow by a specified mapper function.
Usage:
    source_flow = DataFlow.arrays([x, y], batch_size=256)
    mapper_flow = source_flow.map(lambda x, y: (x + y,))
-
__init__(source, mapper, array_indices=None)¶ Construct a MapperFlow.
Parameters:
- source (DataFlow) – The source data flow.
- mapper ((*np.ndarray) -> tuple[np.ndarray]) – The mapper function, which transforms numpy arrays into a tuple of other numpy arrays.
- array_indices (int or Iterable[int]) – The indices of the arrays to be processed within a mini-batch.
  If specified, the mapper is applied only to these selected arrays. This requires the mapper to produce exactly as many output arrays as it receives inputs.
  If not specified, the mapper is applied to all arrays, and the number of output arrays need not match the number of inputs.
-
array_indices¶ Get the indices of the arrays to be processed.
-
source¶ Get the source data flow.
-
-
class
tfsnippet.dataflow.SeqFlow(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=np.int32, random_state=None)¶
Bases: tfsnippet.dataflow.array_flow.ArrayFlow
Using number sequence as data source flow.
A SeqFlow is mainly used to generate seed indices, from which a MapperFlow then fetches the actual data.
Usage:
    seq_flow = DataFlow.seq(0, len(x), batch_size=256)
    # the mapper must return a tuple of arrays
    mapper_flow = seq_flow.map(lambda idx: (
        np.stack([fetch_data_by_index(i) for i in idx]),
    ))
-
__init__(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=np.int32, random_state=None)¶ Construct a SeqFlow.
Parameters:
- start – The starting number of the sequence.
- stop – The ending number of the sequence.
- step – The step of the sequence. (default 1)
- batch_size – Batch size of the data flow. Required.
- shuffle (bool) – Whether or not to shuffle the numbers before iterating. (default False)
- skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete. (default False)
- dtype – Data type of the numbers. (default np.int32)
- random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, use the global RandomState)
-
start¶ Get the starting number of the sequence.
-
step¶ Get the step of the sequence.
-
stop¶ Get the ending number of the sequence.
-
-
class
tfsnippet.dataflow.ThreadingFlow(source, prefetch)¶
Bases: tfsnippet.dataflow.base.DataFlow, tfsnippet.utils.concepts.AutoInitAndCloseable
Data flow to prefetch from the source data flow in a background thread.
Usage:
    array_flow = DataFlow.arrays([x, y], batch_size=256)
    with array_flow.threaded(prefetch=5) as df:
        for epoch in epochs:
            for batch_x, batch_y in df:
                ...
-
EPOCH_END= <object object>¶ Object to mark an ending position of an epoch.
-
__init__(source, prefetch)¶ Construct a ThreadingFlow.
Parameters:
-
prefetch_num¶ Get the number of batches to prefetch.
-
source¶ Get the source data flow.
-