'How to retrieve file paths from a tf.data.Dataset created with from_tensor_slices() and shuffled after every epoch
First of all, I would like to say that this is my first question in stackOverflow, so I hope that the question as a whole respects the rules. I realize that the question is a bit long, but I would like to provide as much background and detail as possible .
I am currently developing a real-time image binary classification system based on Tensorflow 2.8.0 and I am quite new at it. Here are some of the peculiarities of the data that I have for the mentioned project:
- Too big to fit in memory: I have more than 200 GB of data. Keep in mind that I have labeled only a small portion of it, but I want to write code that could manage the whole dataset in the future.
- Some files are not directly compatible with Tensorflow: I have .FITS and .FIT files that cannot be opened directly with Tensorflow. Due to this issue, I use a library called Astropy to open these files.
- The classes are very unbalanced.
After reading the official documentation and tutorials, I thought that, in order to load, preprocess and feed data to my CNN, the best option was to build an input pipeline using the tf.data.Dataset class due to the ease of opening FITS files. My general procedure follows this idea:
- Get a list of file paths and split it into train, val and test partitions if desired.
- Create a tf.data.Dataset with the from_tensor_slices() method
- Shuffle the data (before the heavier reading and image processing operations)
- Read and process every path with map()
- Batch and prefetch
Here are some code fragments in case they help to understand my goal:
(...)
import config as cfg # Custom .py file
import tensorflow as tf
# x_train, x_val and x_test are previously split file paths lists
train_ds = tf.data.Dataset.from_tensor_slices([str(p) for p in x_train])
val_ds = tf.data.Dataset.from_tensor_slices([str(p) for p in x_val])
test_ds = tf.data.Dataset.from_tensor_slices([str(p) for p in x_test])
train_ds = configure_tf_ds(train_ds)
val_ds = configure_tf_ds(val_ds)
test_ds = configure_tf_ds(test_ds)
def configure_tf_ds(self, tf_ds, buf_size):
# reshuffle_each_iteration=True ensures that data is shuffled each time it is iterated
tf_ds = tf_ds.shuffle(buffer_size=cfg.SHUFFLE_BUF_SIZE, seed=cfg.seed, reshuffle_each_iteration=True)
tf_ds = tf_ds.map(lambda x: tf.py_function(self.process_path, [x], [self.img_dtpye, self.label_dtype]))
tf_ds = tf_ds.batch(self.batch_size)
tf_ds = tf_ds.prefetch(buffer_size=tf.data.AUTOTUNE)
return tf_ds
def process_path(self, file_path):
# Labels are extracted from the file path, not relevant for my problem
label = get_label(file_path)
path = bytes.decode(file_path.numpy()).lower()
img = None
# Open and process images depending on their file paths' extension: FITS, FIT, JPG
if "fit" in path:
img = tf.py_function(func=self.decode_fits, inp=[file_path], Tout=self.img_dtpye)
else:
img = tf.py_function(func=self.decode_img, inp=[file_path], Tout=self.img_dtpye)
return img, label
model.fit(train_ds, epochs=50, validation_data=val_ds)
# Then, I would like to obtain predictions, plot results, and so on but knowing which file paths I am working with
(...)
Following the previous idea, I have successfully created and tested different types of pipelines for different types of partitions of my dataset: unlabeled (remember that only a portion of the data is labeled), labeled and weighted labeled (I wanted to see if my models improve by specifying class weights when training).
However, in order to monitor results and make proper adjustments to my model, I would like to retrieve the usual predictions, real labels and images next to the file paths preserving the ability to shuffle the data after every epoch. I have managed to solve my question if I do not shuffle data with .shuffle(reshuffle_each_iteration=True), but models' performance is supposed to increase if data is shuffled after each epoch, according to several sources.
I have read different posts in stackOverflow related to my question. I will list those posts next to the problems that I have found for my particular use case:
- Solution 1: My dataset cannot be fed to the model as X, y because it is a tf.data.Dataset
- Solution 2: I want to obtain the image and the label too.
- Solution 3: This works, but it would not respect the expected tf.data.Dataset format in the future .fit() call as stated here:
A tf.data dataset. Should return a tuple of either (inputs, targets) or (inputs, targets, sample_weights)
I have also tried to keep a separate tf.data.Dataset with only the file paths but if I call the shuffle method with the reshuffle_each_iteration=True option in both tf.data.Dataset instances, the order of their elements does not match even if I set the same seed.
In short, is it possible to achieve what I want? If so, how should I proceed?
Thank you very much in advance.
Solution 1:[1]
Preprocess your data into three TFRecord files, one each for training, testing, and validation. Then you can shuffle and never cross records between the sets. This also speeds up data loading and can be done once and reused many times while playing with hyperparameters.
Here is an example of how you can preprocess and split your data. Your actual dataset data will have a different structure, this example has "encdata", a 2048-wide vector of vggface2 face encoding data. This assumes you have a single directory of data, with subdirectories named for a class and containing all the files for that class.
import tensorflow as tf
import numpy as np
import pickle
import sys
import os
# 80% to training, 10% to testing, 10% to validation
validation_portion = 1
testing_portion = 1
training_portion = 8
file_cycle_total = validation_portion + testing_portion + training_portion
# Where to store the TFRecord files
training_tfrecord_path = '/var/tmp/xtraining_tfrecords.tfr'
testing_tfrecord_path = '/var/tmp/xtesting_tfrecords.tfr'
validation_tfrecord_path = '/var/tmp/xvalidation_tfrecords.tfr'
# Where we keep the encodings
FACELIB_DIR='/aimiassd/Datasets/LabeledAstroFaces'
# Get list of all classes from all facelib dirs
classNames = sorted([x for x in os.listdir(FACELIB_DIR) if os.path.isdir(os.path.join(FACELIB_DIR,x)) and not x.startswith('.')])
classStrToInt = dict([(x,i) for i,x in enumerate(classNames)])
print('Found %d different classNames for labels\n' % len(classNames))
# Create our record writers
train_file_writer = tf.io.TFRecordWriter(training_tfrecord_path)
test_file_writer = tf.io.TFRecordWriter(testing_tfrecord_path)
val_file_writer = tf.io.TFRecordWriter(validation_tfrecord_path)
# Create a dataset of filenames of every enc2048 file in the facelibraries
cnt_records_written = [0,0,0]
for CN in classNames:
class_int = classStrToInt[CN]
# Get a list of all the encoding files
encfiles = sorted(filter((lambda x: x.endswith('.enc2048')), os.listdir(os.path.join(FACELIB_DIR, CN))))
# For each encoding file, read the encoding data and write it to the various tfrecords
for i, F in enumerate(encfiles):
file_path = os.path.join(FACELIB_DIR,CN,F)
with open(file_path,'rb') as fin:
encdata,_ = pickle.loads(fin.read()) # encodings, source_image_name
# Turn encdata into a tf.train.Example and serialize it for writing
record_bytes = tf.train.Example(features=tf.train.Features(feature={
"x": tf.train.Feature(float_list=tf.train.FloatList(value=encdata)),
"y": tf.train.Feature(int64_list=tf.train.Int64List(value=[class_int])),
})).SerializeToString()
# Write it out with the appropriate record writer
remainder = i % file_cycle_total
if remainder < validation_portion:
val_file_writer.write(record_bytes)
cnt_records_written[2] += 1
elif remainder < validation_portion + testing_portion:
test_file_writer.write(record_bytes)
cnt_records_written[1] += 1
else:
train_file_writer.write(record_bytes)
cnt_records_written[0] += 1
print('Writing records done.')
print('Wrote %d training, %d testing, %d validation records' %
(cnt_records_written[0], cnt_records_written[1], cnt_records_written[2]) )
train_file_writer.close()
test_file_writer.close()
val_file_writer.close()
print('Reading data back out...')
# Function to turn a serialized TFRecord back into a tf.train.Example
def decode_fn(record_bytes):
return tf.io.parse_single_example(
# Data
record_bytes,
# Schema
{"x": tf.io.FixedLenFeature([2048], dtype=tf.float32),
"y": tf.io.FixedLenFeature([], dtype=tf.int64)}
)
# Read and deserialize the datasets
train_ds = tf.data.TFRecordDataset([training_tfrecord_path]).map(decode_fn)
test_ds = tf.data.TFRecordDataset([ testing_tfrecord_path]).map(decode_fn)
validation_ds = tf.data.TFRecordDataset([validation_tfrecord_path]).map(decode_fn)
# Use a dataset
count = 0
for batch in tf.data.TFRecordDataset([training_tfrecord_path]).map(decode_fn):
print(batch)
count +=1
if count > 4:
sys.exit(0)
print('Done.')
Note how as the data is being process into TFRecords, it is alternately being written into the three datasets. Verify and Testing entries are written first, to ensure classes with very small amounts of samples still get something into the verify and testing datasets. This is controlled by the variables at the top, validation_portion, testing_portion, and training_portion, adjust per your preferences.
Finally, at the end, the TFRecords are re-read and used to build three new tf.data.Dataset, which can be fed to model.fit() and friends. The example code just prints four records to show the data is of the correct, original shape.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | RubinMac |