tfsnippet.dataflow¶
-
class
tfsnippet.dataflow.
ArrayFlow
(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)¶ Bases:
tfsnippet.dataflow.base.ExtraInfoDataFlow
Using numpy-like arrays as data source flow.
Usage:
array_flow = DataFlow.arrays([x, y], batch_size=256, shuffle=True, skip_incomplete=True) for batch_x, batch_y in array_flow: ...
-
__init__
(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)¶ Construct an
ArrayFlow
.Parameters: - arrays – List of numpy-like arrays, to be iterated through mini-batches. These arrays should be at least 1-d, with identical first dimension.
- batch_size (int) – Size of each mini-batch.
- shuffle (bool) – Whether or not to shuffle data before iterating?
(default
False
) - skip_incomplete (bool) – Whether or not to exclude the last
mini-batch if it is incomplete? (default
False
) - random_state (RandomState) – Optional numpy RandomState for
shuffling data before each epoch. (default
None
, use the globalRandomState
).
-
-
class
tfsnippet.dataflow.
DataFlow
¶ Bases:
object
Data flows are objects for constructing mini-batch iterators.
There are two major types of
DataFlow
classes: data sources and data transformers. Data sources, like theArrayFlow
, produce mini-batches from underlying data sources. Data transformers, likeMapperFlow
, produce mini-batches by transforming arrays from the source.All
DataFlow
subclasses shipped bytfsnippet.dataflow
can be constructed by factory methods of this base class. For example:# :class:`ArrayFlow` from arrays array_flow = DataFlow.arrays([x, y], batch_size=256, shuffle=True) # :class:`MapperFlow` by adding the two arrays from `array_flow` mapper_flow = array_flow.map(lambda x, y: (x + y,))
-
__iter__
()¶ Iterate through the mini-batches. Not reentrant.
Some subclasses may also inherit from
NoReentrantContext
, thus a context must be firstly entered before using such data flows as iterators, for example:with DataFlow.threaded(...) as df: for epoch in epochs: for batch_x, batch_y in df: ...
Yields: tuple[np.ndarray] –
- Mini-batches of tuples of numpy arrays.
The arrays might be read-only.
-
_minibatch_iterator
()¶ Get the mini-batch iterator. Subclasses should override this to implement the data flow.
Yields: tuple[np.ndarray] –
- Mini-batches of tuples of numpy arrays.
The arrays might be read-only.
-
static
arrays
(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)¶ Construct an
ArrayFlow
.Parameters: - arrays – List of numpy-like arrays, to be iterated through mini-batches. These arrays should be at least 1-d, with identical first dimension.
- batch_size (int) – Size of each mini-batch.
- shuffle (bool) – Whether or not to shuffle data before iterating?
(default
False
) - skip_incomplete (bool) – Whether or not to exclude the last
mini-batch if it is incomplete? (default
False
) - random_state (RandomState) – Optional numpy RandomState for
shuffling data before each epoch. (default
None
, use the globalRandomState
).
Returns: The data flow from arrays.
Return type:
-
current_batch
¶ Get the the result of current batch (last call to
next_batch()
, if the implicit iterator has been opened and the last call tonext_batch()
does not raise aStopIteration
).Returns: The arrays of current batch. Return type: tuple[np.ndarray] or None
-
static
gather
(flows)¶ Gather multiple data flows into a single flow.
Parameters: flows (Iterable[DataFlow]) – The data flows to gather. At least one data flow should be specified, otherwise a
ValueError
will be raised.Returns: The gathered data flow.
Return type: Raises: ValueError
– If not even one data flow is specified.TypeError
– If a specified flow is not aDataFlow
.
-
get_arrays
()¶ Iterate through the data-flow, collecting mini-batches into arrays.
Returns: The collected arrays. Return type: tuple[np.ndarray] Raises: ValueError
– If this data-flow is empty.
-
static
iterator_factory
(factory)¶ Construct a
IteratorFactoryFlow
.Parameters: factory (() -> Iterator or Iterable) – A factory method for constructing the mini-batch iterators for each epoch. Returns: The data flow. Return type: tfsnippet.dataflow.IteratorFactoryFlow
-
map
(mapper, array_indices=None)¶ Construct a
MapperFlow
.Parameters: - mapper ((*np.ndarray) -> tuple[np.ndarray])) – The mapper function, which transforms numpy arrays into a tuple of other numpy arrays.
- array_indices (int or Iterable[int]) –
The indices of the arrays to be processed within a mini-batch.
If specified, will apply the mapper only on these selected arrays. This will require the mapper to produce exactly the same number of output arrays as the inputs.
If not specified, apply the mapper on all arrays, and do not require the number of output arrays to match the inputs.
Returns: The data flow with mapper applied.
Return type:
-
next_batch
()¶ Get the arrays of next mini-batch from the implicit iterator.
Returns: The arrays of mini-batch. Return type: tuple[np.ndarray] Raises: StopIteration
– If the implicit iterator is exhausted. Note that this error will only be triggered once at the end of an epoch. The next time calling this method, a new epoch will be opened.
-
select
(indices)¶ Construct a
DataFlow
, which selects and rearranges arrays in each mini-batch. For example:flow = DataFlow.arrays([x, y, z], batch_size=64) flow.select([0, 2, 0]) # selects ``(x, z, x)`` in each mini-batch
Parameters: indices (Iterable[int]) – The indices of arrays to select. Returns: The data flow with selected arrays in each mini-batch. Return type: DataFlow
-
static
seq
(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=<type 'numpy.int32'>, random_state=None)¶ Construct a
SeqFlow
.Parameters: - start – The starting number of the sequence.
- stop – The ending number of the sequence.
- step – The step of the sequence. (default
1
) - batch_size – Batch size of the data flow. Required.
- shuffle (bool) – Whether or not to shuffle the numbers before
iterating? (default
False
) - skip_incomplete (bool) – Whether or not to exclude the last
mini-batch if it is incomplete? (default
False
) - dtype – Data type of the numbers. (default
np.int32
) - random_state (RandomState) – Optional numpy RandomState for
shuffling data before each epoch. (default
None
, use the globalRandomState
).
Returns: The data flow from number sequence.
Return type:
-
threaded
(prefetch)¶ Construct a
ThreadingFlow
from this flow.Parameters: prefetch (int) – Number of mini-batches to prefetch ahead. It should be at least 1. Returns: - The background threaded
- data flow to prefetch mini-batches from this flow.
Return type: tfsnippet.dataflow.ThreadingFlow
-
to_arrays_flow
(batch_size, shuffle=False, skip_incomplete=False, random_state=None)¶ Convert this data-flow to a
ArrayFlow
.This method will iterate through the data-flow, collecting mini-batches into arrays, and then construct an ArrayFlow.
Parameters: - batch_size (int) – Size of each mini-batch.
- shuffle (bool) – Whether or not to shuffle data before iterating?
(default
False
) - skip_incomplete (bool) – Whether or not to exclude the last
mini-batch if it is incomplete? (default
False
) - random_state (RandomState) – Optional numpy RandomState for
shuffling data before each epoch. (default
None
, use the globalRandomState
).
Returns: The constructed ArrayFlow.
Return type:
-
-
class
tfsnippet.dataflow.
ExtraInfoDataFlow
(array_count, data_length, data_shapes, batch_size, skip_incomplete, is_shuffled)¶ Bases:
tfsnippet.dataflow.base.DataFlow
Base class for
DataFlow
subclasses with auxiliary information about the mini-batches.-
__init__
(array_count, data_length, data_shapes, batch_size, skip_incomplete, is_shuffled)¶ Construct an
ExtraInfoDataFlow
.Parameters: - array_count (int) – The count of arrays in each mini-batch.
- data_length (int) – The total length of the data.
- data_shapes (tuple[tuple[int]]) – The shapes of data in a mini-batch. The batch dimension is not included.
- batch_size (int) – Size of each mini-batch.
- skip_incomplete (bool) – Whether or not to exclude the last mini-batch if it is incomplete?
- is_shuffled (bool) – Whether or not the data are first shuffled before iterated through mini-batches?
-
array_count
¶ Get the count of arrays in each mini-batch.
Returns: The count of arrays in each mini-batch. Return type: int
-
data_length
¶ Get the total length of the data.
Returns: The total length of the data. Return type: int
-
data_shapes
¶ Get the shapes of the data in each mini-batch.
Returns: - The shapes of data in a mini-batch.
- The batch dimension is not included.
Return type: tuple[tuple[int]]
-
is_shuffled
¶ Whether or not the data are first shuffled before iterated through mini-batches?
-
skip_incomplete
¶ Whether or not to exclude the last mini-batch if it is incomplete?
-
-
class
tfsnippet.dataflow.
DataMapper
¶ Bases:
object
Base class for all data mappers.
A
DataMapper
is a callable object, which maps input arrays into outputs arrays. Instances ofDataMapper
are usually used as themapper
of atfsnippet.dataflow.MapperFlow
.-
__call__
(*arrays)¶ Transform the input arrays into outputs.
Parameters: *arrays – Arrays to be transformed. Returns: The output arrays. Return type: tuple[np.ndarray]
-
_transform
(*args)¶ Subclasses should override this to implement the transformation.
-
-
class
tfsnippet.dataflow.
SlidingWindow
(data_array, window_size)¶ Bases:
tfsnippet.dataflow.data_mappers.DataMapper
DataMapper
for producing sliding windows according to indices.Usage:
data = np.arange(1000) sw = SlidingWindow(data, window_size=100) # construct a DataFlow from this SlidingWindow sw_flow = sw.as_flow(batch_size=64) # or equivalently sw_flow = DataFlow.seq( 0, len(data) - sw.window_size + 1, batch_size=64).map(sw)
-
__init__
(data_array, window_size)¶ Construct a
SlidingWindow
.Parameters: - data_array (np.ndarray) – The array from which to extract sliding windows.
- window_size (int) – Size of each window.
-
_transform
(indices)¶ Subclasses should override this to implement the transformation.
-
as_flow
(batch_size, shuffle=False, skip_incomplete=False)¶ Get a
DataFlow
which iterates through mini-batches of sliding windows upondata_array
.Parameters: Returns: The data flow for sliding windows.
Return type:
-
data_array
¶ Get the data array.
-
window_size
¶ Get the window size.
-
-
class
tfsnippet.dataflow.
GatherFlow
(flows)¶ Bases:
tfsnippet.dataflow.base.DataFlow
Gathering multiple data flows into a single flow.
Usage:
x_flow = DataFlow.arrays([x], batch_size=256) y_flow = DataFlow.arrays([y], batch_size=256) xy_flow = DataFlow.gather([x_flow, y_flow])
-
__init__
(flows)¶ Construct an
IteratorFlow
.Parameters: flows (Iterable[DataFlow]) – The data flows to gather. At least one data flow should be specified, otherwise a
ValueError
will be raised.Raises: ValueError
– If not even one data flow is specified.TypeError
– If a specified flow is not aDataFlow
.
-
-
class
tfsnippet.dataflow.
IteratorFactoryFlow
(factory)¶ Bases:
tfsnippet.dataflow.base.DataFlow
Data flow constructed from an iterator factory.
Usage:
x_flow = DataFlow.arrays([x], batch_size=256) y_flow = DataFlow.arrays([y], batch_size=256) xy_flow = DataFlow.iterator_factory(lambda: ( (x, y) for (x,), (y,) in zip(x_flow, y_flow) ))
-
__init__
(factory)¶ Construct an
IteratorFlow
.Parameters: factory (() -> Iterator or Iterable) – A factory method for constructing the mini-batch iterators for each epoch.
-
-
class
tfsnippet.dataflow.
MapperFlow
(source, mapper, array_indices=None)¶ Bases:
tfsnippet.dataflow.base.DataFlow
Data flow which transforms the mini-batch arrays from source flow by a specified mapper function.
Usage:
source_flow = Data.arrays([x, y], batch_size=256) mapper_flow = source_flow.map(lambda x, y: (x + y,))
-
__init__
(source, mapper, array_indices=None)¶ Construct a
MapperFlow
.Parameters: - source (DataFlow) – The source data flow.
- mapper ((*np.ndarray) -> tuple[np.ndarray])) – The mapper function, which transforms numpy arrays into a tuple of other numpy arrays.
- array_indices (int or Iterable[int]) –
The indices of the arrays to be processed within a mini-batch.
If specified, will apply the mapper only on these selected arrays. This will require the mapper to produce exactly the same number of output arrays as the inputs.
If not specified, apply the mapper on all arrays, and do not require the number of output arrays to match the inputs.
-
array_indices
¶ Get the indices of the arrays to be processed.
-
source
¶ Get the source data flow.
-
-
class
tfsnippet.dataflow.
SeqFlow
(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=<type 'numpy.int32'>, random_state=None)¶ Bases:
tfsnippet.dataflow.array_flow.ArrayFlow
Using number sequence as data source flow.
This
SeqFlow
is particularly used for generating the seed number indices, then fetch the actual data byMapperFlow
according to the seed numbers.Usage:
seq_flow = DataFlow.seq(0, len(x), batch_size=256) mapper_flow = seq_flow.map(lambda idx: np.stack( [fetch_data_by_index(i) for i in idx] ))
-
__init__
(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=<type 'numpy.int32'>, random_state=None)¶ Construct a
SeqFlow
.Parameters: - start – The starting number of the sequence.
- stop – The ending number of the sequence.
- step – The step of the sequence. (default
1
) - batch_size – Batch size of the data flow. Required.
- shuffle (bool) – Whether or not to shuffle the numbers before
iterating? (default
False
) - skip_incomplete (bool) – Whether or not to exclude the last
mini-batch if it is incomplete? (default
False
) - dtype – Data type of the numbers. (default
np.int32
) - random_state (RandomState) – Optional numpy RandomState for
shuffling data before each epoch. (default
None
, use the globalRandomState
).
-
start
¶ Get the starting number of the sequence.
-
step
¶ Get the step of the sequence.
-
stop
¶ Get the ending number of the sequence.
-
-
class
tfsnippet.dataflow.
ThreadingFlow
(source, prefetch)¶ Bases:
tfsnippet.dataflow.base.DataFlow
,tfsnippet.utils.concepts.AutoInitAndCloseable
Data flow to prefetch from the source data flow in a background thread.
Usage:
array_flow = DataFlow.arrays([x, y], batch_size=256) with array_flow.threaded(prefetch=5) as df: for epoch in epochs: for batch_x, batch_y in df: ...
-
EPOCH_END
= <object object>¶ Object to mark an ending position of an epoch.
-
__init__
(source, prefetch)¶ Construct a
ThreadingFlow
.Parameters:
-
prefetch_num
¶ Get the number of batches to prefetch.
-
source
¶ Get the source data flow.
-