ThreadingFlow
-
class tfsnippet.dataflows.ThreadingFlow(source, prefetch)
Bases: tfsnippet.dataflows.base.DataFlow, tfsnippet.utils.concepts.AutoInitAndCloseable
Data flow to prefetch from the source data flow in a background thread.
Usage:
    array_flow = DataFlow.arrays([x, y], batch_size=256)
    with array_flow.threaded(prefetch=5) as df:
        for epoch in epochs:
            for batch_x, batch_y in df:
                ...
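To make the idea of prefetching in a background thread concrete, here is a minimal standard-library sketch. It is not tfsnippet's implementation: `prefetch_batches` and `_EPOCH_END` are hypothetical names, and the bounded queue is an assumption about how a prefetch limit could be enforced.

```python
import queue
import threading

_EPOCH_END = object()  # hypothetical sentinel, playing a role similar to EPOCH_END


def prefetch_batches(batches, prefetch):
    """Yield the items of `batches`, produced ahead of time by a worker thread."""
    if prefetch < 1:
        raise ValueError('`prefetch` must be at least 1')
    buffer = queue.Queue(maxsize=prefetch)  # holds at most `prefetch` batches

    def worker():
        for batch in batches:
            buffer.put(batch)  # blocks while the buffer is full
        buffer.put(_EPOCH_END)  # tell the consumer that the epoch is over

    threading.Thread(target=worker, daemon=True).start()
    while True:
        batch = buffer.get()  # blocks until the worker has produced something
        if batch is _EPOCH_END:
            return
        yield batch


result = list(prefetch_batches(iter([(1, 2), (3, 4), (5, 6)]), prefetch=2))
```

The sketch leaves out error propagation from the worker and explicit shutdown, which a real flow would need when it is closed.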
Attributes Summary
- EPOCH_END – Object to mark an ending position of an epoch.
- current_batch – Get the result of the current batch (the last call to next_batch(), provided the implicit iterator has been opened and that call did not raise StopIteration).
- prefetch_num – Get the number of batches to prefetch.
- source – Get the source data flow.
Methods Summary
- arrays(arrays, batch_size[, shuffle, …]) – Construct an ArrayFlow.
- close() – Ensure the internal states are destroyed.
- gather(flows) – Gather multiple data flows into a single flow.
- get_arrays() – Iterate through the data-flow, collecting mini-batches into arrays.
- init() – Ensure the internal states are initialized.
- iterator_factory(factory) – Construct an IteratorFactoryFlow.
- map(mapper[, array_indices]) – Construct a MapperFlow.
- next_batch() – Get the arrays of the next mini-batch from the implicit iterator.
- select(indices) – Construct a DataFlow, which selects and rearranges arrays in each mini-batch.
- seq(start, stop[, step, batch_size, …]) – Construct a SeqFlow.
- threaded(prefetch) – Construct a ThreadingFlow from this flow.
- to_arrays_flow(batch_size[, shuffle, …]) – Convert this data-flow to an ArrayFlow.
Attributes Documentation
-
EPOCH_END = <object object>
Object to mark an ending position of an epoch.
-
current_batch
Get the result of the current batch (the last call to next_batch(), provided the implicit iterator has been opened and that call did not raise StopIteration).
Returns: The arrays of the current batch.
Return type: tuple[np.ndarray] or None
-
prefetch_num
Get the number of batches to prefetch.
-
source
Get the source data flow.
Methods Documentation
-
static arrays(arrays, batch_size, shuffle=False, skip_incomplete=False, random_state=None)
Construct an ArrayFlow.
Parameters:
- arrays – List of numpy-like arrays, to be iterated through in mini-batches. These arrays should be at least 1-d, with identical first dimension.
- batch_size (int) – Size of each mini-batch.
- shuffle (bool) – Whether to shuffle data before iterating. (default False)
- skip_incomplete (bool) – Whether to exclude the last mini-batch if it is incomplete. (default False)
- random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, construct a new RandomState)
Returns: The data flow from arrays.
Return type: tfsnippet.dataflows.ArrayFlow
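The effect of batch_size, shuffle and skip_incomplete can be illustrated with a small sketch. It uses plain Python lists in place of numpy arrays, and `iter_batches` and its `rng` argument are hypothetical names, not part of tfsnippet.

```python
import random


def iter_batches(arrays, batch_size, shuffle=False, skip_incomplete=False, rng=None):
    """Yield one tuple of aligned slices per mini-batch."""
    length = len(arrays[0])
    if any(len(a) != length for a in arrays):
        raise ValueError('all arrays must share the same first dimension')
    indices = list(range(length))
    if shuffle:
        # the same permutation is applied to every array, so rows stay aligned
        (rng or random.Random()).shuffle(indices)
    for start in range(0, length, batch_size):
        chunk = indices[start:start + batch_size]
        if skip_incomplete and len(chunk) < batch_size:
            break  # drop the final, shorter mini-batch
        yield tuple([a[i] for i in chunk] for a in arrays)


x = [0, 1, 2, 3, 4]
y = ['a', 'b', 'c', 'd', 'e']
all_batches = list(iter_batches([x, y], batch_size=2))
full_batches = list(iter_batches([x, y], batch_size=2, skip_incomplete=True))
```

With five rows and a batch size of two, `all_batches` holds three mini-batches (the last one with a single row), while `full_batches` holds only the two complete ones.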
-
close()
Ensure the internal states are destroyed.
-
static gather(flows)
Gather multiple data flows into a single flow.
Parameters: flows (Iterable[DataFlow]) – The data flows to gather. At least one data flow should be specified, otherwise a ValueError will be raised.
Returns: The gathered data flow.
Return type: tfsnippet.dataflows.GatherFlow
Raises:
- ValueError – If not even one data flow is specified.
- TypeError – If a specified flow is not a DataFlow.
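As a rough illustration of gathering, the sketch below advances several flows in lockstep and joins their mini-batches into one tuple. This is an assumption about the behavior, not a statement of how GatherFlow is implemented; `gather_batches` is a hypothetical helper, and flows are modeled as plain lists of tuples.

```python
def gather_batches(flows):
    """Yield one combined mini-batch per step, taken from all flows at once."""
    flows = list(flows)
    if not flows:
        raise ValueError('at least one data flow must be specified')
    for batches in zip(*flows):
        # `batches` holds one mini-batch (a tuple of arrays) per source flow
        yield tuple(array for batch in batches for array in batch)


x_flow = [([0, 1],), ([2],)]
y_flow = [(['a', 'b'],), (['c'],)]
gathered = list(gather_batches([x_flow, y_flow]))
```

Note that `zip` stops at the shortest flow; whether the real class does the same is not stated in this page.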
-
get_arrays()
Iterate through the data-flow, collecting mini-batches into arrays.
Returns: The collected arrays.
Return type: tuple[np.ndarray]
Raises: ValueError – If this data-flow is empty.
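Collecting mini-batches into arrays amounts to concatenating each position of the batch tuples along the first dimension. A minimal sketch with lists instead of numpy arrays follows; `collect_arrays` is a hypothetical name.

```python
def collect_arrays(batches):
    """Concatenate the k-th array of every mini-batch, for each position k."""
    columns = None
    for batch in batches:
        if columns is None:
            columns = [[] for _ in batch]  # one accumulator per array
        for column, array in zip(columns, batch):
            column.extend(array)  # concatenate along the first dimension
    if columns is None:
        raise ValueError('the data flow is empty')
    return tuple(columns)


collected = collect_arrays([([0, 1], ['a', 'b']), ([2], ['c'])])
```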
-
init()
Ensure the internal states are initialized.
-
static iterator_factory(factory)
Construct an IteratorFactoryFlow.
Parameters: factory (() -> Iterator or Iterable) – A factory method for constructing the mini-batch iterators for each epoch.
Returns: The data flow.
Return type: tfsnippet.dataflows.IteratorFactoryFlow
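The reason for passing a factory rather than an iterator is that an iterator is exhausted after one epoch, while a factory can be called again for each new epoch. The sketch below shows this with a hypothetical stand-in class, `FactoryBackedFlow`, which is not the library's IteratorFactoryFlow.

```python
class FactoryBackedFlow:
    """Hypothetical stand-in: every iter() call asks the factory for a fresh iterator."""

    def __init__(self, factory):
        self._factory = factory

    def __iter__(self):
        return iter(self._factory())


def batch_factory():
    # a generator function: calling it returns a new iterator each time
    for start in range(0, 6, 2):
        yield ([start, start + 1],)


flow = FactoryBackedFlow(batch_factory)
first_epoch = list(flow)
second_epoch = list(flow)  # works again, because the factory is re-invoked
```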
-
map(mapper, array_indices=None)
Construct a MapperFlow.
Parameters:
- mapper ((*np.ndarray) -> tuple[np.ndarray]) – The mapper function, which transforms numpy arrays into a tuple of other numpy arrays.
- array_indices (int or Iterable[int]) – The indices of the arrays to be processed within a mini-batch.
  If specified, the mapper is applied only to these selected arrays. This requires the mapper to produce exactly the same number of output arrays as inputs.
  If not specified, the mapper is applied to all arrays, and the number of output arrays is not required to match the inputs.
Returns: The data flow with mapper applied.
Return type: tfsnippet.dataflows.MapperFlow
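The two modes of array_indices can be sketched on a single mini-batch. This is an illustration under the assumption that mapped outputs replace the selected arrays in place; `map_batch` is a hypothetical helper, and lists stand in for numpy arrays.

```python
def map_batch(batch, mapper, array_indices=None):
    """Apply `mapper` to a whole mini-batch, or only to the selected arrays."""
    if array_indices is None:
        return tuple(mapper(*batch))  # output count may differ from input count
    if isinstance(array_indices, int):
        array_indices = [array_indices]
    array_indices = list(array_indices)
    outputs = tuple(mapper(*(batch[i] for i in array_indices)))
    if len(outputs) != len(array_indices):
        raise ValueError('mapper must return one output per selected array')
    result = list(batch)
    for i, out in zip(array_indices, outputs):
        result[i] = out  # assumption: outputs overwrite the selected positions
    return tuple(result)


batch = ([1, 2], [10, 20])
doubled_x = map_batch(batch, lambda x: ([v * 2 for v in x],), array_indices=0)
summed = map_batch(batch, lambda x, y: ([a + b for a, b in zip(x, y)],))
```

Here `doubled_x` keeps both arrays and changes only the first, while `summed` collapses two input arrays into one output array.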
-
next_batch()
Get the arrays of the next mini-batch from the implicit iterator.
Returns: The arrays of the mini-batch.
Return type: tuple[np.ndarray]
Raises: StopIteration – If the implicit iterator is exhausted. Note that this error is raised only once at the end of an epoch; the next call to this method opens a new epoch.
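The epoch semantics described here (StopIteration raised once, then a fresh epoch on the next call) can be sketched as follows. `ImplicitIterator` is a hypothetical class written for illustration; setting `current_batch` to None after the epoch ends is an assumption based on the documented return type of current_batch.

```python
class ImplicitIterator:
    """Hypothetical sketch of next_batch() / current_batch behavior."""

    def __init__(self, make_iterable):
        self._make_iterable = make_iterable
        self._iterator = None
        self.current_batch = None

    def next_batch(self):
        if self._iterator is None:
            self._iterator = iter(self._make_iterable())  # open a new epoch
        try:
            self.current_batch = next(self._iterator)
        except StopIteration:
            self._iterator = None  # the next call will start a new epoch
            self.current_batch = None
            raise
        return self.current_batch


flow = ImplicitIterator(lambda: [(0,), (1,)])
seen = []
for _ in range(5):
    try:
        seen.append(flow.next_batch())
    except StopIteration:
        seen.append('epoch end')
```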
-
select(indices)
Construct a DataFlow, which selects and rearranges arrays in each mini-batch. For example:
    flow = DataFlow.arrays([x, y, z], batch_size=64)
    flow.select([0, 2, 0])  # selects ``(x, z, x)`` in each mini-batch
Parameters: indices (Iterable[int]) – The indices of arrays to select.
Returns: The data flow with selected arrays in each mini-batch.
Return type: DataFlow
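On a single mini-batch, selection is just tuple indexing, and an index may appear more than once. A tiny sketch with a hypothetical helper `select_arrays`, using strings as placeholders for arrays:

```python
def select_arrays(batch, indices):
    """Pick and reorder the arrays of one mini-batch; repeats are allowed."""
    return tuple(batch[i] for i in indices)


batch = ('x', 'y', 'z')  # placeholders for the three arrays of a mini-batch
selected = select_arrays(batch, [0, 2, 0])
```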
-
static seq(start, stop, step=1, batch_size=None, shuffle=False, skip_incomplete=False, dtype=np.int32, random_state=None)
Construct a SeqFlow.
Parameters:
- start – The starting number of the sequence.
- stop – The ending number of the sequence.
- step – The step of the sequence. (default 1)
- batch_size – Batch size of the data flow. Required.
- shuffle (bool) – Whether to shuffle the numbers before iterating. (default False)
- skip_incomplete (bool) – Whether to exclude the last mini-batch if it is incomplete. (default False)
- dtype – Data type of the numbers. (default np.int32)
- random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, construct a new RandomState)
Returns: The data flow from number sequence.
Return type: tfsnippet.dataflows.SeqFlow
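A sequence flow can be pictured as a number range cut into mini-batches. The sketch below assumes that stop is exclusive, as in Python's `range`; this page does not say so explicitly. `seq_batches` is a hypothetical name, and shuffling and dtype are left out.

```python
def seq_batches(start, stop, step=1, batch_size=None, skip_incomplete=False):
    """Yield 1-tuples holding consecutive slices of range(start, stop, step)."""
    if batch_size is None:
        raise ValueError('`batch_size` is required')
    numbers = list(range(start, stop, step))  # assumption: `stop` is exclusive
    for offset in range(0, len(numbers), batch_size):
        chunk = numbers[offset:offset + batch_size]
        if skip_incomplete and len(chunk) < batch_size:
            break  # drop the final, shorter mini-batch
        yield (chunk,)


batches = list(seq_batches(0, 10, step=3, batch_size=3))
```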
-
threaded(prefetch)
Construct a ThreadingFlow from this flow.
Parameters: prefetch (int) – Number of mini-batches to prefetch ahead. It should be at least 1.
Returns: The background threaded data flow to prefetch mini-batches from this flow.
Return type: tfsnippet.dataflows.ThreadingFlow
-
to_arrays_flow(batch_size, shuffle=False, skip_incomplete=False, random_state=None)
Convert this data-flow to an ArrayFlow.
This method will iterate through the data-flow, collecting mini-batches into arrays, and then construct an ArrayFlow.
Parameters:
- batch_size (int) – Size of each mini-batch.
- shuffle (bool) – Whether to shuffle data before iterating. (default False)
- skip_incomplete (bool) – Whether to exclude the last mini-batch if it is incomplete. (default False)
- random_state (RandomState) – Optional numpy RandomState for shuffling data before each epoch. (default None, construct a new RandomState)
Returns: The constructed ArrayFlow.
Return type: tfsnippet.dataflows.ArrayFlow
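The two steps named in the description (collect everything, then batch again) can be sketched as a re-batching helper. `rebatch` is a hypothetical name, lists stand in for numpy arrays, and shuffling is omitted.

```python
def rebatch(batches, batch_size, skip_incomplete=False):
    """Collect all mini-batches, then cut the result into new mini-batches."""
    batches = list(batches)
    if not batches:
        raise ValueError('the data flow is empty')
    # Step 1: concatenate the k-th array of every mini-batch.
    columns = [
        [value for batch in batches for value in batch[k]]
        for k in range(len(batches[0]))
    ]
    # Step 2: slice the collected arrays with the new batch size.
    length = len(columns[0])
    for start in range(0, length, batch_size):
        if skip_incomplete and start + batch_size > length:
            break  # drop the final, shorter mini-batch
        yield tuple(column[start:start + batch_size] for column in columns)


source_batches = [([0, 1, 2],), ([3, 4],)]
rebatched = list(rebatch(source_batches, batch_size=2))
```

Because every mini-batch is collected first, this approach holds the whole data set in memory at once.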
-